OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * This file is part of Adblock Plus <https://adblockplus.org/>, |
| 3 * Copyright (C) 2006-2016 Eyeo GmbH |
| 4 * |
| 5 * Adblock Plus is free software: you can redistribute it and/or modify |
| 6 * it under the terms of the GNU General Public License version 3 as |
| 7 * published by the Free Software Foundation. |
| 8 * |
| 9 * Adblock Plus is distributed in the hope that it will be useful, |
| 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 * GNU General Public License for more details. |
| 13 * |
| 14 * You should have received a copy of the GNU General Public License |
| 15 * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
| 16 */ |
| 17 |
| 18 #include <thread> |
| 19 #include <mutex> |
| 20 #include <condition_variable> |
| 21 #include <deque> |
| 22 #include "Placeholder.h" |
| 23 |
| 24 /** |
| 25 * A synchronized FIFO queue with thread notice on receipt. |
| 26 * |
| 27 * \tparam T |
| 28 * Class of the elements of the queue. |
| 29 * It must have a move constructor. |
| 30 */ |
| 31 template<class T> |
| 32 class MessageQueue |
| 33 { |
| 34 std::deque<T> queue; |
| 35 typedef std::lock_guard<std::mutex> SentryType; |
| 36 typedef std::unique_lock<std::mutex> UniqueLockType; |
| 37 std::mutex mutex; |
| 38 std::condition_variable cv; |
| 39 public: |
| 40 MessageQueue() {} // = default |
| 41 |
| 42 /* |
| 43 * This class does not have any responsibility with regard to the elements of
its queue. |
| 44 * An instance may be destroyed with elements still in the queue. |
| 45 */ |
| 46 ~MessageQueue() {} // = default; |
| 47 |
| 48 /** |
| 49 * Insert an element, copy semantics version. |
| 50 */ |
| 51 void Insert(const T& t) |
| 52 { |
| 53 SentryType sentry(mutex); |
| 54 queue.push_back(t); |
| 55 cv.notify_one(); |
| 56 } |
| 57 |
| 58 /** |
| 59 * Insert an element, move semantics version. |
| 60 */ |
| 61 void Insert(T&& t) |
| 62 { |
| 63 SentryType sentry(mutex); |
| 64 queue.push_back(std::move(t)); |
| 65 cv.notify_one(); |
| 66 } |
| 67 |
| 68 /** |
| 69 * Wake up anything waiting on the condition variable. |
| 70 * |
| 71 * \tparam Function |
| 72 * Functor type for argument |
| 73 * \param f |
| 74 * Functor to execute _inside_ the protection of the queue's mutex |
| 75 */ |
| 76 template<typename Function> |
| 77 void Rouse(Function& f) |
| 78 { |
| 79 SentryType sentry(mutex); |
| 80 f(); |
| 81 cv.notify_one(); |
| 82 } |
| 83 |
| 84 /** |
| 85 * Test, Remove, and Execute |
| 86 * |
| 87 * If the queue is not empty, remove the next element from the consumer end |
| 88 * of the queue and execute the argument functor on it. |
| 89 * |
| 90 * \param f |
| 91 * A functor to which to pass the element at the consumer end of the queue. |
| 92 * This functor is executed _outside_ the protection of the queue's mutex. |
| 93 * \return |
| 94 * True if and only if an element was removed from the front of the queue. |
| 95 */ |
| 96 template<typename Function> |
| 97 bool Remove(Function& f) |
| 98 { |
| 99 /* |
| 100 * We need to remove an element atomically from the queue, |
| 101 * so we need to construct that element _inside_ the protection of the mut
ex, |
| 102 * On the other hand, we need to execute the function _outside_ the protecti
on |
| 103 * of the queue, because otherwise performance may suffer. |
| 104 * Hence we use 'Placeholder' so that we can declare a variable to hold the |
| 105 * removed element without requiring it to have a default constructor or |
| 106 * assignment operators. |
| 107 */ |
| 108 Placeholder<T> x; |
| 109 { |
| 110 SentryType sentry(mutex); |
| 111 if (queue.empty()) |
| 112 return false; |
| 113 x.Construct(std::move(queue.front())); // only require move-constructib
ility |
| 114 queue.pop_front(); |
| 115 } |
| 116 f(std::move(x.Object())); |
| 117 return true; |
| 118 } |
| 119 |
| 120 /** |
| 121 * If a condition is satisfied, wait indefinitely. If not, don't wait. |
| 122 * |
| 123 * \return False, immediately without waiting, if the argument evaluates false
. |
| 124 * \return True, only after waiting, if the argument evaluates true. |
| 125 */ |
| 126 template<typename Predicate> |
| 127 bool WaitIf(Predicate& p) |
| 128 { |
| 129 UniqueLockType ul(mutex); |
| 130 if (!p()) |
| 131 { |
| 132 return false; |
| 133 } |
| 134 cv.wait(ul); |
| 135 return true; |
| 136 } |
| 137 |
| 138 /* |
| 139 * A defect in the compiler for VS2012 requires these definitions. |
| 140 */ |
| 141 #if defined(_MSC_VER) && _MSC_VER <= 1700 |
| 142 #define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status |
| 143 #else |
| 144 #define WAIT_FOR_RETURN_TYPE std::cv_status |
| 145 #endif |
| 146 |
| 147 /** |
| 148 * Wait for a limited duration. |
| 149 * |
| 150 * Return type was going to be "decltype(cv.wait_for)" for clarity. |
| 151 * A defect in the toolset for VS 2012 causes that declaration not to compile. |
| 152 */ |
| 153 template <class R, class P> |
| 154 WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuratio
n) |
| 155 { |
| 156 return cv.wait_for(UniqueLockType(mutex), relativeDuration); |
| 157 } |
| 158 }; |
| 159 |
| 160 /** |
| 161 * An active queue with a single, internal consumer. |
| 162 * |
| 163 * This class presents the front of a message queue for arbitrary producers. |
| 164 * The back of the message queue is managed by an internal consumer, |
| 165 * which calls the processor for each element passed through the queue. |
| 166 * |
| 167 * The internal thread has the same lifetime as the object. |
| 168 * There's no external interface to pause or kill the thread. |
| 169 * Destroy the object to terminate the thread. |
| 170 * The consumer drains the queue before terminating the thread. |
| 171 * |
| 172 * \tparam T Class of elements in the queue |
| 173 * \tparam F Type of functor to process each element consumed from the queue |
| 174 */ |
| 175 template<class T, class F> |
| 176 class ActiveQueue |
| 177 { |
| 178 /** |
| 179 * Signal flag indicating to the consumer thread that it should continue runni
ng. |
| 180 * |
| 181 * This flag is true for the entire duration of the object lifetime; |
| 182 * it's set to false only in the destructor. |
| 183 * To avoid race conditions, it's only accessed under mutex protection with ca
lls to notify and wait. |
| 184 * We accomplish this with lambdas passed as the functor arguments of the mess
age queue functions 'Rouse' and 'WaitIf'. |
| 185 * These lambdas are the only references to the variable after construction. |
| 186 */ |
| 187 bool running; |
| 188 |
| 189 /** |
| 190 * Functor to process each element as it is removed from the queue. |
| 191 */ |
| 192 F& processor; |
| 193 |
| 194 /** |
| 195 * The queue that the active thread behavior is wrapped around. |
| 196 */ |
| 197 MessageQueue<T> queue; |
| 198 |
| 199 /** |
| 200 * Thread running the consumer process. |
| 201 * |
| 202 * This thread runs the entire lifetime of this object. |
| 203 * It's started in the constructor and joined in the destructor. |
| 204 * |
| 205 * This member variable is declared last so that it is constructed last, |
| 206 * after all the other member necessary for proper functioning of the consum
er. |
| 207 */ |
| 208 std::thread thread; |
| 209 |
| 210 /** |
| 211 * Main function for the consumer thread. |
| 212 * |
| 213 * Strictly speaking, it's the effective main function for the thread. |
| 214 * The actual main function is a lambda that only calls this function and noth
ing else. |
| 215 */ |
| 216 void Consumer() |
| 217 { |
| 218 while (queue.WaitIf([this]() { return running; })) |
| 219 { |
| 220 // Drain the queue, processing all of its elements |
| 221 while ( |
| 222 queue.Remove( |
| 223 [&](T&& t) { |
| 224 try |
| 225 { |
| 226 processor(std::move(t)); |
| 227 } |
| 228 catch (...) |
| 229 { |
| 230 // Ignore any exception 'processor' may throw. |
| 231 } |
| 232 } |
| 233 ) |
| 234 ) |
| 235 { |
| 236 } |
| 237 // Loop termination: queue.Remove() removed nothing from the queue, so it'
s now empty |
| 238 } |
| 239 } |
| 240 |
| 241 public: |
| 242 ActiveQueue(F& f) |
| 243 : running(true), processor(f), queue(), thread([this]() { Consumer(); }) |
| 244 {} |
| 245 |
| 246 ~ActiveQueue() |
| 247 { |
| 248 /* |
| 249 * The consumer thread is waiting on the condition variable. |
| 250 * If we don't wake it up, our thread won't terminate. |
| 251 * |
| 252 * Note that we don't have a race condition here with new elements arriving |
| 253 * while we are draining the queue for the last time. |
| 254 * We are executing in the destructor, so our member function Insert() is un
available, |
| 255 * and it's the only way that new element enter the message queue. |
| 256 */ |
| 257 queue.Rouse([this](){ running = false; }); |
| 258 thread.join(); |
| 259 } |
| 260 |
| 261 ActiveQueue(const ActiveQueue&); // = delete |
| 262 ActiveQueue(ActiveQueue&&); // = delete |
| 263 ActiveQueue& operator=(const ActiveQueue&); // = delete |
| 264 ActiveQueue& operator=(ActiveQueue&&); // = delete |
| 265 |
| 266 /** |
| 267 * Insert |
| 268 */ |
| 269 void Insert(const T& t) |
| 270 { |
| 271 queue.Insert(t); |
| 272 } |
| 273 |
| 274 /** |
| 275 * Insert |
| 276 */ |
| 277 void Insert(T&& t) |
| 278 { |
| 279 queue.Insert(std::move(t)); |
| 280 } |
| 281 }; |
OLD | NEW |