// Copyright (c) 2012-2017 VideoStitch SAS
// Copyright (c) 2018 stitchEm

#include "asyncOutput.hpp"

#include "common/container.hpp"

#include "libvideostitch/gpu_device.hpp"

namespace VideoStitch {
namespace Core {
/**
 * Async*Buffer
 */
Potential<AsyncSourceBuffer> AsyncSourceBuffer::create(
    const std::vector<std::shared_ptr<SourceFrameBuffer::Surface>>& surfs,
    const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) {
  auto ab = std::make_unique<AsyncSourceBuffer>();
  FAIL_RETURN(ab->initialize(surfs, writers));
  return ab.release();
}

Potential<AsyncPanoBuffer> AsyncPanoBuffer::create(const std::vector<std::shared_ptr<PanoFrameBuffer::Surface>>& surfs,
                                                   const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) {
  auto ab = std::make_unique<AsyncPanoBuffer>();
  FAIL_RETURN(ab->initialize(surfs, writers));
  return ab.release();
}

template <typename FrameBuffer>
Status AsyncBuffer<FrameBuffer>::initialize(const std::vector<std::shared_ptr<typename FrameBuffer::Surface>>& surfs,
                                            const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) {
  FAIL_RETURN(GPU::useDefaultBackendDevice());
  for (auto surf : surfs) {
    Potential<FrameBuffer> frame = FrameBuffer::create(surf, writers);
    if (!frame.ok()) {
      return frame.status();
    }
    blankFrames.push_back(frame.object());
    allFrames.push_back(frame.release());
  }

  return Status::OK();
}

template <typename FrameBuffer>
AsyncBuffer<FrameBuffer>::~AsyncBuffer() {
  deleteAll(allFrames);
}

// -------- Stitcher thread functions

GPU::Surface& AsyncSourceBuffer::acquireFrame(mtime_t date, GPU::Stream& stream) {
  SourceFrameBuffer* frame = getCurrentFrame(date);
  return frame->acquireFrame(date, stream);
}

PanoSurface& AsyncPanoBuffer::acquireFrame(mtime_t date) {
  PanoFrameBuffer* frame = getCurrentFrame(date);
  return frame->acquireFrame(date);
}

template <typename FrameBuffer>
Status AsyncBuffer<FrameBuffer>::pushVideo(mtime_t date) {
  FrameBuffer* frame = getCurrentFrame(date);
  // release the surf frame
  frame->releaseFrame();
  inUse.erase(date);

  // put the frame as "stitched".
  {
    std::unique_lock<std::mutex> lock(stMu);
    stitchedFrames.push_back(std::make_pair(date, frame));
  }
  stCond.notify_one();

  return Status::OK();
}

template <typename FrameBuffer>
auto AsyncBuffer<FrameBuffer>::getCurrentFrame(mtime_t date) -> FrameBuffer* {
  // first of all have we already seen this frame ?
  FrameBuffer* frame = inUse[date];

  if (frame == nullptr) {
    // new frame, wait for a blank frame to be available
    {
      std::unique_lock<std::mutex> lock(bkMu);
      bkCond.wait(lock, [this] { return blankFrames.size() > 0; });
      inUse[date] = blankFrames.front();
      blankFrames.pop_front();
    }
  }
  return inUse[date];
}

template <typename FrameBuffer>
auto AsyncBuffer<FrameBuffer>::getUsedFrame(mtime_t date) -> FrameBuffer* {
  return inUse.at(date);
}

// -------- Register thread functions

template <typename FrameBuffer>
Status AsyncBuffer<FrameBuffer>::registerWriters(const std::vector<std::shared_ptr<Output::VideoWriter>>& writers) {
  for (auto writer : writers) {
    FAIL_RETURN(registerWriter(writer));
  }
  return Status::OK();
}

template <typename FrameBuffer>
Status AsyncBuffer<FrameBuffer>::registerWriter(std::shared_ptr<Output::VideoWriter> writer) {
  FAIL_RETURN(GPU::useDefaultBackendDevice());
  for (auto frame : allFrames) {
    FAIL_RETURN(frame->registerWriter(writer));
  }
  return Status::OK();
}

template class AsyncBuffer<PanoFrameBuffer>;
template class AsyncBuffer<SourceFrameBuffer>;

/**
 * AsyncBufferedOutput
 */
template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
bool AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::setRenderers(
    const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Renderer>>& r) {
  return Pusher::setRenderers(r);
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::AsyncBufferedOutput(
    const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Surface>>& surfs,
    const std::vector<std::shared_ptr<Writer>>& writers)
    : Pimpl(surfs[0]->getWidth(), surfs[0]->getHeight()),
      Pusher(surfs[0]->getWidth(), surfs[0]->getHeight(), writers),
      shutdown(false) {}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
Status AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::initialize(
    const std::vector<std::shared_ptr<typename AsyncBuffer::FB::Surface>>& surfs,
    const std::vector<std::shared_ptr<Writer>>& writers) {
  FAIL_RETURN(AsyncBuffer::initialize(surfs, writers));
  worker = new std::thread(consumerThread, this);
  return Status::OK();
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::~AsyncBufferedOutput() {
  shutdown = true;
  this->stCond.notify_one();

  if (worker != nullptr) {
    worker->join();
    delete worker;
    worker = nullptr;
  }
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
bool AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::setWriters(
    const std::vector<std::shared_ptr<Writer>>& writers) {
  AsyncBuffer::registerWriters(writers);
  return Pusher::setWriters(writers);
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
bool AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::addWriter(std::shared_ptr<Writer> writer) {
  // Be careful to always register the writer before adding it
  AsyncBuffer::registerWriter(writer);
  return Pusher::addWriter(writer);
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
bool AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::removeWriter(const std::string& id) {
  return Pusher::removeWriter(id);
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
bool AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::updateWriter(const std::string& id,
                                                                           const Ptv::Value& config) {
  return Pusher::updateWriter(id, config);
}

template <typename Pimpl, typename AsyncBuffer, typename Pusher, typename Device>
void AsyncBufferedOutput<Pimpl, AsyncBuffer, Pusher, Device>::consumerThread(AsyncBufferedOutput* that) {
  if (!GPU::useDefaultBackendDevice().ok()) {
    return;
  }
  std::pair<mtime_t, typename AsyncBuffer::Frame> frame;
  for (;;) {
    // Wait for a frame to be scheduled.
    {
      std::unique_lock<std::mutex> lock(that->stMu);
      that->stCond.wait(lock, [that] { return that->stitchedFrames.size() > 0 || that->shutdown; });
      if (that->shutdown && that->stitchedFrames.size() == 0) {
        return;
      }
      frame = that->stitchedFrames.front();
      that->stitchedFrames.pop_front();
    }

    // Wait for the scheduled frame to be produced.
    that->synchronize(frame.second);

    // Consume it
    // Util::SimpleProfiler prof("  write frame", false, Logger::get(Logger::Verbose));
    that->pushVideoToWriters(frame.first, frame.second);

    // Put the frame back as blank
    {
      std::unique_lock<std::mutex> lock(that->bkMu);
      that->blankFrames.push_back(frame.second);
    }
    that->bkCond.notify_all();
  }
}

// ------------- Stereoscopy --------------------

/**
 * AsyncStereoBuffer
 */
Potential<AsyncStereoBuffer> AsyncStereoBuffer::create(
    const std::vector<std::shared_ptr<PanoSurface>>& surfs,
    const std::vector<std::shared_ptr<Output::StereoWriter>>& writers) {
  std::unique_ptr<AsyncStereoBuffer> ret(new AsyncStereoBuffer);
  FAIL_RETURN(ret->initialize(surfs, writers));
  return ret.release();
}

Status AsyncStereoBuffer::initialize(const std::vector<std::shared_ptr<PanoSurface>>& surfs,
                                     const std::vector<std::shared_ptr<Output::StereoWriter>>& writers) {
  FAIL_RETURN(GPU::useDefaultBackendDevice());
  for (auto surf : surfs) {
    Potential<StereoFrameBuffer> left = StereoFrameBuffer::create(surf, writers);
    if (!left.ok()) {
      return left.status();
    }

    Potential<StereoFrameBuffer> right = StereoFrameBuffer::create(surf, writers);
    if (!right.ok()) {
      return right.status();
    }

    allFrames.push_back(std::make_pair(left.object(), right.object()));
    blankFrames.push_back(std::make_pair(left.release(), right.release()));
  }

  return Status::OK();
}

AsyncStereoBuffer::~AsyncStereoBuffer() {
  for (auto stereoFrame : allFrames) {
    // delete left buffer
    delete stereoFrame.first;
    // delete right buffer
    delete stereoFrame.second;
  }
}

PanoSurface& AsyncStereoBuffer::acquireLeftFrame(mtime_t date) { return acquireFrame(date, LeftEye); }

PanoSurface& AsyncStereoBuffer::acquireRightFrame(mtime_t date) { return acquireFrame(date, RightEye); }

PanoSurface& AsyncStereoBuffer::acquireFrame(mtime_t date, Eye eye) {
  Frame frame = getCurrentFrame(date);
  if (eye == LeftEye) {
    return frame.first->acquireFrame(date);
  } else {
    return frame.second->acquireFrame(date);
  }
}

Status AsyncStereoBuffer::pushVideo(mtime_t date, Eye eye) {
  Frame stereoFrame = getCurrentFrame(date);
  Status res;
  if (eye == LeftEye) {
    res = stereoFrame.first->pushVideo();
  } else {
    res = stereoFrame.second->pushVideo();
  }

  // prevent concurrent access from the two stitcher's threads (left/right)
  // by checking inUse again under lock
  {
    std::unique_lock<std::mutex> lock(stMu);
    if (inUse.find(date) != inUse.end()) {
      inUse.erase(date);
      stitchedFrames.push_back(std::make_pair(date, stereoFrame));
      stCond.notify_one();
    }
  }

  return res;
}

auto AsyncStereoBuffer::getCurrentFrame(mtime_t date) -> Frame {
  // if new frame, wait for a blank frame to be available
  std::unique_lock<std::mutex> bkLock(bkMu);
  bkCond.wait(bkLock, [this, date] { return blankFrames.size() > 0 || inUse.find(date) != inUse.end(); });
  // here we need to check again for inUse : indeed, the left & right stitchers
  // could have been waiting simultaneously on a blank frame to be available
  if (inUse.find(date) == inUse.end()) {
    inUse[date] = blankFrames.front();
    blankFrames.pop_front();
  }
  return inUse[date];
}

auto AsyncStereoBuffer::getUsedFrame(mtime_t date) -> Frame { return inUse.at(date); }

void AsyncStereoBuffer::synchronize(Frame frame) {
  frame.first->streamSynchronize();
  frame.second->streamSynchronize();
}

Status AsyncStereoBuffer::registerWriters(const std::vector<std::shared_ptr<Output::StereoWriter>>& writers) {
  for (auto writer : writers) {
    FAIL_RETURN(registerWriter(writer));
  }
  return Status::OK();
}

Status AsyncStereoBuffer::registerWriter(std::shared_ptr<Output::StereoWriter> writer) {
  for (auto frame : allFrames) {
    FAIL_RETURN(frame.first->registerWriter(writer));
    FAIL_RETURN(frame.second->registerWriter(writer));
  }
  return Status::OK();
}

template class AsyncBufferedOutput<ExtractOutput::Pimpl, AsyncSourceBuffer, SourceWriterPusher, PanoDeviceDefinition>;
template class AsyncBufferedOutput<StitchOutput::Pimpl, AsyncPanoBuffer, PanoWriterPusher, PanoDeviceDefinition>;
template class AsyncBufferedOutput<StereoOutput::Pimpl, AsyncStereoBuffer, StereoWriterPusher, StereoDeviceDefinition>;
}  // namespace Core
}  // namespace VideoStitch