Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Unified Diff: src/Scheduler.h

Issue 29367507: Issue #3595 - Add an actual scheduler; use joined threads for file system
Patch Set: Created Dec. 14, 2016, 5:38 p.m.
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: src/Scheduler.h
===================================================================
--- a/src/Scheduler.h
+++ b/src/Scheduler.h
@@ -18,8 +18,13 @@
#if !defined(ADBLOCKPLUS_SCHEDULER_H)
#define ADBLOCKPLUS_SCHEDULER_H
+#include <condition_variable>
+#include <list>
#include <memory>
+#include <mutex>
#include <functional>
+#include <queue>
+#include <thread>
namespace AdblockPlus
{
@@ -59,20 +64,216 @@
{
return HeapFunction<T>(body);
}
+}
- /*
- * Scheduler for the execution of tasks.
- *
- * The present version is nothing more than a rewrite of the legacy behavior,
- * which was to create a new thread for each task.
+/*
+ * Execute a task immediately in detached thread that's used only for this task.
+ *
+ * The present version is nothing more than a rewrite of the legacy behavior,
+ * which was to create a new thread for each task.
+ */
+void StartImmediatelyInSingleUseDetachedThread(std::function<void()> task);
+
+/**
+ * Active object to run arbitrary function objects.
+ *
+ * Operations execute in a thread owned by this instance.
+ * If a user wants to execute an operation in a different thread,
sergei 2017/01/20 13:08:14 We don't need this part "If a user wants to execut
Eric 2017/03/30 17:16:59 Documenting responsibilities is part of what makes
sergei 2017/04/03 15:35:33 Sorry, but I have not seen on the provisioned link
+ * they have the responsibility to set up whatever additional
+ * facilities may be required.
+ */
+class OperationRunner
sergei 2017/01/20 13:08:14 Why not to call it ActiveObject?
Eric 2017/03/30 17:16:59 Because it's not a generic active object. It's tai
sergei 2017/04/03 15:35:33 Maybe it's not a generic active object but it is a
+{
+ /**
+ * Shared mutex for all synchronization
*/
- class Scheduler
+ std::mutex m;
+ typedef std::unique_lock<std::mutex> UniqueLockType;
+ /**
+ * Condition variable notified when a operation is added to its queue
+ */
+ std::condition_variable cv;
+ /**
+ * Operation queue for internal communication
+ */
+ std::queue<std::function<void()>> queue;
sergei 2017/01/20 13:08:14 Basically, it's a synchronized queue which has alr
Eric 2017/03/30 17:16:59 It's not a separate class because the mutex is use
sergei 2017/04/03 15:35:33 Could you please point to that race condition in t
+ /**
+ * The thread for the operation queue
+ */
+ std::thread th;
+ /**
+ * Flag to keep the operation queue running
+ */
+ bool isRunning;
+ /**
+ * Thread main for the operation queue thread.
+ */
+ void ThreadMain();
+
+public:
+ /**
+ * Execution thread starts during construction.
+ */
+ OperationRunner();
+
+ /**
+ * Note: The execution thread must end before the destructor returns.
+ */
+ ~OperationRunner();
+
+ /**
+ * Queue up an operation for execution in the queue's thread.
+ */
+ void Run(std::function<void()> f);
+};
+
+/**
+ * A worker with a single-use-thread.
+ */
+class SingleUseWorker
sergei 2017/01/20 13:08:13 It looks like there was a lot of discussions, fina
Eric 2017/03/30 17:16:59 It doesn't look from my perspective that there wer
sergei 2017/04/03 15:35:32 There were a lot of talks regarding designing a th
+{
+ /**
+ * The underlying thread
+ */
+ std::thread th;
+
+public:
+ /**
+ * Constructor does not start thread; that happens in `Run()`.
+ */
+ SingleUseWorker();
+ ~SingleUseWorker();
+ /**
+ * Run a task
+ *
+ * This class may rely on the restriction
+ * that the user may call this function at most once.
+ */
+ void Run(std::function<void()> f);
+ /**
+ * Synchronize with point of completion of task
+ *
+ * The present class calls `join()` on the thread,
+ * but alternates might synchronize in other ways.
+ */
+ void Join();
+};
+
+/**
+ * A simple scheduler.
+ *
+ * The scheduler keeps track of executing threads
+ * instead of detaching them and keeping no record of what's executing.
+ * The main goal at this stage is to be able to join all running threads.
+ * Since you can't call `join()` on a thread from within that thread,
+ * this class contains its a separate thread to be able to make such calls.
+ * This thread runs a generic operation queue
+ * that does more than simply join zombie threads.
+ *
sergei 2017/03/30 13:14:41 Could you please move that Scheduler with the func
Eric 2017/03/30 14:20:55 You might know what you want, but this description
sergei 2017/04/03 15:35:33 So, what are the difficulties to create a coderevi
+ * The present version has some limitations and inefficiencies,
+ * trading them off for ease of incremental implementation.
+ * - Separate schedulers for each `JsEngine` without any shared facilities.
sergei 2017/01/20 13:08:14 I would remove this string because whether there i
Eric 2017/03/30 17:16:58 Since you've not delved into this code the way I h
sergei 2017/04/03 15:35:33 I think that scheduler should not be so direct mem
+ * - Working threads are single-use. No provision for a thread pool.
sergei 2017/01/20 13:08:13 That's also questionable.
Eric 2017/03/30 17:16:59 It is absolutely not questionable that this class,
sergei 2017/04/03 15:35:33 Why should it have a thread pool? It's similar to
+ * - Tasks are simple function objects. Its possible to hasten termination
+ * in v8 by calling `TerminateExecution()`, for example,
+ * but there's no interface for this.
+ * - No support for delayed execution to support JavaScript `SetTimeout`.
+ */
+template<class Worker>
+class SchedulerT
+{
+ /**
+ * Shared mutex for all synchronization
+ */
+ std::mutex m;
sergei 2017/01/20 13:08:14 it should be rather std::recursive_mutex because i
Eric 2017/03/30 17:16:58 No, it should not be recursive mutex. That would b
sergei 2017/04/03 15:35:33 Acknowledged.
+ typedef std::unique_lock<std::mutex> UniqueLockType;
+
+ /**
+ * A list containing an entry for each running task.
+ */
+ std::list<Worker> taskList;
+
+ /**
+ * Condition variable notified when a running task is removed from its list
+ */
+ std::condition_variable taskRemoveCv;
+
+ /**
+ * Maintains a separate thread that is able to join completed worker threads.
+ */
+ OperationRunner monitor;
+
+ /**
+ * Execution wrapper for a task.
+ * @param task
+ * An iterator to the task within the task list.
+ */
+ void WorkerMain(std::function<void()> body, typename std::list<Worker>::iterator task)
{
- public:
- /*
- * Execute a task immediately in a thread created for this task only
- */
- static void StartImmediatelyInSingleUseThread(std::function<void()> task);
- };
-}
+ body();
+ monitor.Run([this, task]() { JoinAndRemove(task); });
+ }
+
+ /**
+ * Joins a task and then removes it from the task list.
+ * @param task
+ * An iterator to the task within the task list.
+ *
+ * This is the only function that may remove tasks from the running task list.
+ */
+ void JoinAndRemove(typename std::list<Worker>::iterator task)
+ {
+ task->Join();
sergei 2017/04/03 15:35:33 here is a race condition, it can happen that move-
+ {
+ UniqueLockType ul(m);
+ taskList.erase(task);
+ }
+ taskRemoveCv.notify_all();
+ }
+
+public:
+ SchedulerT()
+ {}
+
+ ~SchedulerT()
+ {
+ // `JoinAll()` may be called prior to destruction but, however,
+ // there is no requirement that it be called beforehand.
+ // We call it in the destructor to ensure all tasks have completed.
+ // If we do not, we risk destruction of a joinable thread,
+ // which would generate a call to `std::terminate()`.
+ JoinAll();
+ }
+
+ /**
+ * Construct a `Worker` and run a task in it immediately.
+ *
+ * This is the only function that may add tasks to the running task list.
+ */
+ void Run(std::function<void()> body)
sergei 2017/01/20 13:08:13 It would be easier to read if argument "body" is r
Eric 2017/03/30 17:16:58 I've used "body" consistently for a function that'
sergei 2017/04/03 15:35:33 I still think that another name is better here, bo
+ {
+ UniqueLockType ul(m);
+ // Note that the interface to `Worker` uses a constructor and not a factory
+ // Here we construct in place, avoiding both move and copy.
+ auto tt = taskList.emplace(taskList.begin());
+ tt->Run([this, body, tt]() { WorkerMain(body, tt); });
sergei 2017/01/20 13:08:13 I'm not sure that it's a wise decision to assume t
Eric 2017/03/30 17:16:58 This is not just an assumption. It's a design prin
sergei 2017/04/03 15:35:32 OK, let's say I want to implement timers using Set
+ }
+
+ /**
+ * Block until all running threads have been joined and the task list is empty.
+ *
+ * This class does not take on any responsibility to keep the task list
+ * from growing while this function executes.
+ * Improper use can result in this function blocking forever; don't do that.
+ */
+ void JoinAll()
+ {
+ // The call to `Worker::Join()` is in another function,
+ // namely `JoinAndRemove()`, which also removes items from the task list.
+ // Thus all we do here is to wait for the task list to empty.
+ UniqueLockType ul(m);
+ taskRemoveCv.wait(ul, [this]() -> bool { return taskList.empty(); });
+ }
+};
+
#endif

Powered by Google App Engine
This is Rietveld