// Copyright (c) 2012-2017 VideoStitch SAS // Copyright (c) 2018 stitchEm #pragma once #include "frameBuffer.hpp" #include "stitchOutput.hpp" #include "stereoOutput.hpp" #include "libvideostitch/controller.hpp" #include <atomic> #include <deque> #include <memory> #include <mutex> #include <thread> namespace VideoStitch { namespace Output { class VideoWriter; class StereoWriter; } // namespace Output namespace Core { /** * A set of host panoramic buffers that can be filled asynchronously. * * It buffers N frames and waits when no frame is available. * Warning: on destruction, waits for all pending frames to be written. * This can effectively block forever if there are missing frames. * * Frames should start at 0. * * Past frames will be ignored, current frame undergoes a * special treatment if refilled (for restitch). * Future frames are buffered when possible, if not fill() * becomes blocking. * * 3 threads are manipulating the panoramic buffers. * - "register callbacks" thread : read all buffers, don't change their state (stitched / blank / in use), * modify them * - "stitcher" thread : pop one from blanks, put one in stitched * - "consumer" thread : pop one from stitched, put one in blank */ template <typename FrameBuffer> class AsyncBuffer { public: typedef FrameBuffer* Frame; typedef FrameBuffer FB; Status initialize(const std::vector<std::shared_ptr<typename FrameBuffer::Surface>>&, const std::vector<std::shared_ptr<Output::VideoWriter>>& writers); virtual ~AsyncBuffer(); Status pushVideo(mtime_t date); Status registerWriters(const std::vector<std::shared_ptr<Output::VideoWriter>>&); Status registerWriter(std::shared_ptr<Output::VideoWriter>); protected: void synchronize(FrameBuffer* frame) { frame->streamSynchronize(); } FrameBuffer* getUsedFrame(mtime_t date); FrameBuffer* getCurrentFrame(mtime_t date); // Frames std::mutex bkMu; std::condition_variable bkCond; std::deque<FrameBuffer*> blankFrames; std::mutex stMu; std::condition_variable stCond; std::deque<std::pair<mtime_t, FrameBuffer*>> stitchedFrames; bool shutDown = false; // Hold the frames while the stitcher schedules everything // The stitcher asks first for the device buffer in which // to schedule the stitching, then, once everything is scheduled, // it asks for the host buffer where to render the results. // // Since multiple frames can be scheduled at the same time, if more // than double buffering is used, we ask // to store the correspondence in between. // Unlocked, since only the "stitcher" thread // accesses it. std::map<mtime_t, FrameBuffer*> inUse; // used by the "register" thread only std::vector<FrameBuffer*> allFrames; }; class AsyncSourceBuffer : public AsyncBuffer<SourceFrameBuffer> { public: static Potential<AsyncSourceBuffer> create(const std::vector<std::shared_ptr<SourceFrameBuffer::Surface>>&, const std::vector<std::shared_ptr<Output::VideoWriter>>&); GPU::Surface& acquireFrame(mtime_t date, GPU::Stream& stream); }; class AsyncPanoBuffer : public AsyncBuffer<PanoFrameBuffer> { public: static Potential<AsyncPanoBuffer> create(const std::vector<std::shared_ptr<PanoFrameBuffer::Surface>>&, const std::vector<std::shared_ptr<Output::VideoWriter>>&); PanoSurface& acquireFrame(mtime_t date); }; /** * A set of host stereoscopic buffers that can be filled asynchronously. * * It buffers N frames and waits when no frame is available. * Warning: on destruction, waits for all pending frames to be written. * This can effectively block forever if there are missing frames. * * Frames should start at 0. * * Past frames will be ignored, current frame undergoes a * special treatment if refilled (for restitch). * Future frames are buffered when possible, if not fill() * becomes blocking. * 3 types of threads are manipulating the panoramic buffers. * - "register callbacks" thread : read all buffers, don't change their state (stitched / blank / in use), * modify them * - 2 "stitcher" threads : pop one from blanks, put one in stitched, collaborate (one pop - one push) * - "consumer" thread : pop one from stitched, put one in blank */ class AsyncStereoBuffer { public: typedef std::pair<StereoFrameBuffer*, StereoFrameBuffer*> Frame; typedef PanoFrameBuffer FB; static Potential<AsyncStereoBuffer> create(const std::vector<std::shared_ptr<PanoSurface>>&, const std::vector<std::shared_ptr<Output::StereoWriter>>& writers); Status initialize(const std::vector<std::shared_ptr<PanoSurface>>&, const std::vector<std::shared_ptr<Output::StereoWriter>>& writers); virtual ~AsyncStereoBuffer(); PanoSurface& acquireLeftFrame(mtime_t); PanoSurface& acquireRightFrame(mtime_t); Status pushVideo(mtime_t date, Eye eye); Status registerWriters(const std::vector<std::shared_ptr<Output::StereoWriter>>&); Status registerWriter(std::shared_ptr<Output::StereoWriter>); protected: AsyncStereoBuffer() {} PanoSurface& acquireFrame(mtime_t date, Eye eye); void synchronize(Frame frame); Frame getUsedFrame(mtime_t date); Frame getCurrentFrame(mtime_t date); // Frames std::mutex bkMu; std::condition_variable bkCond; std::deque<Frame> blankFrames; std::mutex stMu; std::condition_variable stCond; std::deque<std::pair<mtime_t, Frame>> stitchedFrames; bool shutDown = false; // Hold the frames while the stitcher schedules everything // The stitcher asks first for the device buffer in which // to schedule the stitching, then, once everything is scheduled, // it asks for the host buffer where to render the results. // // Since multiple frames can be scheduled at the same time, if more // than double buffering is used, we ask // to store the correspondence in between. std::map<mtime_t, Frame> inUse; // used by the "register" thread only std::vector<Frame> allFrames; }; template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device> class AsyncBufferedOutput : public Pimpl, protected AsyncBuffer, protected Pusher { public: typedef typename Pimpl::Writer Writer; AsyncBufferedOutput(const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Surface>>&, const std::vector<std::shared_ptr<Writer>>& writers); virtual ~AsyncBufferedOutput(); virtual bool setRenderers(const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Renderer>>&) override; virtual bool addRenderer(std::shared_ptr<typename AsyncBuffer::FB::Renderer> renderer) override { return Pusher::addRenderer(renderer); } virtual bool removeRenderer(const std::string& name) override { return Pusher::removeRenderer(name); } virtual bool setWriters(const std::vector<std::shared_ptr<Writer>>&) override; virtual bool addWriter(std::shared_ptr<Writer>) override; virtual bool removeWriter(const std::string&) override; virtual bool updateWriter(const std::string&, const Ptv::Value&) override; protected: Status initialize(const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Surface>>&, const std::vector<std::shared_ptr<Writer>>& writers); private: static void consumerThread(AsyncBufferedOutput* that); std::thread* worker; std::atomic<bool> shutdown; }; class AsyncSourceOutput : public AsyncBufferedOutput<ExtractOutput::Pimpl, AsyncSourceBuffer, WriterPusher<SourceFrameBuffer>, PanoDeviceDefinition> { public: typedef AsyncBufferedOutput<ExtractOutput::Pimpl, AsyncSourceBuffer, WriterPusher<SourceFrameBuffer>, PanoDeviceDefinition> Base; static Potential<AsyncSourceOutput> create(const std::vector<std::shared_ptr<SourceSurface>>& surfs, const std::vector<std::shared_ptr<SourceRenderer>>& renderers, const std::vector<std::shared_ptr<Output::VideoWriter>>& writers, int source) { AsyncSourceOutput* aso = new AsyncSourceOutput(surfs, renderers, writers, source); FAIL_RETURN(aso->initialize(surfs, writers)); return aso; } Status pushVideo(mtime_t date) override { return AsyncBuffer<SourceFrameBuffer>::pushVideo(date); } GPU::Surface& acquireFrame(mtime_t date, GPU::Stream& stream) override { return AsyncSourceBuffer::acquireFrame(date, stream); } private: AsyncSourceOutput(const std::vector<std::shared_ptr<SourceSurface>>& surfs, const std::vector<std::shared_ptr<SourceRenderer>>& renderers, const std::vector<std::shared_ptr<Output::VideoWriter>>& writers, int source) : Base(surfs, writers) { sourceIdx = source; setRenderers(renderers); } }; class AsyncStitchOutput : public AsyncBufferedOutput<StitchOutput::Pimpl, AsyncPanoBuffer, WriterPusher<PanoFrameBuffer>, PanoDeviceDefinition> { public: static Potential<AsyncStitchOutput> create(const std::vector<std::shared_ptr<PanoSurface>>& surfs, const std::vector<std::shared_ptr<PanoRenderer>>& renderers, const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) { AsyncStitchOutput* aso = new AsyncStitchOutput(surfs, renderers, writers); FAIL_RETURN(aso->initialize(surfs, writers)); return aso; } Status pushVideo(mtime_t date) override { return AsyncBuffer<PanoFrameBuffer>::pushVideo(date); } virtual void setCompositor(const std::shared_ptr<GPU::Overlayer>& c) override { WriterPusher<PanoFrameBuffer>::setCompositor(c); } PanoSurface& acquireFrame(mtime_t date) override { return AsyncPanoBuffer::acquireFrame(date); } protected: typedef AsyncBufferedOutput<StitchOutput::Pimpl, AsyncPanoBuffer, WriterPusher<PanoFrameBuffer>, PanoDeviceDefinition> Base; AsyncStitchOutput(const std::vector<std::shared_ptr<PanoSurface>>& surfs, const std::vector<std::shared_ptr<PanoRenderer>>& renderers, const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) : Base(surfs, writers) { setRenderers(renderers); } }; class AsyncStereoOutput : public AsyncBufferedOutput<StereoOutput::Pimpl, AsyncStereoBuffer, StereoWriterPusher, StereoDeviceDefinition> { public: static Potential<AsyncStereoOutput> create(const std::vector<std::shared_ptr<PanoSurface>>& surfs, const std::vector<std::shared_ptr<PanoRenderer>>& renderers, const std::vector<std::shared_ptr<Output::StereoWriter>>& writers) { AsyncStereoOutput* ret = new AsyncStereoOutput(surfs, renderers, writers); FAIL_RETURN(ret->initialize(surfs, writers)); return ret; } virtual ~AsyncStereoOutput() {} Status pushVideo(mtime_t date, Eye eye) override { return AsyncStereoBuffer::pushVideo(date, eye); } virtual PanoSurface& acquireLeftFrame(mtime_t date) override { return AsyncStereoBuffer::acquireLeftFrame(date); } virtual PanoSurface& acquireRightFrame(mtime_t date) override { return AsyncStereoBuffer::acquireRightFrame(date); } protected: AsyncStereoOutput(const std::vector<std::shared_ptr<PanoSurface>>& surfs, const std::vector<std::shared_ptr<PanoRenderer>>& renderers, const std::vector<std::shared_ptr<Output::StereoWriter>>& writers) : AsyncBufferedOutput(surfs, writers) { setRenderers(renderers); } }; } // namespace Core } // namespace VideoStitch