// Copyright (c) 2012-2017 VideoStitch SAS // Copyright (c) 2018 stitchEm #pragma once #include "rtmpEnums.hpp" #include "rtmpStructures.hpp" #include "audioDecoder.hpp" #include "videoDecoder.hpp" #include "libvideostitch/imuData.hpp" #include "libvideostitch/inputFactory.hpp" #include "libvideostitch/orah/exposureData.hpp" #include "libvideostitch/utils/semaphore.hpp" #ifdef USE_AVFORMAT #include "avMuxer.hpp" #endif #include "librtmpIncludes.hpp" #include <atomic> #include <condition_variable> #include <functional> #include <memory> #include <mutex> #include <queue> #include <thread> namespace VideoStitch { namespace Input { class RTMPClient : public VideoReader, public AudioReader, public MetadataReader, public SinkReader { public: virtual ~RTMPClient(); static RTMPClient* create(readerid_t id, const Ptv::Value* config, const int64_t width, const int64_t height); static bool handles(const Ptv::Value* config); ReadStatus readFrame(mtime_t& date, unsigned char* video) override; ReadStatus readSamples(size_t, Audio::Samples&) override; size_t available() override; bool eos() override; Status readIMUSamples(std::vector<VideoStitch::IMU::Measure>& imuData) override; MetadataReadStatus readExposure(std::map<videoreaderid_t, Metadata::Exposure>& exposure) override; MetadataReadStatus readWhiteBalance(std::map<videoreaderid_t, Metadata::WhiteBalance>& whiteBalance) override; MetadataReadStatus readToneCurve(std::map<videoreaderid_t, Metadata::ToneCurve>& toneCurve) override; virtual Status seekFrame(frameid_t) override { return VideoStitch::Status::OK(); } virtual Status seekFrame(mtime_t) override { return VideoStitch::Status::OK(); } Status addSink(const Ptv::Value* config, const mtime_t videoTimeStamp, const mtime_t audioTimeStamp) override; void removeSink() override; private: RTMPClient(readerid_t id, const Ptv::Value* config, const std::string& displayName, int64_t width, int64_t height, VideoStitch::PixelFormat fmt, FrameRate framerate, VideoDecoder::Type decoderType, Audio::ChannelLayout chanLayout, Audio::SamplingRate srate, Audio::SamplingDepth sdepth, FrameRate frameRateIMU); void readPacket(const RTMPPacket& packet); void readVideoPacket(const RTMPPacket& packet); void readInfoPacket(const RTMPPacket& packet); void readAudioPacket(const RTMPPacket& packet); void initEncoderData(); void metaDataParse(AMFObject* amfObj); void metaDataParseOnText(AMFObject* amfObj, const uint32_t timestamp); videoreaderid_t getOrahInputID() const; /** @brief Retrieve the IMU data from an RTMP AMF object * * @param amfObj: rtmp AMF object * @param imuData: struct which contains the fields of IMU data * note: the timestamp field is not retrived from the AMF object, * it is retrived from the timestamp of the RTMP packet * @return code: True if all the fields have been properly retrieved. False otherwise. */ bool metaDataParseIMU(AMFObject* amfObj, VideoStitch::IMU::Measure& imuData); void readLoop(); void decodeLoop(); void audioDecodeLoop(); void flushAudio(); void flushVideo(); void decodeFrame(); bool connect(); std::unique_ptr<AudioDecoder> audioDecoder; VideoDecoder* videoDecoder; std::atomic<mtime_t> inputVideoTimestamp; /* in ms */ FrameRate fps; VideoDecoder::Type decoderType; uint64_t sampleRate; // Hz uint8_t audioChannels; //----------------------------------------------- // stream startup stuff std::string URL; const Ptv::Value* config; // change connection status and notify listeners void setConnectionStatus(IO::RTMPConnectionStatus rtmpConnectionStatus); IO::RTMPConnectionStatus rtmpConnectionStatus; std::atomic<bool> stopping; std::unique_ptr<RTMP, std::function<void(RTMP*)>> rtmp; std::mutex frameMu; bool stoppingFrames = false; Semaphore videoSem; std::condition_variable frameCV; typedef std::pair<mtime_t, VideoPtr> Frame; std::queue<Frame> frames; // audio rtmp pkt buffer std::mutex audioPktQueueMutex; std::condition_variable audioPktQueueCond; std::queue<VideoStitch::IO::DataPacket> audioPktQueue; bool stoppingAudioQueue = false; // decoded audio buffer AudioStream audioStream; // decoded imu data queue std::mutex imuQueueMutex; std::queue<VideoStitch::IMU::Measure> imuQueue; // decoded exposure data queue std::mutex exposureQueueMutex; std::queue<std::map<videoreaderid_t, VideoStitch::Metadata::Exposure>> exposureQueue; // decoded tone curve data queue std::mutex toneCurveQueueMutex; std::queue<std::map<videoreaderid_t, VideoStitch::Metadata::ToneCurve>> toneCurveQueue; // These threads are launched during construction. // Please keep their declaration at the end to make sure that the rest of the object is initialized when they are // launched. std::thread* readThread; std::thread* decodeThread; std::thread* audioDecodeThread; Span<unsigned char> videoHeader; #ifdef USE_AVFORMAT VideoStitch::Output::AvMuxer avSink; #endif }; } // namespace Input } // namespace VideoStitch