1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright (c) 2012-2017 VideoStitch SAS
// Copyright (c) 2018 stitchEm
#pragma once
/**
* Bounded, thread-safe data structures
*/
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
namespace VideoStitch {
/** Dispatch tag: the wrapped container exposes its head through front(). */
struct queue_tag {};
/** Dispatch tag: the wrapped container exposes its head through top(). */
struct priority_queue_tag {};

/**
 * Maps a container type to the tag describing how its head element is read.
 * Only the specializations below are defined; any other container type is
 * rejected at compile time because the primary template is left incomplete.
 */
template <typename Container>
struct container_traits;

template <typename T>
struct container_traits<std::queue<T>> {
  typedef queue_tag container_category;
};

template <typename T, typename Compare>
struct container_traits<std::priority_queue<T, std::vector<T>, Compare>> {
  typedef priority_queue_tag container_category;
};

/**
 * Blocking queue adapter with an upper bound on the number of stored elements.
 *
 * Every access to the wrapped container is serialized by one mutex. Producers
 * block in push() while the container holds `boundary` elements; consumers
 * block in front() / pop() while it is empty. stop() releases every waiter.
 *
 * @tparam Container std::queue<T> or std::priority_queue<T, std::vector<T>, Compare>.
 */
template <typename Container>
class BoundedThreadSafeQueue {
 public:
  typedef typename Container::value_type T;

  /**
   * @param boundary Number of stored elements at which push() starts blocking.
   *                 Defaults to the largest size_t value.
   */
  explicit BoundedThreadSafeQueue(size_t boundary = std::numeric_limits<size_t>::max())
      : boundary(boundary), stp(false) {}

  ~BoundedThreadSafeQueue() = default;

  /**
   * Moves `item` into the queue, blocking while the queue is at its boundary.
   * `item` is left in a moved-from state.
   *
   * Once stop() has been called the wait no longer blocks, so the item is
   * enqueued even if the queue already holds `boundary` elements.
   */
  void push(T& item) { push(std::move(item)); }

  /** Same as push(T&), for temporaries and explicitly moved values. */
  void push(T&& item) {
    {
      std::unique_lock<std::mutex> sl(mutex);
      condv.wait(sl, [&]() { return queue.size() < boundary || stp; });
      queue.push(std::move(item));
    }
    // Notify after releasing the lock so that woken threads can take it at once.
    condv.notify_all();
  }

  /**
   * Waits until an element is available (or stop() was called) and evaluates
   * `lambda` on the head element without removing or modifying it.
   *
   * The predicate runs while the queue's mutex is held, so it must not call
   * back into this queue.
   *
   * @return the predicate's result, or false if the queue was stopped while
   *         empty (the predicate is not invoked in that case).
   */
  template <typename Predicate>
  bool front(Predicate lambda) {
    std::unique_lock<std::mutex> sl(mutex);
    condv.wait(sl, [&]() { return !queue.empty() || stp; });
    if (queue.empty()) {
      // Only reachable after stop(): there is no head element to inspect.
      return false;
    }
    typename container_traits<Container>::container_category category;
    return lambda(peek_dispatch(category));
  }

  /**
   * Waits until an element is available (or stop() was called), then removes
   * the head element and stores it in `item`.
   *
   * Elements still queued when stop() is called can still be popped.
   *
   * @return true if an element was stored in `item`; false if the queue was
   *         stopped while empty (`item` is left untouched).
   */
  bool pop(T& item) {
    {
      std::unique_lock<std::mutex> sl(mutex);
      condv.wait(sl, [&]() { return !queue.empty() || stp; });
      if (queue.empty()) {
        // The wait predicate held, so an empty queue implies stop() was called.
        return false;
      }
      typename container_traits<Container>::container_category category;
      item = take_dispatch(category);
      queue.pop();
    }
    condv.notify_all();
    return true;
  }

  /** @return the number of elements currently stored. */
  typename std::queue<T>::size_type size() const {
    std::lock_guard<std::mutex> sl(mutex);
    return queue.size();
  }

  /**
   * Puts the queue in the stopped state and wakes every thread blocked in
   * push(), front() or pop(). The flag is never reset.
   */
  void stop() {
    {
      std::lock_guard<std::mutex> sl(mutex);
      stp = true;
    }
    condv.notify_all();
  }

 private:
  // Read-only access to the head element. Caller holds the mutex and has
  // checked that the container is not empty.
  const T& peek_dispatch(queue_tag) const { return queue.front(); }
  const T& peek_dispatch(priority_queue_tag) const { return queue.top(); }

  // Extracts the head element just before it is popped. Caller holds the
  // mutex and has checked that the container is not empty.
  T take_dispatch(queue_tag) {
    return std::move(queue.front());  // the moved-from slot is popped right after
  }
  T take_dispatch(priority_queue_tag) {
    return queue.top();  // top() only grants const access, so this is a copy
  }

  Container queue;
  mutable std::mutex mutex;  // mutable so that size() can lock from a const method
  std::condition_variable condv;
  size_t boundary;
  bool stp;
};
/** BoundedThreadSafeQueue over a std::priority_queue<T, std::vector<T>, Compare>. */
template <typename T, typename Compare>
using PriorityQueue = BoundedThreadSafeQueue<std::priority_queue<T, std::vector<T>, Compare>>;
/** BoundedThreadSafeQueue over a std::queue<T>. */
template <typename T>
using Queue = BoundedThreadSafeQueue<std::queue<T>>;
} // namespace VideoStitch