Index: src/plugin/ActiveQueue.h |
=================================================================== |
new file mode 100644 |
--- /dev/null |
+++ b/src/plugin/ActiveQueue.h |
@@ -0,0 +1,285 @@ |
+/* |
+ * This file is part of Adblock Plus <https://adblockplus.org/>, |
+ * Copyright (C) 2006-2015 Eyeo GmbH |
+ * |
+ * Adblock Plus is free software: you can redistribute it and/or modify |
+ * it under the terms of the GNU General Public License version 3 as |
+ * published by the Free Software Foundation. |
+ * |
+ * Adblock Plus is distributed in the hope that it will be useful, |
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of |
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
+ * GNU General Public License for more details. |
+ * |
+ * You should have received a copy of the GNU General Public License |
+ * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
+ */ |
+ |
+#include <thread> |
+#include <mutex> |
+#include <condition_variable> |
+#include <deque> |
+#include "Placeholder.h" |
+ |
+/** |
+ * A synchronized FIFO queue with thread notice on receipt. |
sergei
2015/10/01 15:50:06
So, why is it called MessageQueue? It seems better
Eric
2015/10/08 21:05:40
Because that's what it is, a message queue.
|
+ * |
+ * \tparam T |
+ * Class of the elements of the queue. |
+ * It must have a move constructor. |
+ * \tparam F |
+ * Functor type of object to receive notice on Insert(). |
+ */ |
+template<class T> |
+class MessageQueue |
+{ |
+ std::deque<T> queue; |
+ typedef std::lock_guard<std::mutex> SentryType; |
+ typedef std::unique_lock<std::mutex> UniqueLockType; |
+ std::mutex mutex; |
+ std::condition_variable cv; |
+public: |
+ MessageQueue() {} // = default |
sergei
2015/10/01 15:50:06
Why do we need `= default`? I would remove it.
Eric
2015/10/08 21:05:40
It's for when we have C++11 available. The string
Oleksandr
2016/01/28 10:15:32
We have quite few // = default comments in the cod
|
+ |
+ /* |
+ * This class does not have any responsibility with regard to the elements of its queue. |
+ * An instance may be destroyed with elements still in the queue. |
+ */ |
+ ~MessageQueue() {} // = default; |
+ |
+ /** |
+ * Insert an element, copy constructor version. |
+ */ |
+ void Insert(const T& t) |
sergei
2015/10/01 15:50:07
Do you mind to rename it to `push_back` or `PushBa
Eric
2015/10/08 21:05:39
I completely object. It's contrary to the abstract
Oleksandr
2016/01/28 10:15:33
I wouldn't mind Insert as well. I think we should
|
+ { |
+ { |
sergei
2015/10/01 15:50:06
This additional scope is not needed here nor in th
Eric
2015/10/08 21:05:40
Yep. I think this is a left-over artifact from dev
|
+ SentryType sentry(mutex); |
+ queue.push_back(t); |
+ cv.notify_one(); |
+ } |
+ } |
+ |
+ /** |
+ * Insert an element, move constructor version. |
+ */ |
+ void Insert(T&& t) |
+ { |
+ { |
+ SentryType sentry(mutex); |
+ queue.push_back(std::move(t)); |
+ cv.notify_one(); |
+ } |
+ } |
+ |
+ /** |
+ * Wake up anything waiting on the condition variable. |
+ * |
+ * \tparam Function |
+ * Functor type for argument |
+ * \param f |
+ * Functor to execute _inside_ the protection of the queue's mutex |
+ */ |
+ template<typename Function> |
+ void Rouse(Function& f) |
sergei
2015/10/01 15:50:06
The reason we need such method is to set a value o
Eric
2015/10/08 21:05:40
You get a race condition if you do it that way. Th
|
+ { |
+ SentryType sentry(mutex); |
+ f(); |
+ cv.notify_one(); |
+ } |
+ |
+ /** |
+ * Remove and Test. |
+ * Test that the queue is not empty. |
+ * If so, remove the next element from the consumer end of the queue and pass it to the argument function |
+ * |
+ * \param f |
+ * If return value is true, a functor to which to pass the element at the consumer end of the queue. |
+ * This functor is executed _outside_ the protection of the queue's mutex. |
+ * \return |
+ * True if and only if an element was removed from the front of the queue. |
+ */ |
+ template<typename Function> |
+ bool Remove(Function& f) |
sergei
2015/10/01 15:50:06
Why not to call it `pop_front` or `try_pop_front(.
Eric
2015/10/08 21:05:39
As I've said before, this is not any kind of gener
|
+ { |
+ /* |
+ * We need to remove an element atomically from the queue, |
+ * so we need to construct that element _inside_ the protection of the mutex, |
+ * On the other hand, we need to execute the function _outside_ the protection of the queue, |
+ * because otherwise performance may suffer. |
+ * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element |
+ * without requiring it to have a default constructor or assignment operators. |
+ */ |
+ Placeholder<T> x; |
sergei
2015/10/01 15:50:06
I see only one reason to use a class `Placeholder`
Eric
2015/10/08 21:05:40
You see only one reason, yet there are other reaso
|
+ { |
+ SentryType sentry(mutex); |
+ if (queue.empty()) |
+ return false; |
+ x.Construct(std::move(queue.front())); // only require move-constructibility |
+ queue.pop_front(); |
+ } |
+ f(std::move(x.Object())); |
+ return true; |
+ } |
+ |
+ /** |
+ * If a condition is satisfied, wait indefinitely. If not, don't wait. |
+ * |
+ * \return False, immediately without waiting, if the argument evaluates false. |
+ * \return True, only after waiting, if the argument evaluates true. |
+ */ |
+ template<typename Predicate> |
+ bool WaitIf(Predicate& p) |
sergei
2015/10/01 15:50:06
It's still `MessageQueue` class, how is this metho
Eric
2015/10/08 21:05:39
You get a race condition if you do it that way. Tr
Oleksandr
2016/01/28 10:15:32
I agree with Sergei, this method is very awkward.
Eric
2016/02/04 21:01:42
What's awkward about it? Testing a predicate under
|
+ { |
+ UniqueLockType ul(mutex); |
+ if (!p()) |
+ { |
+ return false; |
+ } |
+ cv.wait(ul); |
+ return true; |
+ } |
+ |
+ /* |
+ * A defect in the compiler for VS2012 requires these definitions. |
+ */ |
+#if defined(_MSC_VER) && _MSC_VER <= 1700 |
+#define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status |
+#else |
+#define WAIT_FOR_RETURN_TYPE std::cv_status |
+#endif |
+ |
+ /** |
+ * Wait for a limited duration. |
+ * |
+ * Return type was going to be "decltype(cv.wait_for)" for clarity. |
+ * A defect in the toolset for VS 2012 causes that declaration not to compile. |
+ */ |
+ template <class R, class P> |
+ WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuration) |
sergei
2015/10/01 15:50:06
Why do we need this method? It seems to be not use
Eric
2015/10/08 21:05:39
It's used in the unit tests. Since you didn't comm
|
+ { |
+ return cv.wait_for(UniqueLockType(mutex), relativeDuration); |
+ } |
+}; |
+ |
+/** |
+ * An active queue with a single, internal consumer. |
+ * |
+ * This class presents the front of a message queue for arbitrary producers. |
+ * The back of the message queue is managed by an internal consumer, |
+ * which calls the processor for each element passed through the queue. |
+ * |
+ * The internal thread has the same lifetime as the object. |
+ * There's no external interface to pause or kill the thread. |
+ * Destroy the object to terminate the thread. |
+ * The consumer drains the queue before terminating the thread. |
+ * |
+ * \tparam T Class of elements in the queue |
+ * \tparam F Type of functor to process each element consumed from the queue |
+ */ |
+template<class T, class F> |
+class ActiveQueue |
sergei
2015/10/01 15:50:07
I would like to get rid of template parameters T a
Eric
2015/10/08 21:05:40
And I would like to keep them. You're suggesting n
|
+{ |
+ /** |
+ * Signal flag indicating to the consumer thread that it should continue running. |
+ * |
+ * This flag is true for the entire duration of the object lifetime; |
+ * it's set to false only in the destructor. |
+ * To avoid race conditions, it's only accessed under mutex protection with calls to notify and wait. |
+ * We accomplish this with lambdas passed as the functor arguments of the message queue functions 'Rouse' and 'WaitIf'. |
+ * These lambdas are the only references to the variable after construction. |
+ */ |
+ bool running; |
sergei
2015/10/01 15:50:07
Hint: if we access it only from the working thread
Eric
2015/10/08 21:05:40
Again: race conditions. And there's no extra prote
|
+ |
+ /** |
+ * Functor to process each element as it is removed from the queue. |
+ */ |
+ F& processor; |
sergei
2015/10/01 15:50:07
In general it's a bad idea to use reference member
Eric
2015/10/08 21:05:40
You did notice that this class contains a thread m
|
+ |
+ /** |
+ * The queue that the active thread behavior is wrapped around. |
+ */ |
+ MessageQueue<T> queue; |
+ |
+ /** |
+ * Thread running the consumer process. |
+ * |
+ * This thread runs the entire lifetime of this object. |
+ * It's started in the constructor and joined in the destructor. |
+ * |
+ * This member variable is declared last so that it is constructed last, |
+ * after all the other member necessary for proper functioning of the consumer. |
+ */ |
+ std::thread thread; |
+ |
+ /** |
+ * Main function for the consumer thread. |
+ * |
+ * Strictly speaking, it's the effective main function for the thread. |
+ * The actual main function is a lambda that only calls this function and nothing else. |
+ */ |
+ void Consumer() |
+ { |
+ while (queue.WaitIf([this]() { return running; })) |
+ { |
+ // Drain the queue, processing all of its elements |
+ while ( |
sergei
2015/10/01 15:50:06
It means that it will run until the queue is not e
Eric
2015/10/08 21:05:39
Of course it's intended. The whole point here is t
|
+ queue.Remove( |
+ [&](T&& t) { |
+ try |
+ { |
+ processor(std::move(t)); |
+ } |
+ catch (...) |
+ { |
+ // Ignore any exception 'processor' may throw. |
+ } |
+ } |
+ ) |
+ ) |
+ { |
+ } |
+ // Loop termination: queue.Remove() removed nothing from the queue, so it's now empty |
+ } |
+ } |
+ |
+public: |
+ ActiveQueue(F& f) |
+ : running(true), processor(f), queue(), thread([this]() { Consumer(); }) |
+ {} |
+ |
+ ~ActiveQueue() |
+ { |
+ /* |
+ * The consumer thread is waiting on the condition variable. |
+ * If we don't wake it up, our thread won't terminate. |
+ * |
+ * Note that we don't have a race condition here with new elements arriving |
+ * while we are draining the queue for the last time. |
+ * We are executing in the destructor, so our member function Insert() is unavailable, |
+ * and it's the only way that new element enter the message queue. |
+ */ |
+ queue.Rouse([this](){ running = false; }); |
+ thread.join(); |
+ } |
+ |
+ ActiveQueue(const ActiveQueue&); // = delete |
+ ActiveQueue(ActiveQueue&&); // = delete |
+ ActiveQueue& operator=(const ActiveQueue&); // = delete |
+ ActiveQueue& operator=(ActiveQueue&&); // = delete |
+ |
+ /** |
+ * Insert |
+ */ |
+ void Insert(const T& t) |
+ { |
+ queue.Insert(t); |
+ } |
+ |
+ /** |
+ * Insert |
+ */ |
+ void Insert(T&& t) |
+ { |
+ queue.Insert(std::move(t)); |
+ } |
+}; |