Left: | ||
Right: |
OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * This file is part of Adblock Plus <https://adblockplus.org/>, | |
3 * Copyright (C) 2006-2015 Eyeo GmbH | |
Oleksandr
2016/01/28 10:15:34
Nit: Here and further sadly we have already update
Eric
2016/02/04 21:01:44
Done.
| |
4 * | |
5 * Adblock Plus is free software: you can redistribute it and/or modify | |
6 * it under the terms of the GNU General Public License version 3 as | |
7 * published by the Free Software Foundation. | |
8 * | |
9 * Adblock Plus is distributed in the hope that it will be useful, | |
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 * GNU General Public License for more details. | |
13 * | |
14 * You should have received a copy of the GNU General Public License | |
15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. | |
16 */ | |
17 | |
18 #include <thread> | |
19 #include <mutex> | |
20 #include <condition_variable> | |
21 #include <deque> | |
22 #include "Placeholder.h" | |
23 | |
24 /** | |
25 * A synchronized FIFO queue with thread notice on receipt. | |
26 * | |
27 * \tparam T | |
28 * Class of the elements of the queue. | |
29 * It must have a move constructor. | |
30 * \tparam F | |
31 * Functor type of object to receive notice on Insert(). | |
Oleksandr
2016/01/28 10:15:34
Nit: I guess this parameter was removed at some po
Eric
2016/02/04 21:01:43
Yes. As I recall, I originally only had one class.
| |
32 */ | |
33 template<class T> | |
34 class MessageQueue | |
35 { | |
36 std::deque<T> queue; | |
37 typedef std::lock_guard<std::mutex> SentryType; | |
38 typedef std::unique_lock<std::mutex> UniqueLockType; | |
39 std::mutex mutex; | |
40 std::condition_variable cv; | |
41 public: | |
42 MessageQueue() {} // = default | |
43 | |
44 /* | |
45 * This class does not have any responsibility with regard to the elements of its queue. | |
46 * An instance may be destroyed with elements still in the queue. | |
47 */ | |
48 ~MessageQueue() {} // = default; | |
49 | |
50 /** | |
51 * Insert an element, copy constructor version. | |
Oleksandr
2016/01/28 10:15:34
Nit: this is not a constructor. I think "lvalue ve
Eric
2016/02/04 21:01:44
OK. I'll use "copy semantics".
| |
52 */ | |
53 void Insert(const T& t) | |
54 { | |
55 { | |
Oleksandr
2016/01/28 10:15:35
Just to reiterate Sergei's point - the extra scope
Eric
2016/02/04 21:01:44
Done.
| |
56 SentryType sentry(mutex); | |
57 queue.push_back(t); | |
58 cv.notify_one(); | |
59 } | |
60 } | |
61 | |
62 /** | |
63 * Insert an element, move constructor version. | |
64 */ | |
65 void Insert(T&& t) | |
66 { | |
67 { | |
68 SentryType sentry(mutex); | |
69 queue.push_back(std::move(t)); | |
70 cv.notify_one(); | |
71 } | |
72 } | |
73 | |
74 /** | |
75 * Wake up anything waiting on the condition variable. | |
76 * | |
77 * \tparam Function | |
78 * Functor type for argument | |
79 * \param f | |
80 * Functor to execute _inside_ the protection of the queue's mutex | |
81 */ | |
82 template<typename Function> | |
83 void Rouse(Function& f) | |
84 { | |
85 SentryType sentry(mutex); | |
86 f(); | |
87 cv.notify_one(); | |
88 } | |
89 | |
90 /** | |
91 * Remove and Test. | |
92 * Test that the queue is not empty. | |
93 * If so, remove the next element from the consumer end of the queue and pass it to the argument function | |
Oleksandr
2016/01/28 10:15:34
Nit: I think there are too many comments in this f
Eric
2016/02/04 21:01:43
Reworded.
| |
94 * | |
95 * \param f | |
96 * If return value is true, a functor to which to pass the element at the co nsumer end of the queue. | |
97 * This functor is executed _outside_ the protection of the queue's mutex. | |
98 * \return | |
99 * True if and only if an element was removed from the front of the queue. | |
100 */ | |
101 template<typename Function> | |
102 bool Remove(Function& f) | |
103 { | |
104 /* | |
105 * We need to remove an element atomically from the queue, | |
106 * so we need to construct that element _inside_ the protection of the mut ex, | |
107 * On the other hand, we need to execute the function _outside_ the protecti on of the queue, | |
108 * because otherwise performance may suffer. | |
109 * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element | |
110 * without requiring it to have a default constructor or assignment operat ors. | |
111 */ | |
112 Placeholder<T> x; | |
113 { | |
114 SentryType sentry(mutex); | |
115 if (queue.empty()) | |
116 return false; | |
117 x.Construct(std::move(queue.front())); // only require move-constructib ility | |
118 queue.pop_front(); | |
119 } | |
120 f(std::move(x.Object())); | |
121 return true; | |
122 } | |
123 | |
124 /** | |
125 * If a condition is satisfied, wait indefinitely. If not, don't wait. | |
126 * | |
127 * \return False, immediately without waiting, if the argument evaluates false . | |
128 * \return True, only after waiting, if the argument evaluates true. | |
129 */ | |
130 template<typename Predicate> | |
131 bool WaitIf(Predicate& p) | |
132 { | |
133 UniqueLockType ul(mutex); | |
134 if (!p()) | |
135 { | |
136 return false; | |
137 } | |
138 cv.wait(ul); | |
139 return true; | |
140 } | |
141 | |
142 /* | |
143 * A defect in the compiler for VS2012 requires these definitions. | |
144 */ | |
145 #if defined(_MSC_VER) && _MSC_VER <= 1700 | |
146 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status | |
147 #else | |
148 #define WAIT_FOR_RETURN_TYPE std::cv_status | |
149 #endif | |
150 | |
151 /** | |
152 * Wait for a limited duration. | |
153 * | |
154 * Return type was going to be "decltype(cv.wait_for)" for clarity. | |
155 * A defect in the toolset for VS 2012 causes that declaration not to compile. | |
156 */ | |
157 template <class R, class P> | |
158 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio n) | |
159 { | |
160 return cv.wait_for(UniqueLockType(mutex), relativeDuration); | |
161 } | |
162 }; | |
163 | |
164 /** | |
165 * An active queue with a single, internal consumer. | |
166 * | |
167 * This class presents the front of a message queue for arbitrary producers. | |
168 * The back of the message queue is managed by an internal consumer, | |
169 * which calls the processor for each element passed through the queue. | |
170 * | |
171 * The internal thread has the same lifetime as the object. | |
172 * There's no external interface to pause or kill the thread. | |
173 * Destroy the object to terminate the thread. | |
174 * The consumer drains the queue before terminating the thread. | |
175 * | |
176 * \tparam T Class of elements in the queue | |
177 * \tparam F Type of functor to process each element consumed from the queue | |
178 */ | |
179 template<class T, class F> | |
180 class ActiveQueue | |
181 { | |
182 /** | |
183 * Signal flag indicating to the consumer thread that it should continue runni ng. | |
184 * | |
185 * This flag is true for the entire duration of the object lifetime; | |
186 * it's set to false only in the destructor. | |
187 * To avoid race conditions, it's only accessed under mutex protection with ca lls to notify and wait. | |
188 * We accomplish this with lambdas passed as the functor arguments of the mess age queue functions 'Rouse' and 'WaitIf'. | |
189 * These lambdas are the only references to the variable after construction. | |
190 */ | |
191 bool running; | |
192 | |
193 /** | |
194 * Functor to process each element as it is removed from the queue. | |
195 */ | |
196 F& processor; | |
197 | |
198 /** | |
199 * The queue that the active thread behavior is wrapped around. | |
200 */ | |
201 MessageQueue<T> queue; | |
202 | |
203 /** | |
204 * Thread running the consumer process. | |
205 * | |
206 * This thread runs the entire lifetime of this object. | |
207 * It's started in the constructor and joined in the destructor. | |
208 * | |
209 * This member variable is declared last so that it is constructed last, | |
210 * after all the other member necessary for proper functioning of the consum er. | |
211 */ | |
212 std::thread thread; | |
213 | |
214 /** | |
215 * Main function for the consumer thread. | |
216 * | |
217 * Strictly speaking, it's the effective main function for the thread. | |
218 * The actual main function is a lambda that only calls this function and noth ing else. | |
219 */ | |
220 void Consumer() | |
221 { | |
222 while (queue.WaitIf([this]() { return running; })) | |
223 { | |
224 // Drain the queue, processing all of its elements | |
225 while ( | |
226 queue.Remove( | |
227 [&](T&& t) { | |
228 try | |
229 { | |
230 processor(std::move(t)); | |
231 } | |
232 catch (...) | |
233 { | |
234 // Ignore any exception 'processor' may throw. | |
235 } | |
236 } | |
237 ) | |
238 ) | |
239 { | |
240 } | |
241 // Loop termination: queue.Remove() removed nothing from the queue, so it' s now empty | |
242 } | |
243 } | |
244 | |
245 public: | |
246 ActiveQueue(F& f) | |
247 : running(true), processor(f), queue(), thread([this]() { Consumer(); }) | |
248 {} | |
249 | |
250 ~ActiveQueue() | |
251 { | |
252 /* | |
253 * The consumer thread is waiting on the condition variable. | |
254 * If we don't wake it up, our thread won't terminate. | |
255 * | |
256 * Note that we don't have a race condition here with new elements arriving | |
257 * while we are draining the queue for the last time. | |
258 * We are executing in the destructor, so our member function Insert() is un available, | |
259 * and it's the only way that new element enter the message queue. | |
260 */ | |
261 queue.Rouse([this](){ running = false; }); | |
262 thread.join(); | |
263 } | |
264 | |
265 ActiveQueue(const ActiveQueue&); // = delete | |
266 ActiveQueue(ActiveQueue&&); // = delete | |
267 ActiveQueue& operator=(const ActiveQueue&); // = delete | |
268 ActiveQueue& operator=(ActiveQueue&&); // = delete | |
269 | |
270 /** | |
271 * Insert | |
272 */ | |
273 void Insert(const T& t) | |
274 { | |
275 queue.Insert(t); | |
276 } | |
277 | |
278 /** | |
279 * Insert | |
280 */ | |
281 void Insert(T&& t) | |
282 { | |
283 queue.Insert(std::move(t)); | |
284 } | |
285 }; | |
OLD | NEW |