// Copyright (c) 2012-2017 VideoStitch SAS // Copyright (c) 2018 stitchEm #include "bufferedReader.hpp" namespace VideoStitch { namespace Core { PotentialValue allocate(size_t size, GPU::HostBuffer) { auto host = GPU::HostBuffer::allocate(size, "Input Frames", GPUHostAllocPinned | GPUHostAllocHostWriteOnly); FAIL_RETURN(host.status()); return Buffer{host.value()}; } PotentialValue allocate(size_t size, GPU::Buffer) { auto device = GPU::Buffer::allocate(size, "Input Frames"); FAIL_RETURN(device.status()); return Buffer{device.value()}; } template PotentialValue> allocateBuffers(const Input::VideoReader::Spec& readerSpec, unsigned numBuffers) { std::vector buffers; auto tryAllocatingBuffers = [&]() -> Status { for (unsigned i = 0; i < numBuffers; i++) { PotentialValue buf = allocate(readerSpec.frameDataSize, buffer_t()); FAIL_RETURN(buf.status()); buffers.push_back(buf.value()); } return Status::OK(); }; Status allocationStatus = tryAllocatingBuffers(); if (!allocationStatus.ok()) { for (auto buf : buffers) { buf.release(); } return allocationStatus; } return buffers; } BufferedReader::BufferedReader(std::shared_ptr delegate, std::vector buffers) : lastLoadedFrame(), delegate(delegate) { lastLoadedFrame.readerStatus = Input::ReadStatus::fromCode(); for (auto buf : buffers) { // no need to lock, haven't started yet availableBuffers.push(buf); } start(); } Potential BufferedReader::create(std::shared_ptr reader, unsigned preloadCacheSize) { FAIL_RETURN(GPU::useDefaultBackendDevice()); // one buffer for async loading // another buffer for the last loaded frame, which needs to be kept for possible reloads and thus is unavailable for // preloading unsigned minNumBuffers = 2; unsigned numBuffers = preloadCacheSize + minNumBuffers; switch (reader->getSpec().addressSpace) { case Host: { auto buffers = allocateBuffers>(reader->getSpec(), numBuffers); FAIL_RETURN(buffers.status()); return new BufferedReader(reader, buffers.value()); } case Device: { auto buffers = allocateBuffers>(reader->getSpec(), numBuffers); FAIL_RETURN(buffers.status()); return new BufferedReader(reader, buffers.value()); } } assert(false); return Status::OK(); } BufferedReader::~BufferedReader() { { std::lock_guard la(availableMutex); stoppingAvailable = true; } availableCV.notify_all(); { std::lock_guard ll(loadedMutex); stoppingLoaded = true; } loadedCV.notify_all(); { // make sure any outstanding load have finished std::lock_guard loadingLock(loadedMutex); // flush reload frame so it can be released updateCurrentFrame({Input::ReadStatus::fromCode(), 0, Buffer()}); } // wait for background reading to wind down join(); { std::lock_guard la(availableMutex); while (!availableBuffers.empty()) { availableBuffers.front().release(); availableBuffers.pop(); } } { std::lock_guard ll(loadedMutex); while (!loadedFrames.empty()) { auto loaded = std::move(loadedFrames.front()); loadedFrames.pop(); loaded.buffer.release(); } } } void BufferedReader::run() { GPU::useDefaultBackendDevice(); for (;;) { Buffer frame; { std::unique_lock lock(availableMutex); availableCV.wait(lock, [&]() { return !availableBuffers.empty() || stoppingAvailable; }); // queue has stopped, wind down reading as well if (stoppingAvailable) { return; } frame = availableBuffers.front(); availableBuffers.pop(); } { mtime_t date; Input::ReadStatus readStatus; std::lock_guard lock(delegateMutex); readStatus = delegate->readFrame(date, frame.rawPtr()); { InputFrame loaded{readStatus, date, frame}; std::lock_guard lock(loadedMutex); loadedFrames.push(std::move(loaded)); } } loadedCV.notify_one(); } } Status BufferedReader::seekFrame(frameid_t seekFrame) { // block readFrame std::lock_guard delegateLock(delegateMutex); // block loading std::lock_guard loadingLock(loadedMutex); if (stoppingLoaded) { return Status::OK(); } std::vector localLoadedFrames; while (!loadedFrames.empty()) { InputFrame loaded = std::move(loadedFrames.front()); loadedFrames.pop(); localLoadedFrames.push_back(std::move(loaded)); } bool seekTargetFrameIsCached = false; // TODO API to convert frame ID (used by seek) <--> date (used by readFrame) ? mtime_t seekDate = (mtime_t)((double)seekFrame * 1000000.0 * (double)getSpec().frameRate.den / (double)getSpec().frameRate.num); for (auto& frame : localLoadedFrames) { mtime_t frameDate = frame.date; if (frameDate == seekDate) { seekTargetFrameIsCached = true; } if (seekTargetFrameIsCached) { loadedFrames.push(std::move(frame)); } else { makeBufferAvailable(frame.buffer); } } if (seekTargetFrameIsCached) { return Status::OK(); } return delegate->seekFrame(seekFrame); } void BufferedReader::makeBufferAvailable(Buffer buf) { { std::lock_guard la(availableMutex); availableBuffers.push(buf); } availableCV.notify_one(); } InputFrame BufferedReader::fetchLoadedFrame() { std::unique_lock lock(loadedMutex); loadedCV.wait(lock, [&]() { return !loadedFrames.empty() || stoppingLoaded; }); // queue has stopped, wind down reading as well if (stoppingLoaded) { return {Input::ReadStatus::fromCode(), -1, Core::Buffer()}; } InputFrame loaded = std::move(loadedFrames.front()); loadedFrames.pop(); return loaded; } InputFrame BufferedReader::load() { InputFrame loadedFrame = fetchLoadedFrame(); updateCurrentFrame(loadedFrame); return loadedFrame; } InputFrame BufferedReader::reload() { std::lock_guard lock(borrowedMutex); borrowed[lastLoadedFrame.buffer]++; return lastLoadedFrame; } void BufferedReader::updateCurrentFrame(InputFrame frame) { std::lock_guard lock(borrowedMutex); InputFrame lastFrame = lastLoadedFrame; lastLoadedFrame = frame; // 1st borrow for the frame that load() returns // 2nd: keep it around for possible reloads borrowed[lastLoadedFrame.buffer] = 2; releaseBuffer(lastFrame.buffer); } void BufferedReader::releaseBuffer(Buffer frame) { std::lock_guard lock(borrowedMutex); if (frame.rawPtr()) { borrowed[frame]--; if (borrowed[frame] == 0) { makeBufferAvailable(frame); borrowed.erase(frame); } } } } // namespace Core } // namespace VideoStitch