Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Unified Diff: src/plugin/ActiveQueue.h

Issue 29323611: Issue #1234, #2058 - Rewrite log facility, improving thread implementation
Patch Set: Created Aug. 19, 2015, 5:42 p.m.
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: src/plugin/ActiveQueue.h
===================================================================
new file mode 100644
--- /dev/null
+++ b/src/plugin/ActiveQueue.h
@@ -0,0 +1,285 @@
+/*
+ * This file is part of Adblock Plus <https://adblockplus.org/>,
+ * Copyright (C) 2006-2015 Eyeo GmbH
+ *
+ * Adblock Plus is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 3 as
+ * published by the Free Software Foundation.
+ *
+ * Adblock Plus is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <deque>
+#include "Placeholder.h"
+
+/**
+ * A synchronized FIFO queue with thread notice on receipt.
sergei 2015/10/01 15:50:06 So, why is it called MessageQueue? It seems better
Eric 2015/10/08 21:05:40 Because that's what it is, a message queue.
+ *
+ * \tparam T
+ * Class of the elements of the queue.
+ * It must have a move constructor.
+ * \tparam F
+ * Functor type of object to receive notice on Insert().
+ */
+template<class T>
+class MessageQueue
+{
+ std::deque<T> queue;
+ typedef std::lock_guard<std::mutex> SentryType;
+ typedef std::unique_lock<std::mutex> UniqueLockType;
+ std::mutex mutex;
+ std::condition_variable cv;
+public:
+ MessageQueue() {} // = default
sergei 2015/10/01 15:50:06 Why do we need `= default`? I would remove it.
Eric 2015/10/08 21:05:40 It's for when we have C++11 available. The string
Oleksandr 2016/01/28 10:15:32 We have quite few // = default comments in the cod
+
+ /*
+ * This class does not have any responsibility with regard to the elements of its queue.
+ * An instance may be destroyed with elements still in the queue.
+ */
+ ~MessageQueue() {} // = default;
+
+ /**
+ * Insert an element, copy constructor version.
+ */
+ void Insert(const T& t)
sergei 2015/10/01 15:50:07 Do you mind to rename it to `push_back` or `PushBa
Eric 2015/10/08 21:05:39 I completely object. It's contrary to the abstract
Oleksandr 2016/01/28 10:15:33 I wouldn't mind Insert as well. I think we should
+ {
+ {
sergei 2015/10/01 15:50:06 This additional scope is not needed here nor in th
Eric 2015/10/08 21:05:40 Yep. I think this is a left-over artifact from dev
+ SentryType sentry(mutex);
+ queue.push_back(t);
+ cv.notify_one();
+ }
+ }
+
+ /**
+ * Insert an element, move constructor version.
+ */
+ void Insert(T&& t)
+ {
+ {
+ SentryType sentry(mutex);
+ queue.push_back(std::move(t));
+ cv.notify_one();
+ }
+ }
+
+ /**
+ * Wake up anything waiting on the condition variable.
+ *
+ * \tparam Function
+ * Functor type for argument
+ * \param f
+ * Functor to execute _inside_ the protection of the queue's mutex
+ */
+ template<typename Function>
+ void Rouse(Function& f)
sergei 2015/10/01 15:50:06 The reason we need such method is to set a value o
Eric 2015/10/08 21:05:40 You get a race condition if you do it that way. Th
+ {
+ SentryType sentry(mutex);
+ f();
+ cv.notify_one();
+ }
+
+ /**
+ * Remove and Test.
+ * Test that the queue is not empty.
+ * If so, remove the next element from the consumer end of the queue and pass it to the argument function
+ *
+ * \param f
+ * If return value is true, a functor to which to pass the element at the consumer end of the queue.
+ * This functor is executed _outside_ the protection of the queue's mutex.
+ * \return
+ * True if and only if an element was removed from the front of the queue.
+ */
+ template<typename Function>
+ bool Remove(Function& f)
sergei 2015/10/01 15:50:06 Why not to call it `pop_front` or `try_pop_front(.
Eric 2015/10/08 21:05:39 As I've said before, this is not any kind of gener
+ {
+ /*
+ * We need to remove an element atomically from the queue,
+ * so we need to construct that element _inside_ the protection of the mutex,
+ * On the other hand, we need to execute the function _outside_ the protection of the queue,
+ * because otherwise performance may suffer.
+ * Hence we use 'Placeholder' so that we can declare a variable to hold the removed element
+ * without requiring it to have a default constructor or assignment operators.
+ */
+ Placeholder<T> x;
sergei 2015/10/01 15:50:06 I see only one reason to use a class `Placeholder`
Eric 2015/10/08 21:05:40 You see only one reason, yet there are other reaso
+ {
+ SentryType sentry(mutex);
+ if (queue.empty())
+ return false;
+ x.Construct(std::move(queue.front())); // only require move-constructibility
+ queue.pop_front();
+ }
+ f(std::move(x.Object()));
+ return true;
+ }
+
+ /**
+ * If a condition is satisfied, wait indefinitely. If not, don't wait.
+ *
+ * \return False, immediately without waiting, if the argument evaluates false.
+ * \return True, only after waiting, if the argument evaluates true.
+ */
+ template<typename Predicate>
+ bool WaitIf(Predicate& p)
sergei 2015/10/01 15:50:06 It's still `MessageQueue` class, how is this metho
Eric 2015/10/08 21:05:39 You get a race condition if you do it that way. Tr
Oleksandr 2016/01/28 10:15:32 I agree with Sergei, this method is very awkward.
Eric 2016/02/04 21:01:42 What's awkward about it? Testing a predicate under
+ {
+ UniqueLockType ul(mutex);
+ if (!p())
+ {
+ return false;
+ }
+ cv.wait(ul);
+ return true;
+ }
+
+ /*
+ * A defect in the compiler for VS2012 requires these definitions.
+ */
+#if defined(_MSC_VER) && _MSC_VER <= 1700
+#define WAIT_FOR_RETURN_TYPE std::cv_status::cv_status
+#else
+#define WAIT_FOR_RETURN_TYPE std::cv_status
+#endif
+
+ /**
+ * Wait for a limited duration.
+ *
+ * Return type was going to be "decltype(cv.wait_for)" for clarity.
+ * A defect in the toolset for VS 2012 causes that declaration not to compile.
+ */
+ template <class R, class P>
+ WAIT_FOR_RETURN_TYPE WaitFor(const std::chrono::duration<R,P>& relativeDuration)
sergei 2015/10/01 15:50:06 Why do we need this method? It seems to be not use
Eric 2015/10/08 21:05:39 It's used in the unit tests. Since you didn't comm
+ {
+ return cv.wait_for(UniqueLockType(mutex), relativeDuration);
+ }
+};
+
+/**
+ * An active queue with a single, internal consumer.
+ *
+ * This class presents the front of a message queue for arbitrary producers.
+ * The back of the message queue is managed by an internal consumer,
+ * which calls the processor for each element passed through the queue.
+ *
+ * The internal thread has the same lifetime as the object.
+ * There's no external interface to pause or kill the thread.
+ * Destroy the object to terminate the thread.
+ * The consumer drains the queue before terminating the thread.
+ *
+ * \tparam T Class of elements in the queue
+ * \tparam F Type of functor to process each element consumed from the queue
+ */
+template<class T, class F>
+class ActiveQueue
sergei 2015/10/01 15:50:07 I would like to get rid of template parameters T a
Eric 2015/10/08 21:05:40 And I would like to keep them. You're suggesting n
+{
+ /**
+ * Signal flag indicating to the consumer thread that it should continue running.
+ *
+ * This flag is true for the entire duration of the object lifetime;
+ * it's set to false only in the destructor.
+ * To avoid race conditions, it's only accessed under mutex protection with calls to notify and wait.
+ * We accomplish this with lambdas passed as the functor arguments of the message queue functions 'Rouse' and 'WaitIf'.
+ * These lambdas are the only references to the variable after construction.
+ */
+ bool running;
sergei 2015/10/01 15:50:07 Hint: if we access it only from the working thread
Eric 2015/10/08 21:05:40 Again: race conditions. And there's no extra prote
+
+ /**
+ * Functor to process each element as it is removed from the queue.
+ */
+ F& processor;
sergei 2015/10/01 15:50:07 In general it's a bad idea to use reference member
Eric 2015/10/08 21:05:40 You did notice that this class contains a thread m
+
+ /**
+ * The queue that the active thread behavior is wrapped around.
+ */
+ MessageQueue<T> queue;
+
+ /**
+ * Thread running the consumer process.
+ *
+ * This thread runs the entire lifetime of this object.
+ * It's started in the constructor and joined in the destructor.
+ *
+ * This member variable is declared last so that it is constructed last,
+ * after all the other member necessary for proper functioning of the consumer.
+ */
+ std::thread thread;
+
+ /**
+ * Main function for the consumer thread.
+ *
+ * Strictly speaking, it's the effective main function for the thread.
+ * The actual main function is a lambda that only calls this function and nothing else.
+ */
+ void Consumer()
+ {
+ while (queue.WaitIf([this]() { return running; }))
+ {
+ // Drain the queue, processing all of its elements
+ while (
sergei 2015/10/01 15:50:06 It means that it will run until the queue is not e
Eric 2015/10/08 21:05:39 Of course it's intended. The whole point here is t
+ queue.Remove(
+ [&](T&& t) {
+ try
+ {
+ processor(std::move(t));
+ }
+ catch (...)
+ {
+ // Ignore any exception 'processor' may throw.
+ }
+ }
+ )
+ )
+ {
+ }
+ // Loop termination: queue.Remove() removed nothing from the queue, so it's now empty
+ }
+ }
+
+public:
+ ActiveQueue(F& f)
+ : running(true), processor(f), queue(), thread([this]() { Consumer(); })
+ {}
+
+ ~ActiveQueue()
+ {
+ /*
+ * The consumer thread is waiting on the condition variable.
+ * If we don't wake it up, our thread won't terminate.
+ *
+ * Note that we don't have a race condition here with new elements arriving
+ * while we are draining the queue for the last time.
+ * We are executing in the destructor, so our member function Insert() is unavailable,
+ * and it's the only way that new element enter the message queue.
+ */
+ queue.Rouse([this](){ running = false; });
+ thread.join();
+ }
+
+ ActiveQueue(const ActiveQueue&); // = delete
+ ActiveQueue(ActiveQueue&&); // = delete
+ ActiveQueue& operator=(const ActiveQueue&); // = delete
+ ActiveQueue& operator=(ActiveQueue&&); // = delete
+
+ /**
+ * Insert
+ */
+ void Insert(const T& t)
+ {
+ queue.Insert(t);
+ }
+
+ /**
+ * Insert
+ */
+ void Insert(T&& t)
+ {
+ queue.Insert(std::move(t));
+ }
+};
« no previous file with comments | « adblockplus.gyp ('k') | src/plugin/Config.h » ('j') | src/plugin/Placeholder.h » ('J')

Powered by Google App Engine
This is Rietveld