Index: src/Scheduler.h |
=================================================================== |
--- a/src/Scheduler.h |
+++ b/src/Scheduler.h |
@@ -18,8 +18,13 @@ |
#if !defined(ADBLOCKPLUS_SCHEDULER_H) |
#define ADBLOCKPLUS_SCHEDULER_H |
+#include <condition_variable> |
+#include <list> |
#include <memory> |
+#include <mutex> |
#include <functional> |
+#include <queue> |
+#include <thread> |
namespace AdblockPlus |
{ |
@@ -59,20 +64,216 @@ |
{ |
return HeapFunction<T>(body); |
} |
+} |
- /* |
- * Scheduler for the execution of tasks. |
- * |
- * The present version is nothing more than a rewrite of the legacy behavior, |
- * which was to create a new thread for each task. |
+/* |
+ * Execute a task immediately in detached thread that's used only for this task. |
+ * |
+ * The present version is nothing more than a rewrite of the legacy behavior, |
+ * which was to create a new thread for each task. |
+ */ |
+void StartImmediatelyInSingleUseDetachedThread(std::function<void()> task); |
+ |
+/** |
+ * Active object to run arbitrary function objects. |
+ * |
+ * Operations execute in a thread owned by this instance. |
+ * If a user wants to execute an operation in a different thread, |
sergei
2017/01/20 13:08:14
We don't need this part "If a user wants to execut
Eric
2017/03/30 17:16:59
Documenting responsibilities is part of what makes
sergei
2017/04/03 15:35:33
Sorry, but I have not seen on the provisioned link
|
+ * they have the responsibility to set up whatever additional |
+ * facilities may be required. |
+ */ |
+class OperationRunner |
sergei
2017/01/20 13:08:14
Why not to call it ActiveObject?
Eric
2017/03/30 17:16:59
Because it's not a generic active object. It's tai
sergei
2017/04/03 15:35:33
Maybe it's not a generic active object but it is a
|
+{ |
+ /** |
+ * Shared mutex for all synchronization |
*/ |
- class Scheduler |
+ std::mutex m; |
+ typedef std::unique_lock<std::mutex> UniqueLockType; |
+ /** |
+ * Condition variable notified when a operation is added to its queue |
+ */ |
+ std::condition_variable cv; |
+ /** |
+ * Operation queue for internal communication |
+ */ |
+ std::queue<std::function<void()>> queue; |
sergei
2017/01/20 13:08:14
Basically, it's a synchronized queue which has alr
Eric
2017/03/30 17:16:59
It's not a separate class because the mutex is use
sergei
2017/04/03 15:35:33
Could you please point to that race condition in t
|
+ /** |
+ * The thread for the operation queue |
+ */ |
+ std::thread th; |
+ /** |
+ * Flag to keep the operation queue running |
+ */ |
+ bool isRunning; |
+ /** |
+ * Thread main for the operation queue thread. |
+ */ |
+ void ThreadMain(); |
+ |
+public: |
+ /** |
+ * Execution thread starts during construction. |
+ */ |
+ OperationRunner(); |
+ |
+ /** |
+ * Note: The execution thread must end before the destructor returns. |
+ */ |
+ ~OperationRunner(); |
+ |
+ /** |
+ * Queue up an operation for execution in the queue's thread. |
+ */ |
+ void Run(std::function<void()> f); |
+}; |
+ |
+/** |
+ * A worker with a single-use-thread. |
+ */ |
+class SingleUseWorker |
sergei
2017/01/20 13:08:13
It looks like there was a lot of discussions, fina
Eric
2017/03/30 17:16:59
It doesn't look from my perspective that there wer
sergei
2017/04/03 15:35:32
There were a lot of talks regarding designing a th
|
+{ |
+ /** |
+ * The underlying thread |
+ */ |
+ std::thread th; |
+ |
+public: |
+ /** |
+ * Constructor does not start thread; that happens in `Run()`. |
+ */ |
+ SingleUseWorker(); |
+ ~SingleUseWorker(); |
+ /** |
+ * Run a task |
+ * |
+ * This class may rely on the restriction |
+ * that the user may call this function at most once. |
+ */ |
+ void Run(std::function<void()> f); |
+ /** |
+ * Synchronize with point of completion of task |
+ * |
+ * The present class calls `join()` on the thread, |
+ * but alternates might synchronize in other ways. |
+ */ |
+ void Join(); |
+}; |
+ |
+/** |
+ * A simple scheduler. |
+ * |
+ * The scheduler keeps track of executing threads |
+ * instead of detaching them and keeping no record of what's executing. |
+ * The main goal at this stage is to be able to join all running threads. |
+ * Since you can't call `join()` on a thread from within that thread, |
+ * this class contains its a separate thread to be able to make such calls. |
+ * This thread runs a generic operation queue |
+ * that does more than simply join zombie threads. |
+ * |
sergei
2017/03/30 13:14:41
Could you please move that Scheduler with the func
Eric
2017/03/30 14:20:55
You might know what you want, but this description
sergei
2017/04/03 15:35:33
So, what are the difficulties to create a coderevi
|
+ * The present version has some limitations and inefficiencies, |
+ * trading them off for ease of incremental implementation. |
+ * - Separate schedulers for each `JsEngine` without any shared facilities. |
sergei
2017/01/20 13:08:14
I would remove this string because whether there i
Eric
2017/03/30 17:16:58
Since you've not delved into this code the way I h
sergei
2017/04/03 15:35:33
I think that scheduler should not be so direct mem
|
+ * - Working threads are single-use. No provision for a thread pool. |
sergei
2017/01/20 13:08:13
That's also questionable.
Eric
2017/03/30 17:16:59
It is absolutely not questionable that this class,
sergei
2017/04/03 15:35:33
Why should it have a thread pool? It's similar to
|
+ * - Tasks are simple function objects. Its possible to hasten termination |
+ * in v8 by calling `TerminateExecution()`, for example, |
+ * but there's no interface for this. |
+ * - No support for delayed execution to support JavaScript `SetTimeout`. |
+ */ |
+template<class Worker> |
+class SchedulerT |
+{ |
+ /** |
+ * Shared mutex for all synchronization |
+ */ |
+ std::mutex m; |
sergei
2017/01/20 13:08:14
it should be rather std::recursive_mutex because i
Eric
2017/03/30 17:16:58
No, it should not be recursive mutex. That would b
sergei
2017/04/03 15:35:33
Acknowledged.
|
+ typedef std::unique_lock<std::mutex> UniqueLockType; |
+ |
+ /** |
+ * A list containing an entry for each running task. |
+ */ |
+ std::list<Worker> taskList; |
+ |
+ /** |
+ * Condition variable notified when a running task is removed from its list |
+ */ |
+ std::condition_variable taskRemoveCv; |
+ |
+ /** |
+ * Maintains a separate thread that is able to join completed worker threads. |
+ */ |
+ OperationRunner monitor; |
+ |
+ /** |
+ * Execution wrapper for a task. |
+ * @param task |
+ * An iterator to the task within the task list. |
+ */ |
+ void WorkerMain(std::function<void()> body, typename std::list<Worker>::iterator task) |
{ |
- public: |
- /* |
- * Execute a task immediately in a thread created for this task only |
- */ |
- static void StartImmediatelyInSingleUseThread(std::function<void()> task); |
- }; |
-} |
+ body(); |
+ monitor.Run([this, task]() { JoinAndRemove(task); }); |
+ } |
+ |
+ /** |
+ * Joins a task and then removes it from the task list. |
+ * @param task |
+ * An iterator to the task within the task list. |
+ * |
+ * This is the only function that may remove tasks from the running task list. |
+ */ |
+ void JoinAndRemove(typename std::list<Worker>::iterator task) |
+ { |
+ task->Join(); |
sergei
2017/04/03 15:35:33
here is a race condition, it can happen that move-
|
+ { |
+ UniqueLockType ul(m); |
+ taskList.erase(task); |
+ } |
+ taskRemoveCv.notify_all(); |
+ } |
+ |
+public: |
+ SchedulerT() |
+ {} |
+ |
+ ~SchedulerT() |
+ { |
+ // `JoinAll()` may be called prior to destruction but, however, |
+ // there is no requirement that it be called beforehand. |
+ // We call it in the destructor to ensure all tasks have completed. |
+ // If we do not, we risk destruction of a joinable thread, |
+ // which would generate a call to `std::terminate()`. |
+ JoinAll(); |
+ } |
+ |
+ /** |
+ * Construct a `Worker` and run a task in it immediately. |
+ * |
+ * This is the only function that may add tasks to the running task list. |
+ */ |
+ void Run(std::function<void()> body) |
sergei
2017/01/20 13:08:13
It would be easier to read if argument "body" is r
Eric
2017/03/30 17:16:58
I've used "body" consistently for a function that'
sergei
2017/04/03 15:35:33
I still think that another name is better here, bo
|
+ { |
+ UniqueLockType ul(m); |
+ // Note that the interface to `Worker` uses a constructor and not a factory |
+ // Here we construct in place, avoiding both move and copy. |
+ auto tt = taskList.emplace(taskList.begin()); |
+ tt->Run([this, body, tt]() { WorkerMain(body, tt); }); |
sergei
2017/01/20 13:08:13
I'm not sure that it's a wise decision to assume t
Eric
2017/03/30 17:16:58
This is not just an assumption. It's a design prin
sergei
2017/04/03 15:35:32
OK, let's say I want to implement timers using Set
|
+ } |
+ |
+ /** |
+ * Block until all running threads have been joined and the task list is empty. |
+ * |
+ * This class does not take on any responsibility to keep the task list |
+ * from growing while this function executes. |
+ * Improper use can result in this function blocking forever; don't do that. |
+ */ |
+ void JoinAll() |
+ { |
+ // The call to `Worker::Join()` is in another function, |
+ // namely `JoinAndRemove()`, which also removes items from the task list. |
+ // Thus all we do here is to wait for the task list to empty. |
+ UniqueLockType ul(m); |
+ taskRemoveCv.wait(ul, [this]() -> bool { return taskList.empty(); }); |
+ } |
+}; |
+ |
#endif |