Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Delta Between Two Patch Sets: src/plugin/ActiveQueue.h

Issue 29323611: Issue #1234, #2058 - Rewrite log facility, improving thread implementation
Left Patch Set: Created Aug. 19, 2015, 5:42 p.m.
Right Patch Set: rebase only Created July 27, 2016, 9:11 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « adblockplus.gyp ('k') | src/plugin/Config.h » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 /* 1 /*
2 * This file is part of Adblock Plus <https://adblockplus.org/>, 2 * This file is part of Adblock Plus <https://adblockplus.org/>,
3 * Copyright (C) 2006-2015 Eyeo GmbH 3 * Copyright (C) 2006-2016 Eyeo GmbH
4 * 4 *
5 * Adblock Plus is free software: you can redistribute it and/or modify 5 * Adblock Plus is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 3 as 6 * it under the terms of the GNU General Public License version 3 as
7 * published by the Free Software Foundation. 7 * published by the Free Software Foundation.
8 * 8 *
9 * Adblock Plus is distributed in the hope that it will be useful, 9 * Adblock Plus is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details. 12 * GNU General Public License for more details.
13 * 13 *
14 * You should have received a copy of the GNU General Public License 14 * You should have received a copy of the GNU General Public License
15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
16 */ 16 */
17 17
18 #include <thread> 18 #include <thread>
19 #include <mutex> 19 #include <mutex>
20 #include <condition_variable> 20 #include <condition_variable>
21 #include <deque> 21 #include <deque>
22 #include "Placeholder.h" 22 #include "Placeholder.h"
23 23
24 /** 24 /**
25 * A synchronized FIFO queue with thread notice on receipt. 25 * A synchronized FIFO queue with thread notice on receipt.
sergei 2015/10/01 15:50:06 So, why is it called MessageQueue? It seems better
Eric 2015/10/08 21:05:40 Because that's what it is, a message queue.
26 * 26 *
27 * \tparam T 27 * \tparam T
28 * Class of the elements of the queue. 28 * Class of the elements of the queue.
29 * It must have a move constructor. 29 * It must have a move constructor.
30 * \tparam F
31 * Functor type of object to receive notice on Insert().
32 */ 30 */
33 template<class T> 31 template<class T>
34 class MessageQueue 32 class MessageQueue
35 { 33 {
36 std::deque<T> queue; 34 std::deque<T> queue;
37 typedef std::lock_guard<std::mutex> SentryType; 35 typedef std::lock_guard<std::mutex> SentryType;
38 typedef std::unique_lock<std::mutex> UniqueLockType; 36 typedef std::unique_lock<std::mutex> UniqueLockType;
39 std::mutex mutex; 37 std::mutex mutex;
40 std::condition_variable cv; 38 std::condition_variable cv;
41 public: 39 public:
42 MessageQueue() {} // = default 40 MessageQueue() {} // = default
sergei 2015/10/01 15:50:06 Why do we need `= default`? I would remove it.
Eric 2015/10/08 21:05:40 It's for when we have C++11 available. The string
Oleksandr 2016/01/28 10:15:32 We have quite few // = default comments in the cod
43 41
44 /* 42 /*
45 * This class does not have any responsibility with regard to the elements of its queue. 43 * This class does not have any responsibility with regard to the elements of its queue.
46 * An instance may be destroyed with elements still in the queue. 44 * An instance may be destroyed with elements still in the queue.
47 */ 45 */
48 ~MessageQueue() {} // = default; 46 ~MessageQueue() {} // = default;
49 47
50 /** 48 /**
51 * Insert an element, copy constructor version. 49 * Insert an element, copy semantics version.
52 */ 50 */
53 void Insert(const T& t) 51 void Insert(const T& t)
sergei 2015/10/01 15:50:07 Do you mind to rename it to `push_back` or `PushBa
Eric 2015/10/08 21:05:39 I completely object. It's contrary to the abstract
Oleksandr 2016/01/28 10:15:33 I wouldn't mind Insert as well. I think we should
54 { 52 {
55 { 53 SentryType sentry(mutex);
sergei 2015/10/01 15:50:06 This additional scope is not needed here nor in th
Eric 2015/10/08 21:05:40 Yep. I think this is a left-over artifact from dev
56 SentryType sentry(mutex); 54 queue.push_back(t);
57 queue.push_back(t); 55 cv.notify_one();
58 cv.notify_one(); 56 }
59 } 57
60 } 58 /**
61 59 * Insert an element, move semantics version.
62 /**
63 * Insert an element, move constructor version.
64 */ 60 */
65 void Insert(T&& t) 61 void Insert(T&& t)
66 { 62 {
67 { 63 SentryType sentry(mutex);
68 SentryType sentry(mutex); 64 queue.push_back(std::move(t));
69 queue.push_back(std::move(t)); 65 cv.notify_one();
70 cv.notify_one();
71 }
72 } 66 }
73 67
74 /** 68 /**
75 * Wake up anything waiting on the condition variable. 69 * Wake up anything waiting on the condition variable.
76 * 70 *
77 * \tparam Function 71 * \tparam Function
78 * Functor type for argument 72 * Functor type for argument
79 * \param f 73 * \param f
80 * Functor to execute _inside_ the protection of the queue's mutex 74 * Functor to execute _inside_ the protection of the queue's mutex
81 */ 75 */
82 template<typename Function> 76 template<typename Function>
83 void Rouse(Function& f) 77 void Rouse(Function& f)
sergei 2015/10/01 15:50:06 The reason we need such method is to set a value o
Eric 2015/10/08 21:05:40 You get a race condition if you do it that way. Th
84 { 78 {
85 SentryType sentry(mutex); 79 SentryType sentry(mutex);
86 f(); 80 f();
87 cv.notify_one(); 81 cv.notify_one();
88 } 82 }
89 83
90 /** 84 /**
91 * Remove and Test. 85 * Test, Remove, and Execute
92 * Test that the queue is not empty. 86 *
93 * If so, remove the next element from the consumer end of the queue and pass it to the argument function 87 * If the queue is not empty, remove the next element from the consumer end
88 * of the queue and execute the argument functor on it.
94 * 89 *
95 * \param f 90 * \param f
96 * If return value is true, a functor to which to pass the element at the co nsumer end of the queue. 91 * A functor to which to pass the element at the consumer end of the queue.
97 * This functor is executed _outside_ the protection of the queue's mutex. 92 * This functor is executed _outside_ the protection of the queue's mutex.
98 * \return 93 * \return
99 * True if and only if an element was removed from the front of the queue. 94 * True if and only if an element was removed from the front of the queue.
100 */ 95 */
101 template<typename Function> 96 template<typename Function>
102 bool Remove(Function& f) 97 bool Remove(Function& f)
sergei 2015/10/01 15:50:06 Why not to call it `pop_front` or `try_pop_front(.
Eric 2015/10/08 21:05:39 As I've said before, this is not any kind of gener
103 { 98 {
104 /* 99 /*
105 * We need to remove an element atomically from the queue, 100 * We need to remove an element atomically from the queue,
106 * so we need to construct that element _inside_ the protection of the mut ex, 101 * so we need to construct that element _inside_ the protection of the mut ex,
107 * On the other hand, we need to execute the function _outside_ the protecti on of the queue, 102 * On the other hand, we need to execute the function _outside_ the protecti on
108 * because otherwise performance may suffer. 103 * of the queue, because otherwise performance may suffer.
109 * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element 104 * Hence we use 'Placeholder' so that we can declare a variable to hold the
110 * without requiring it to have a default constructor or assignment operat ors. 105 * removed element without requiring it to have a default constructor or
106 * assignment operators.
111 */ 107 */
112 Placeholder<T> x; 108 Placeholder<T> x;
sergei 2015/10/01 15:50:06 I see only one reason to use a class `Placeholder`
Eric 2015/10/08 21:05:40 You see only one reason, yet there are other reaso
113 { 109 {
114 SentryType sentry(mutex); 110 SentryType sentry(mutex);
115 if (queue.empty()) 111 if (queue.empty())
116 return false; 112 return false;
117 x.Construct(std::move(queue.front())); // only require move-constructib ility 113 x.Construct(std::move(queue.front())); // only require move-constructib ility
118 queue.pop_front(); 114 queue.pop_front();
119 } 115 }
120 f(std::move(x.Object())); 116 f(std::move(x.Object()));
121 return true; 117 return true;
122 } 118 }
123 119
124 /** 120 /**
125 * If a condition is satisfied, wait indefinitely. If not, don't wait. 121 * If a condition is satisfied, wait indefinitely. If not, don't wait.
126 * 122 *
127 * \return False, immediately without waiting, if the argument evaluates false . 123 * \return False, immediately without waiting, if the argument evaluates false .
128 * \return True, only after waiting, if the argument evaluates true. 124 * \return True, only after waiting, if the argument evaluates true.
129 */ 125 */
130 template<typename Predicate> 126 template<typename Predicate>
131 bool WaitIf(Predicate& p) 127 bool WaitIf(Predicate& p)
sergei 2015/10/01 15:50:06 It's still `MessageQueue` class, how is this metho
Eric 2015/10/08 21:05:39 You get a race condition if you do it that way. Tr
Oleksandr 2016/01/28 10:15:32 I agree with Sergei, this method is very awkward.
Eric 2016/02/04 21:01:42 What's awkward about it? Testing a predicate under
132 { 128 {
133 UniqueLockType ul(mutex); 129 UniqueLockType ul(mutex);
134 if (!p()) 130 if (!p())
135 { 131 {
136 return false; 132 return false;
137 } 133 }
138 cv.wait(ul); 134 cv.wait(ul);
139 return true; 135 return true;
140 } 136 }
141 137
142 /* 138 /*
143 * A defect in the compiler for VS2012 requires these definitions. 139 * A defect in the compiler for VS2012 requires these definitions.
144 */ 140 */
145 #if defined(_MSC_VER) && _MSC_VER <= 1700 141 #if defined(_MSC_VER) && _MSC_VER <= 1700
146 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status 142 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status
147 #else 143 #else
148 #define WAIT_FOR_RETURN_TYPE std::cv_status 144 #define WAIT_FOR_RETURN_TYPE std::cv_status
149 #endif 145 #endif
150 146
151 /** 147 /**
152 * Wait for a limited duration. 148 * Wait for a limited duration.
153 * 149 *
154 * Return type was going to be "decltype(cv.wait_for)" for clarity. 150 * Return type was going to be "decltype(cv.wait_for)" for clarity.
155 * A defect in the toolset for VS 2012 causes that declaration not to compile. 151 * A defect in the toolset for VS 2012 causes that declaration not to compile.
156 */ 152 */
157 template <class R, class P> 153 template <class R, class P>
158 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio n) 154 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio n)
sergei 2015/10/01 15:50:06 Why do we need this method? It seems to be not use
Eric 2015/10/08 21:05:39 It's used in the unit tests. Since you didn't comm
159 { 155 {
160 return cv.wait_for(UniqueLockType(mutex), relativeDuration); 156 return cv.wait_for(UniqueLockType(mutex), relativeDuration);
161 } 157 }
162 }; 158 };
163 159
164 /** 160 /**
165 * An active queue with a single, internal consumer. 161 * An active queue with a single, internal consumer.
166 * 162 *
167 * This class presents the front of a message queue for arbitrary producers. 163 * This class presents the front of a message queue for arbitrary producers.
168 * The back of the message queue is managed by an internal consumer, 164 * The back of the message queue is managed by an internal consumer,
169 * which calls the processor for each element passed through the queue. 165 * which calls the processor for each element passed through the queue.
170 * 166 *
171 * The internal thread has the same lifetime as the object. 167 * The internal thread has the same lifetime as the object.
172 * There's no external interface to pause or kill the thread. 168 * There's no external interface to pause or kill the thread.
173 * Destroy the object to terminate the thread. 169 * Destroy the object to terminate the thread.
174 * The consumer drains the queue before terminating the thread. 170 * The consumer drains the queue before terminating the thread.
175 * 171 *
176 * \tparam T Class of elements in the queue 172 * \tparam T Class of elements in the queue
177 * \tparam F Type of functor to process each element consumed from the queue 173 * \tparam F Type of functor to process each element consumed from the queue
178 */ 174 */
179 template<class T, class F> 175 template<class T, class F>
180 class ActiveQueue 176 class ActiveQueue
sergei 2015/10/01 15:50:07 I would like to get rid of template parameters T a
Eric 2015/10/08 21:05:40 And I would like to keep them. You're suggesting n
181 { 177 {
182 /** 178 /**
183 * Signal flag indicating to the consumer thread that it should continue runni ng. 179 * Signal flag indicating to the consumer thread that it should continue runni ng.
184 * 180 *
185 * This flag is true for the entire duration of the object lifetime; 181 * This flag is true for the entire duration of the object lifetime;
186 * it's set to false only in the destructor. 182 * it's set to false only in the destructor.
187 * To avoid race conditions, it's only accessed under mutex protection with ca lls to notify and wait. 183 * To avoid race conditions, it's only accessed under mutex protection with ca lls to notify and wait.
188 * We accomplish this with lambdas passed as the functor arguments of the mess age queue functions 'Rouse' and 'WaitIf'. 184 * We accomplish this with lambdas passed as the functor arguments of the mess age queue functions 'Rouse' and 'WaitIf'.
189 * These lambdas are the only references to the variable after construction. 185 * These lambdas are the only references to the variable after construction.
190 */ 186 */
191 bool running; 187 bool running;
sergei 2015/10/01 15:50:07 Hint: if we access it only from the working thread
Eric 2015/10/08 21:05:40 Again: race conditions. And there's no extra prote
192 188
193 /** 189 /**
194 * Functor to process each element as it is removed from the queue. 190 * Functor to process each element as it is removed from the queue.
195 */ 191 */
196 F& processor; 192 F& processor;
sergei 2015/10/01 15:50:07 In general it's a bad idea to use reference member
Eric 2015/10/08 21:05:40 You did notice that this class contains a thread m
197 193
198 /** 194 /**
199 * The queue that the active thread behavior is wrapped around. 195 * The queue that the active thread behavior is wrapped around.
200 */ 196 */
201 MessageQueue<T> queue; 197 MessageQueue<T> queue;
202 198
203 /** 199 /**
204 * Thread running the consumer process. 200 * Thread running the consumer process.
205 * 201 *
206 * This thread runs the entire lifetime of this object. 202 * This thread runs the entire lifetime of this object.
207 * It's started in the constructor and joined in the destructor. 203 * It's started in the constructor and joined in the destructor.
208 * 204 *
209 * This member variable is declared last so that it is constructed last, 205 * This member variable is declared last so that it is constructed last,
210 * after all the other member necessary for proper functioning of the consum er. 206 * after all the other member necessary for proper functioning of the consum er.
211 */ 207 */
212 std::thread thread; 208 std::thread thread;
213 209
214 /** 210 /**
215 * Main function for the consumer thread. 211 * Main function for the consumer thread.
216 * 212 *
217 * Strictly speaking, it's the effective main function for the thread. 213 * Strictly speaking, it's the effective main function for the thread.
218 * The actual main function is a lambda that only calls this function and noth ing else. 214 * The actual main function is a lambda that only calls this function and noth ing else.
219 */ 215 */
220 void Consumer() 216 void Consumer()
221 { 217 {
222 while (queue.WaitIf([this]() { return running; })) 218 while (queue.WaitIf([this]() { return running; }))
223 { 219 {
224 // Drain the queue, processing all of its elements 220 // Drain the queue, processing all of its elements
225 while ( 221 while (
sergei 2015/10/01 15:50:06 It means that it will run until the queue is not e
Eric 2015/10/08 21:05:39 Of course it's intended. The whole point here is t
226 queue.Remove( 222 queue.Remove(
227 [&](T&& t) { 223 [&](T&& t) {
228 try 224 try
229 { 225 {
230 processor(std::move(t)); 226 processor(std::move(t));
231 } 227 }
232 catch (...) 228 catch (...)
233 { 229 {
234 // Ignore any exception 'processor' may throw. 230 // Ignore any exception 'processor' may throw.
235 } 231 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 } 272 }
277 273
278 /** 274 /**
279 * Insert 275 * Insert
280 */ 276 */
281 void Insert(T&& t) 277 void Insert(T&& t)
282 { 278 {
283 queue.Insert(std::move(t)); 279 queue.Insert(std::move(t));
284 } 280 }
285 }; 281 };
LEFTRIGHT

Powered by Google App Engine
This is Rietveld