Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: src/plugin/ActiveQueue.h

Issue 29323611: Issue #1234, #2058 - Rewrite log facility, improving thread implementation
Patch Set: address comments Created Feb. 4, 2016, 8:59 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « adblockplus.gyp ('k') | src/plugin/Config.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * This file is part of Adblock Plus <https://adblockplus.org/>,
3 * Copyright (C) 2006-2016 Eyeo GmbH
4 *
5 * Adblock Plus is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 3 as
7 * published by the Free Software Foundation.
8 *
9 * Adblock Plus is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #include <thread>
19 #include <mutex>
20 #include <condition_variable>
21 #include <deque>
22 #include "Placeholder.h"
23
24 /**
25 * A synchronized FIFO queue with thread notice on receipt.
26 *
27 * \tparam T
28 * Class of the elements of the queue.
29 * It must have a move constructor.
30 */
31 template<class T>
32 class MessageQueue
33 {
34 std::deque<T> queue;
35 typedef std::lock_guard<std::mutex> SentryType;
36 typedef std::unique_lock<std::mutex> UniqueLockType;
37 std::mutex mutex;
38 std::condition_variable cv;
39 public:
40 MessageQueue() {} // = default
41
42 /*
43 * This class does not have any responsibility with regard to the elements of its queue.
44 * An instance may be destroyed with elements still in the queue.
45 */
46 ~MessageQueue() {} // = default;
47
48 /**
49 * Insert an element, copy semantics version.
50 */
51 void Insert(const T& t)
52 {
53 SentryType sentry(mutex);
54 queue.push_back(t);
55 cv.notify_one();
56 }
57
58 /**
59 * Insert an element, move semantics version.
60 */
61 void Insert(T&& t)
62 {
63 SentryType sentry(mutex);
64 queue.push_back(std::move(t));
65 cv.notify_one();
66 }
67
68 /**
69 * Wake up anything waiting on the condition variable.
70 *
71 * \tparam Function
72 * Functor type for argument
73 * \param f
74 * Functor to execute _inside_ the protection of the queue's mutex
75 */
76 template<typename Function>
77 void Rouse(Function& f)
78 {
79 SentryType sentry(mutex);
80 f();
81 cv.notify_one();
82 }
83
84 /**
85 * Test, Remove, and Execute
86 *
87 * If the queue is not empty, remove the next element from the consumer end
88 * of the queue and execute the argument functor on it.
89 *
90 * \param f
91 * A functor to which to pass the element at the consumer end of the queue.
92 * This functor is executed _outside_ the protection of the queue's mutex.
93 * \return
94 * True if and only if an element was removed from the front of the queue.
95 */
96 template<typename Function>
97 bool Remove(Function& f)
98 {
99 /*
100 * We need to remove an element atomically from the queue,
101 * so we need to construct that element _inside_ the protection of the mut ex,
102 * On the other hand, we need to execute the function _outside_ the protecti on
103 * of the queue, because otherwise performance may suffer.
104 * Hence we use 'Placeholder' so that we can declare a variable to hold the
105 * removed element without requiring it to have a default constructor or
106 * assignment operators.
107 */
108 Placeholder<T> x;
109 {
110 SentryType sentry(mutex);
111 if (queue.empty())
112 return false;
113 x.Construct(std::move(queue.front())); // only require move-constructib ility
114 queue.pop_front();
115 }
116 f(std::move(x.Object()));
117 return true;
118 }
119
120 /**
121 * If a condition is satisfied, wait indefinitely. If not, don't wait.
122 *
123 * \return False, immediately without waiting, if the argument evaluates false .
124 * \return True, only after waiting, if the argument evaluates true.
125 */
126 template<typename Predicate>
127 bool WaitIf(Predicate& p)
128 {
129 UniqueLockType ul(mutex);
130 if (!p())
131 {
132 return false;
133 }
134 cv.wait(ul);
135 return true;
136 }
137
138 /*
139 * A defect in the compiler for VS2012 requires these definitions.
140 */
141 #if defined(_MSC_VER) && _MSC_VER <= 1700
142 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status
143 #else
144 #define WAIT_FOR_RETURN_TYPE std::cv_status
145 #endif
146
147 /**
148 * Wait for a limited duration.
149 *
150 * Return type was going to be "decltype(cv.wait_for)" for clarity.
151 * A defect in the toolset for VS 2012 causes that declaration not to compile.
152 */
153 template <class R, class P>
154 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio n)
155 {
156 return cv.wait_for(UniqueLockType(mutex), relativeDuration);
157 }
158 };
159
160 /**
161 * An active queue with a single, internal consumer.
162 *
163 * This class presents the front of a message queue for arbitrary producers.
164 * The back of the message queue is managed by an internal consumer,
165 * which calls the processor for each element passed through the queue.
166 *
167 * The internal thread has the same lifetime as the object.
168 * There's no external interface to pause or kill the thread.
169 * Destroy the object to terminate the thread.
170 * The consumer drains the queue before terminating the thread.
171 *
172 * \tparam T Class of elements in the queue
173 * \tparam F Type of functor to process each element consumed from the queue
174 */
175 template<class T, class F>
176 class ActiveQueue
177 {
178 /**
179 * Signal flag indicating to the consumer thread that it should continue runni ng.
180 *
181 * This flag is true for the entire duration of the object lifetime;
182 * it's set to false only in the destructor.
183 * To avoid race conditions, it's only accessed under mutex protection with ca lls to notify and wait.
184 * We accomplish this with lambdas passed as the functor arguments of the mess age queue functions 'Rouse' and 'WaitIf'.
185 * These lambdas are the only references to the variable after construction.
186 */
187 bool running;
188
189 /**
190 * Functor to process each element as it is removed from the queue.
191 */
192 F& processor;
193
194 /**
195 * The queue that the active thread behavior is wrapped around.
196 */
197 MessageQueue<T> queue;
198
199 /**
200 * Thread running the consumer process.
201 *
202 * This thread runs the entire lifetime of this object.
203 * It's started in the constructor and joined in the destructor.
204 *
205 * This member variable is declared last so that it is constructed last,
206 * after all the other member necessary for proper functioning of the consum er.
207 */
208 std::thread thread;
209
210 /**
211 * Main function for the consumer thread.
212 *
213 * Strictly speaking, it's the effective main function for the thread.
214 * The actual main function is a lambda that only calls this function and noth ing else.
215 */
216 void Consumer()
217 {
218 while (queue.WaitIf([this]() { return running; }))
219 {
220 // Drain the queue, processing all of its elements
221 while (
222 queue.Remove(
223 [&](T&& t) {
224 try
225 {
226 processor(std::move(t));
227 }
228 catch (...)
229 {
230 // Ignore any exception 'processor' may throw.
231 }
232 }
233 )
234 )
235 {
236 }
237 // Loop termination: queue.Remove() removed nothing from the queue, so it' s now empty
238 }
239 }
240
241 public:
242 ActiveQueue(F& f)
243 : running(true), processor(f), queue(), thread([this]() { Consumer(); })
244 {}
245
246 ~ActiveQueue()
247 {
248 /*
249 * The consumer thread is waiting on the condition variable.
250 * If we don't wake it up, our thread won't terminate.
251 *
252 * Note that we don't have a race condition here with new elements arriving
253 * while we are draining the queue for the last time.
254 * We are executing in the destructor, so our member function Insert() is un available,
255 * and it's the only way that new element enter the message queue.
256 */
257 queue.Rouse([this](){ running = false; });
258 thread.join();
259 }
260
261 ActiveQueue(const ActiveQueue&); // = delete
262 ActiveQueue(ActiveQueue&&); // = delete
263 ActiveQueue& operator=(const ActiveQueue&); // = delete
264 ActiveQueue& operator=(ActiveQueue&&); // = delete
265
266 /**
267 * Insert
268 */
269 void Insert(const T& t)
270 {
271 queue.Insert(t);
272 }
273
274 /**
275 * Insert
276 */
277 void Insert(T&& t)
278 {
279 queue.Insert(std::move(t));
280 }
281 };
OLDNEW
« no previous file with comments | « adblockplus.gyp ('k') | src/plugin/Config.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld