| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 * This file is part of Adblock Plus <https://adblockplus.org/>, | |
| 3 * Copyright (C) 2006-2015 Eyeo GmbH | |
| 4 * | |
| 5 * Adblock Plus is free software: you can redistribute it and/or modify | |
| 6 * it under the terms of the GNU General Public License version 3 as | |
| 7 * published by the Free Software Foundation. | |
| 8 * | |
| 9 * Adblock Plus is distributed in the hope that it will be useful, | |
| 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| 12 * GNU General Public License for more details. | |
| 13 * | |
| 14 * You should have received a copy of the GNU General Public License | |
| 15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. | |
| 16 */ | |
| 17 | |
| 18 #include <thread> | |
| 19 #include <mutex> | |
| 20 #include <condition_variable> | |
| 21 #include <deque> | |
| 22 #include "Placeholder.h" | |
| 23 | |
| 24 /** | |
| 25 * A synchronized FIFO queue with thread notice on receipt. | |
|
sergei
2015/10/01 15:50:06
So, why is it called MessageQueue? It seems better
Eric
2015/10/08 21:05:40
Because that's what it is, a message queue.
| |
| 26 * | |
| 27 * \tparam T | |
| 28 * Class of the elements of the queue. | |
| 29 * It must have a move constructor. | |
| 30 * \tparam F | |
| 31 * Functor type of object to receive notice on Insert(). | |
| 32 */ | |
| 33 template<class T> | |
| 34 class MessageQueue | |
| 35 { | |
| 36 std::deque<T> queue; | |
| 37 typedef std::lock_guard<std::mutex> SentryType; | |
| 38 typedef std::unique_lock<std::mutex> UniqueLockType; | |
| 39 std::mutex mutex; | |
| 40 std::condition_variable cv; | |
| 41 public: | |
| 42 MessageQueue() {} // = default | |
|
sergei
2015/10/01 15:50:06
Why do we need `= default`? I would remove it.
Eric
2015/10/08 21:05:40
It's for when we have C++11 available. The string
Oleksandr
2016/01/28 10:15:32
We have quite few // = default comments in the cod
| |
| 43 | |
| 44 /* | |
| 45 * This class does not have any responsibility with regard to the elements of its queue. | |
| 46 * An instance may be destroyed with elements still in the queue. | |
| 47 */ | |
| 48 ~MessageQueue() {} // = default; | |
| 49 | |
| 50 /** | |
| 51 * Insert an element, copy constructor version. | |
| 52 */ | |
| 53 void Insert(const T& t) | |
|
sergei
2015/10/01 15:50:07
Do you mind to rename it to `push_back` or `PushBa
Eric
2015/10/08 21:05:39
I completely object. It's contrary to the abstract
Oleksandr
2016/01/28 10:15:33
I wouldn't mind Insert as well. I think we should
| |
| 54 { | |
| 55 { | |
|
sergei
2015/10/01 15:50:06
This additional scope is not needed here nor in th
Eric
2015/10/08 21:05:40
Yep. I think this is a left-over artifact from dev
| |
| 56 SentryType sentry(mutex); | |
| 57 queue.push_back(t); | |
| 58 cv.notify_one(); | |
| 59 } | |
| 60 } | |
| 61 | |
| 62 /** | |
| 63 * Insert an element, move constructor version. | |
| 64 */ | |
| 65 void Insert(T&& t) | |
| 66 { | |
| 67 { | |
| 68 SentryType sentry(mutex); | |
| 69 queue.push_back(std::move(t)); | |
| 70 cv.notify_one(); | |
| 71 } | |
| 72 } | |
| 73 | |
| 74 /** | |
| 75 * Wake up anything waiting on the condition variable. | |
| 76 * | |
| 77 * \tparam Function | |
| 78 * Functor type for argument | |
| 79 * \param f | |
| 80 * Functor to execute _inside_ the protection of the queue's mutex | |
| 81 */ | |
| 82 template<typename Function> | |
| 83 void Rouse(Function& f) | |
|
sergei
2015/10/01 15:50:06
The reason we need such method is to set a value o
Eric
2015/10/08 21:05:40
You get a race condition if you do it that way. Th
| |
| 84 { | |
| 85 SentryType sentry(mutex); | |
| 86 f(); | |
| 87 cv.notify_one(); | |
| 88 } | |
| 89 | |
| 90 /** | |
| 91 * Remove and Test. | |
| 92 * Test that the queue is not empty. | |
| 93 * If so, remove the next element from the consumer end of the queue and pass it to the argument function | |
| 94 * | |
| 95 * \param f | |
| 96 * If return value is true, a functor to which to pass the element at the co nsumer end of the queue. | |
| 97 * This functor is executed _outside_ the protection of the queue's mutex. | |
| 98 * \return | |
| 99 * True if and only if an element was removed from the front of the queue. | |
| 100 */ | |
| 101 template<typename Function> | |
| 102 bool Remove(Function& f) | |
|
sergei
2015/10/01 15:50:06
Why not to call it `pop_front` or `try_pop_front(.
Eric
2015/10/08 21:05:39
As I've said before, this is not any kind of gener
| |
| 103 { | |
| 104 /* | |
| 105 * We need to remove an element atomically from the queue, | |
| 106 * so we need to construct that element _inside_ the protection of the mut ex, | |
| 107 * On the other hand, we need to execute the function _outside_ the protecti on of the queue, | |
| 108 * because otherwise performance may suffer. | |
| 109 * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element | |
| 110 * without requiring it to have a default constructor or assignment operat ors. | |
| 111 */ | |
| 112 Placeholder<T> x; | |
|
sergei
2015/10/01 15:50:06
I see only one reason to use a class `Placeholder`
Eric
2015/10/08 21:05:40
You see only one reason, yet there are other reaso
| |
| 113 { | |
| 114 SentryType sentry(mutex); | |
| 115 if (queue.empty()) | |
| 116 return false; | |
| 117 x.Construct(std::move(queue.front())); // only require move-constructib ility | |
| 118 queue.pop_front(); | |
| 119 } | |
| 120 f(std::move(x.Object())); | |
| 121 return true; | |
| 122 } | |
| 123 | |
| 124 /** | |
| 125 * If a condition is satisfied, wait indefinitely. If not, don't wait. | |
| 126 * | |
| 127 * \return False, immediately without waiting, if the argument evaluates false . | |
| 128 * \return True, only after waiting, if the argument evaluates true. | |
| 129 */ | |
| 130 template<typename Predicate> | |
| 131 bool WaitIf(Predicate& p) | |
|
sergei
2015/10/01 15:50:06
It's still `MessageQueue` class, how is this metho
Eric
2015/10/08 21:05:39
You get a race condition if you do it that way. Tr
Oleksandr
2016/01/28 10:15:32
I agree with Sergei, this method is very awkward.
Eric
2016/02/04 21:01:42
What's awkward about it? Testing a predicate under
| |
| 132 { | |
| 133 UniqueLockType ul(mutex); | |
| 134 if (!p()) | |
| 135 { | |
| 136 return false; | |
| 137 } | |
| 138 cv.wait(ul); | |
| 139 return true; | |
| 140 } | |
| 141 | |
| 142 /* | |
| 143 * A defect in the compiler for VS2012 requires these definitions. | |
| 144 */ | |
| 145 #if defined(_MSC_VER) && _MSC_VER <= 1700 | |
| 146 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status | |
| 147 #else | |
| 148 #define WAIT_FOR_RETURN_TYPE std::cv_status | |
| 149 #endif | |
| 150 | |
| 151 /** | |
| 152 * Wait for a limited duration. | |
| 153 * | |
| 154 * Return type was going to be "decltype(cv.wait_for)" for clarity. | |
| 155 * A defect in the toolset for VS 2012 causes that declaration not to compile. | |
| 156 */ | |
| 157 template <class R, class P> | |
| 158 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio n) | |
|
sergei
2015/10/01 15:50:06
Why do we need this method? It seems to be not use
Eric
2015/10/08 21:05:39
It's used in the unit tests. Since you didn't comm
| |
| 159 { | |
| 160 return cv.wait_for(UniqueLockType(mutex), relativeDuration); | |
| 161 } | |
| 162 }; | |
| 163 | |
| 164 /** | |
| 165 * An active queue with a single, internal consumer. | |
| 166 * | |
| 167 * This class presents the front of a message queue for arbitrary producers. | |
| 168 * The back of the message queue is managed by an internal consumer, | |
| 169 * which calls the processor for each element passed through the queue. | |
| 170 * | |
| 171 * The internal thread has the same lifetime as the object. | |
| 172 * There's no external interface to pause or kill the thread. | |
| 173 * Destroy the object to terminate the thread. | |
| 174 * The consumer drains the queue before terminating the thread. | |
| 175 * | |
| 176 * \tparam T Class of elements in the queue | |
| 177 * \tparam F Type of functor to process each element consumed from the queue | |
| 178 */ | |
| 179 template<class T, class F> | |
| 180 class ActiveQueue | |
|
sergei
2015/10/01 15:50:07
I would like to get rid of template parameters T a
Eric
2015/10/08 21:05:40
And I would like to keep them. You're suggesting n
| |
| 181 { | |
| 182 /** | |
| 183 * Signal flag indicating to the consumer thread that it should continue runni ng. | |
| 184 * | |
| 185 * This flag is true for the entire duration of the object lifetime; | |
| 186 * it's set to false only in the destructor. | |
| 187 * To avoid race conditions, it's only accessed under mutex protection with ca lls to notify and wait. | |
| 188 * We accomplish this with lambdas passed as the functor arguments of the mess age queue functions 'Rouse' and 'WaitIf'. | |
| 189 * These lambdas are the only references to the variable after construction. | |
| 190 */ | |
| 191 bool running; | |
|
sergei
2015/10/01 15:50:07
Hint: if we access it only from the working thread
Eric
2015/10/08 21:05:40
Again: race conditions. And there's no extra prote
| |
| 192 | |
| 193 /** | |
| 194 * Functor to process each element as it is removed from the queue. | |
| 195 */ | |
| 196 F& processor; | |
|
sergei
2015/10/01 15:50:07
In general it's a bad idea to use reference member
Eric
2015/10/08 21:05:40
You did notice that this class contains a thread m
| |
| 197 | |
| 198 /** | |
| 199 * The queue that the active thread behavior is wrapped around. | |
| 200 */ | |
| 201 MessageQueue<T> queue; | |
| 202 | |
| 203 /** | |
| 204 * Thread running the consumer process. | |
| 205 * | |
| 206 * This thread runs the entire lifetime of this object. | |
| 207 * It's started in the constructor and joined in the destructor. | |
| 208 * | |
| 209 * This member variable is declared last so that it is constructed last, | |
| 210 * after all the other member necessary for proper functioning of the consum er. | |
| 211 */ | |
| 212 std::thread thread; | |
| 213 | |
| 214 /** | |
| 215 * Main function for the consumer thread. | |
| 216 * | |
| 217 * Strictly speaking, it's the effective main function for the thread. | |
| 218 * The actual main function is a lambda that only calls this function and noth ing else. | |
| 219 */ | |
| 220 void Consumer() | |
| 221 { | |
| 222 while (queue.WaitIf([this]() { return running; })) | |
| 223 { | |
| 224 // Drain the queue, processing all of its elements | |
| 225 while ( | |
|
sergei
2015/10/01 15:50:06
It means that it will run until the queue is not e
Eric
2015/10/08 21:05:39
Of course it's intended. The whole point here is t
| |
| 226 queue.Remove( | |
| 227 [&](T&& t) { | |
| 228 try | |
| 229 { | |
| 230 processor(std::move(t)); | |
| 231 } | |
| 232 catch (...) | |
| 233 { | |
| 234 // Ignore any exception 'processor' may throw. | |
| 235 } | |
| 236 } | |
| 237 ) | |
| 238 ) | |
| 239 { | |
| 240 } | |
| 241 // Loop termination: queue.Remove() removed nothing from the queue, so it' s now empty | |
| 242 } | |
| 243 } | |
| 244 | |
| 245 public: | |
| 246 ActiveQueue(F& f) | |
| 247 : running(true), processor(f), queue(), thread([this]() { Consumer(); }) | |
| 248 {} | |
| 249 | |
| 250 ~ActiveQueue() | |
| 251 { | |
| 252 /* | |
| 253 * The consumer thread is waiting on the condition variable. | |
| 254 * If we don't wake it up, our thread won't terminate. | |
| 255 * | |
| 256 * Note that we don't have a race condition here with new elements arriving | |
| 257 * while we are draining the queue for the last time. | |
| 258 * We are executing in the destructor, so our member function Insert() is un available, | |
| 259 * and it's the only way that new element enter the message queue. | |
| 260 */ | |
| 261 queue.Rouse([this](){ running = false; }); | |
| 262 thread.join(); | |
| 263 } | |
| 264 | |
| 265 ActiveQueue(const ActiveQueue&); // = delete | |
| 266 ActiveQueue(ActiveQueue&&); // = delete | |
| 267 ActiveQueue& operator=(const ActiveQueue&); // = delete | |
| 268 ActiveQueue& operator=(ActiveQueue&&); // = delete | |
| 269 | |
| 270 /** | |
| 271 * Insert | |
| 272 */ | |
| 273 void Insert(const T& t) | |
| 274 { | |
| 275 queue.Insert(t); | |
| 276 } | |
| 277 | |
| 278 /** | |
| 279 * Insert | |
| 280 */ | |
| 281 void Insert(T&& t) | |
| 282 { | |
| 283 queue.Insert(std::move(t)); | |
| 284 } | |
| 285 }; | |
| OLD | NEW |