// Copyright (c) 2012-2017 VideoStitch SAS // Copyright (c) 2018 stitchEm #pragma once /** * Bounded, thread-safe data structures */ #include <condition_variable> #include <mutex> #include <queue> namespace VideoStitch { struct queue_tag {}; struct priority_queue_tag {}; template <typename Container> struct container_traits; template <typename T> struct container_traits<std::queue<T>> { typedef queue_tag container_category; }; template <typename T, typename Compare> struct container_traits<std::priority_queue<T, std::vector<T>, Compare>> { typedef priority_queue_tag container_category; }; template <typename Container> class BoundedThreadSafeQueue { public: typedef typename Container::value_type T; explicit BoundedThreadSafeQueue(size_t boundary = std::numeric_limits<size_t>::max()) : boundary(boundary), stp(false) {} ~BoundedThreadSafeQueue() {} void push(T& item) { { std::unique_lock<std::mutex> sl(mutex); condv.wait(sl, [&]() { return queue.size() < boundary || stp; }); queue.push(std::move(item)); } condv.notify_all(); } template <typename Predicate> bool front(Predicate lambda) { std::unique_lock<std::mutex> sl(mutex); condv.wait(sl, [&]() { return !queue.empty() || stp; }); typename container_traits<Container>::container_category category; return lambda(front_dispatch(category)); } bool pop(T& item) { { std::unique_lock<std::mutex> sl(mutex); condv.wait(sl, [&]() { return !queue.empty() || stp; }); if (stp && queue.empty()) { return false; } typename container_traits<Container>::container_category category; item = front_dispatch(category); queue.pop(); } condv.notify_all(); return true; } typename std::queue<T>::size_type size() { std::lock_guard<std::mutex> sl(mutex); return queue.size(); } void stop() { { std::lock_guard<std::mutex> sl(mutex); stp = true; } condv.notify_all(); } private: T front_dispatch(queue_tag) { return std::move(queue.front()); // invalidates the front effectively } T front_dispatch(priority_queue_tag) { return std::move(queue.top()); // invalidates the top effectively } Container queue; std::mutex mutex; std::condition_variable condv; size_t boundary; bool stp; }; template <typename T, typename Compare> using PriorityQueue = BoundedThreadSafeQueue<std::priority_queue<T, std::vector<T>, Compare>>; template <typename T> using Queue = BoundedThreadSafeQueue<std::queue<T>>; } // namespace VideoStitch