Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Unified Diff: src/Scheduler.cpp

Issue 29367507: Issue #3595 - Add an actual scheduler; use joined threads for file system
Patch Set: Created Dec. 14, 2016, 5:38 p.m.
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: src/Scheduler.cpp
===================================================================
--- a/src/Scheduler.cpp
+++ b/src/Scheduler.cpp
@@ -27,7 +27,7 @@
}
}
-void Scheduler::StartImmediatelyInSingleUseThread(std::function<void()> task)
+void StartImmediatelyInSingleUseDetachedThread(std::function<void()> task)
{
if (!task)
{
@@ -36,3 +36,82 @@
auto th = std::thread(SingleUseThreadMain, task);
th.detach();
}
+
+OperationRunner::OperationRunner()
+ : isRunning(true)
+{
+ th = std::thread([this]() { ThreadMain(); });
+ /*
+ * Note that we don't need to wait for our thread to start running
+ * before we consider the object fully constructed.
+ * Because of how the `wait()` statement in `ThreadMain` is written,
+ * any operation arriving before the thread begins will be
+ * processed immediately, without blocking on a notification.
+ */
sergei 2017/01/20 13:08:13 That comment is good however I don't think we need
Eric 2017/03/30 17:16:58 Behavior is always clearer *after* you read the co
sergei 2017/04/03 15:35:32 I'm talking about implementation without race cond
+}
+
+OperationRunner::~OperationRunner()
+{
+ // Shut down the operation queue
+ Run([this]() { isRunning = false; });
+ th.join();
+}
+
+void OperationRunner::Run(std::function<void()> f)
+{
+ UniqueLockType ul(m);
+ queue.push(f);
+ cv.notify_one();
+}
+
+void OperationRunner::ThreadMain()
+{
+ while (true)
+ {
+ std::function<void()> op;
+ {
+ UniqueLockType ul(m);
+ // Checking the flag here avoids needing a second lock object
+ if (!isRunning)
sergei 2017/01/20 13:08:13 Why not to put isRunning into while condition? we
Eric 2017/03/30 17:16:58 Because it's not correct. It opens up an opportuni
sergei 2017/04/03 15:35:31 Could you please explain when that race condition
+ {
+ break;
+ }
+ cv.wait(ul, [this]() -> bool { return !queue.empty(); });
+ op = queue.front();
+ queue.pop();
+ }
+ // Assert `m` is unlocked
sergei 2017/01/20 13:08:13 I would remove that comment.
Eric 2017/03/30 17:16:58 Nope; it stays. Tracking when the mutex is locked
+ try
+ {
+ if (op)
sergei 2017/04/03 15:35:31 Why not to prevent putting of empty functions in t
+ {
+ op();
+ }
+ }
+ catch (std::exception& e)
sergei 2017/04/03 15:35:32 it generates a warning about unused local variable
+ {
+ // suppress
+ }
+ }
+}
+
+SingleUseWorker::SingleUseWorker()
+{}
+
+SingleUseWorker::~SingleUseWorker()
+{
+ if (th.joinable())
sergei 2017/01/20 13:08:13 This implementation with if (th.joinable()) should
Eric 2017/03/30 17:16:58 Yes, you need to be able to join the thread either
+ {
+ th.join();
+ }
+}
+
+void SingleUseWorker::Run(std::function<void()> f)
+{
+ th = std::move(std::thread(f));
sergei 2017/01/20 13:08:13 Do we really need to explicitly say std::move here
Eric 2017/03/30 17:16:58 It avoids a second copy, since there's already one
sergei 2017/04/03 15:35:32 Copy of what? JIC, std::thread cannot be copied.
+}
+
+void SingleUseWorker::Join()
+{
+ th.join();
+}

Powered by Google App Engine
This is Rietveld