| Index: src/plugin/ActiveQueue.h |
| =================================================================== |
| new file mode 100644 |
| --- /dev/null |
| +++ b/src/plugin/ActiveQueue.h |
| @@ -0,0 +1,285 @@ |
| +/* |
| + * This file is part of Adblock Plus <https://adblockplus.org/>, |
| + * Copyright (C) 2006-2015 Eyeo GmbH |
|
Oleksandr
2016/01/28 10:15:34
Nit: Here and further sadly we have already update
Eric
2016/02/04 21:01:44
Done.
|
| + * |
| + * Adblock Plus is free software: you can redistribute it and/or modify |
| + * it under the terms of the GNU General Public License version 3 as |
| + * published by the Free Software Foundation. |
| + * |
| + * Adblock Plus is distributed in the hope that it will be useful, |
| + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| + * GNU General Public License for more details. |
| + * |
| + * You should have received a copy of the GNU General Public License |
| + * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
| + */ |
| + |
| +#include <thread> |
| +#include <mutex> |
| +#include <condition_variable> |
| +#include <deque> |
| +#include "Placeholder.h" |
| + |
| +/** |
| + * A synchronized FIFO queue with thread notice on receipt. |
| + * |
| + * \tparam T |
| + * Class of the elements of the queue. |
| + * It must have a move constructor. |
| + * \tparam F |
| + * Functor type of object to receive notice on Insert(). |
|
Oleksandr
2016/01/28 10:15:34
Nit: I guess this parameter was removed at some po
Eric
2016/02/04 21:01:43
Yes. As I recall, I originally only had one class.
|
| + */ |
| +template<class T> |
| +class MessageQueue |
| +{ |
| + std::deque<T> queue; |
| + typedef std::lock_guard<std::mutex> SentryType; |
| + typedef std::unique_lock<std::mutex> UniqueLockType; |
| + std::mutex mutex; |
| + std::condition_variable cv; |
| +public: |
| + MessageQueue() {} // = default |
| + |
| + /* |
| + * This class does not have any responsibility with regard to the elements of its queue. |
| + * An instance may be destroyed with elements still in the queue. |
| + */ |
| + ~MessageQueue() {} // = default; |
| + |
| + /** |
| + * Insert an element, copy constructor version. |
|
Oleksandr
2016/01/28 10:15:34
Nit: this is not a constructor. I think "lvalue ve
Eric
2016/02/04 21:01:44
OK. I'll use "copy semantics".
|
| + */ |
| + void Insert(const T& t) |
| + { |
| + { |
|
Oleksandr
2016/01/28 10:15:35
Just to reiterate Sergei's point - the extra scope
Eric
2016/02/04 21:01:44
Done.
|
| + SentryType sentry(mutex); |
| + queue.push_back(t); |
| + cv.notify_one(); |
| + } |
| + } |
| + |
| + /** |
| + * Insert an element, move constructor version. |
| + */ |
| + void Insert(T&& t) |
| + { |
| + { |
| + SentryType sentry(mutex); |
| + queue.push_back(std::move(t)); |
| + cv.notify_one(); |
| + } |
| + } |
| + |
| + /** |
| + * Wake up anything waiting on the condition variable. |
| + * |
| + * \tparam Function |
| + * Functor type for argument |
| + * \param f |
| + * Functor to execute _inside_ the protection of the queue's mutex |
| + */ |
| + template<typename Function> |
| + void Rouse(Function& f) |
| + { |
| + SentryType sentry(mutex); |
| + f(); |
| + cv.notify_one(); |
| + } |
| + |
| + /** |
| + * Remove and Test. |
| + * Test that the queue is not empty. |
| + * If so, remove the next element from the consumer end of the queue and pass it to the argument function |
|
Oleksandr
2016/01/28 10:15:34
Nit: I think there are too many comments in this f
Eric
2016/02/04 21:01:43
Reworded.
|
| + * |
| + * \param f |
| + * If return value is true, a functor to which to pass the element at the consumer end of the queue. |
| + * This functor is executed _outside_ the protection of the queue's mutex. |
| + * \return |
| + * True if and only if an element was removed from the front of the queue. |
| + */ |
| + template<typename Function> |
| + bool Remove(Function& f) |
| + { |
| + /* |
| + * We need to remove an element atomically from the queue, |
| + * so we need to construct that element _inside_ the protection of the mutex, |
| + * On the other hand, we need to execute the function _outside_ the protection of the queue, |
| + * because otherwise performance may suffer. |
| + * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element |
| + * without requiring it to have a default constructor or assignment operators. |
| + */ |
| + Placeholder<T> x; |
| + { |
| + SentryType sentry(mutex); |
| + if (queue.empty()) |
| + return false; |
| + x.Construct(std::move(queue.front())); // only require move-constructibility |
| + queue.pop_front(); |
| + } |
| + f(std::move(x.Object())); |
| + return true; |
| + } |
| + |
| + /** |
| + * If a condition is satisfied, wait indefinitely. If not, don't wait. |
| + * |
| + * \return False, immediately without waiting, if the argument evaluates false. |
| + * \return True, only after waiting, if the argument evaluates true. |
| + */ |
| + template<typename Predicate> |
| + bool WaitIf(Predicate& p) |
| + { |
| + UniqueLockType ul(mutex); |
| + if (!p()) |
| + { |
| + return false; |
| + } |
| + cv.wait(ul); |
| + return true; |
| + } |
| + |
| + /* |
| + * A defect in the compiler for VS2012 requires these definitions. |
| + */ |
| +#if defined(_MSC_VER) && _MSC_VER <= 1700 |
| +#define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status |
| +#else |
| +#define WAIT_FOR_RETURN_TYPE std::cv_status |
| +#endif |
| + |
| + /** |
| + * Wait for a limited duration. |
| + * |
| + * Return type was going to be "decltype(cv.wait_for)" for clarity. |
| + * A defect in the toolset for VS 2012 causes that declaration not to compile. |
| + */ |
| + template <class R, class P> |
| + WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuration) |
| + { |
| + return cv.wait_for(UniqueLockType(mutex), relativeDuration); |
| + } |
| +}; |
| + |
| +/** |
| + * An active queue with a single, internal consumer. |
| + * |
| + * This class presents the front of a message queue for arbitrary producers. |
| + * The back of the message queue is managed by an internal consumer, |
| + * which calls the processor for each element passed through the queue. |
| + * |
| + * The internal thread has the same lifetime as the object. |
| + * There's no external interface to pause or kill the thread. |
| + * Destroy the object to terminate the thread. |
| + * The consumer drains the queue before terminating the thread. |
| + * |
| + * \tparam T Class of elements in the queue |
| + * \tparam F Type of functor to process each element consumed from the queue |
| + */ |
| +template<class T, class F> |
| +class ActiveQueue |
| +{ |
| + /** |
| + * Signal flag indicating to the consumer thread that it should continue running. |
| + * |
| + * This flag is true for the entire duration of the object lifetime; |
| + * it's set to false only in the destructor. |
| + * To avoid race conditions, it's only accessed under mutex protection with calls to notify and wait. |
| + * We accomplish this with lambdas passed as the functor arguments of the message queue functions 'Rouse' and 'WaitIf'. |
| + * These lambdas are the only references to the variable after construction. |
| + */ |
| + bool running; |
| + |
| + /** |
| + * Functor to process each element as it is removed from the queue. |
| + */ |
| + F& processor; |
| + |
| + /** |
| + * The queue that the active thread behavior is wrapped around. |
| + */ |
| + MessageQueue<T> queue; |
| + |
| + /** |
| + * Thread running the consumer process. |
| + * |
| + * This thread runs the entire lifetime of this object. |
| + * It's started in the constructor and joined in the destructor. |
| + * |
| + * This member variable is declared last so that it is constructed last, |
| + * after all the other member necessary for proper functioning of the consumer. |
| + */ |
| + std::thread thread; |
| + |
| + /** |
| + * Main function for the consumer thread. |
| + * |
| + * Strictly speaking, it's the effective main function for the thread. |
| + * The actual main function is a lambda that only calls this function and nothing else. |
| + */ |
| + void Consumer() |
| + { |
| + while (queue.WaitIf([this]() { return running; })) |
| + { |
| + // Drain the queue, processing all of its elements |
| + while ( |
| + queue.Remove( |
| + [&](T&& t) { |
| + try |
| + { |
| + processor(std::move(t)); |
| + } |
| + catch (...) |
| + { |
| + // Ignore any exception 'processor' may throw. |
| + } |
| + } |
| + ) |
| + ) |
| + { |
| + } |
| + // Loop termination: queue.Remove() removed nothing from the queue, so it's now empty |
| + } |
| + } |
| + |
| +public: |
| + ActiveQueue(F& f) |
| + : running(true), processor(f), queue(), thread([this]() { Consumer(); }) |
| + {} |
| + |
| + ~ActiveQueue() |
| + { |
| + /* |
| + * The consumer thread is waiting on the condition variable. |
| + * If we don't wake it up, our thread won't terminate. |
| + * |
| + * Note that we don't have a race condition here with new elements arriving |
| + * while we are draining the queue for the last time. |
| + * We are executing in the destructor, so our member function Insert() is unavailable, |
| + * and it's the only way that new element enter the message queue. |
| + */ |
| + queue.Rouse([this](){ running = false; }); |
| + thread.join(); |
| + } |
| + |
| + ActiveQueue(const ActiveQueue&); // = delete |
| + ActiveQueue(ActiveQueue&&); // = delete |
| + ActiveQueue& operator=(const ActiveQueue&); // = delete |
| + ActiveQueue& operator=(ActiveQueue&&); // = delete |
| + |
| + /** |
| + * Insert |
| + */ |
| + void Insert(const T& t) |
| + { |
| + queue.Insert(t); |
| + } |
| + |
| + /** |
| + * Insert |
| + */ |
| + void Insert(T&& t) |
| + { |
| + queue.Insert(std::move(t)); |
| + } |
| +}; |