Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
control_task_queue.h
/*
 * Copyright (c) 2020 Roc Streaming authors
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

//! @file roc_ctl/control_task_queue.h
//! @brief Control task queue.

#ifndef ROC_CTL_CONTROL_TASK_QUEUE_H_
#define ROC_CTL_CONTROL_TASK_QUEUE_H_

#include "roc_core/atomic.h"
#include "roc_core/list.h"
#include "roc_core/mpsc_queue.h"
#include "roc_core/mutex.h"
#include "roc_core/thread.h"
#include "roc_core/time.h"
#include "roc_core/timer.h"
#include "roc_ctl/control_task.h"
#include "roc_ctl/control_task_executor.h"
#include "roc_ctl/icontrol_task_completer.h"

namespace roc {
namespace ctl {

//! Control task queue.
//!
//! This class implements a thread-safe task queue, allowing lock-free scheduling
//! of tasks for immediate or delayed execution on the background thread, as well
//! as lock-free task cancellation and re-scheduling (changing deadline).
//!
//! It also supports pausing and resuming tasks. Task resuming is lock-free too.
//!
//! Note that these operations are lock-free only if core::Timer::try_set_deadline()
//! is lock-free, which is the case on modern platforms.
//!
//! In the current implementation, priority is given to fast scheduling and cancellation
//! over the strict observance of the scheduling deadlines. In other words, during
//! contention or peak load, scheduling and cancellation will always be fast, but task
//! execution may be delayed.
//!
//! This design was considered acceptable because the actual users of the control task
//! queue are more sensitive to delays than the tasks they schedule. The task queue is
//! used by network and pipeline threads, which should never block and use the task
//! queue to schedule low-priority delayed work.
//!
//! The implementation uses three queues internally:
//!
//!  - ready_queue_ - a lock-free queue of tasks of four kinds:
//!    - tasks to be resumed after pause ((flags_ & FlagResumed) != 0)
//!    - tasks to be executed as soon as possible (renewed_deadline_ == 0)
//!    - tasks to be re-scheduled with another deadline (renewed_deadline_ > 0)
//!    - tasks to be canceled (renewed_deadline_ < 0)
//!
//!  - sleeping_queue_ - a sorted queue of tasks with non-zero deadline, scheduled for
//!    execution in future; the task at the head has the smallest (nearest) deadline;
//!
//!  - paused_queue_ - an unsorted queue to keep track of all currently paused tasks.
//!
//! task_mutex_ should be acquired to process tasks and/or to access sleeping_queue_
//! and paused_queue_, as well as non-atomic task fields.
//!
//! wakeup_timer_ (core::Timer) is used to set or wait for the next wakeup time of the
//! background thread. This time is set to zero when ready_queue_ is non-empty, otherwise
//! it is set to the deadline of the first task in sleeping_queue_ if it's non-empty, and
//! otherwise is set to infinity (-1). The timer allows the deadline to be updated
//! concurrently from any thread.
//!
//! When the task is scheduled, re-scheduled, or canceled, there are two ways to
//! complete the operation:
//!
//!  - If the event loop thread is sleeping and the task_mutex_ is free, we can acquire
//!    the mutex and complete the operation in-place by manipulating sleeping_queue_
//!    under the mutex, without bothering the event loop thread. This can be done only
//!    if we're changing task scheduling and not going to execute it right now.
//!
//!  - Otherwise, we push the task to ready_queue_ (which has lock-free push), set
//!    the timer wakeup time to zero (to ensure that the event loop thread won't go to
//!    sleep), and return, leaving the completion of the operation to the event loop
//!    thread. The event loop thread will fetch the task from ready_queue_ soon and
//!    complete the operation by manipulating the sleeping_queue_.
//!
//! The current task state is defined by its atomic field "state_". Various task queue
//! operations move the task from one state to another. The move is always performed
//! using atomic CAS or exchange to handle concurrent lock-free updates correctly.
//!
//! There is also a "flags_" field that provides additional information about the task
//! that is preserved across transitions between states; for example, that the task is
//! being resumed.
//!
//! Here are some example flows of the task states:
//! @code
//!    schedule():
//!      StateCompleted -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    schedule_at():
//!      StateCompleted -> StateReady
//!        -> StateSleeping
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    resume():
//!      StateSleeping -> StateReady
//!        -> StateProcessing -> StateCompleting -> StateCompleted
//!
//!    async_cancel():
//!      StateSleeping -> StateReady
//!        -> StateCancelling -> StateCompleting -> StateCompleted
//! @endcode
//!
//! The meaning of the states is the following:
//!  - StateReady: task is added to the ready queue for execution or renewal,
//!                or possibly is currently being renewed in-place
//!  - StateSleeping: task renewal is complete and the task was put into the sleeping
//!                   queue to wait for its deadline, or into the paused queue to wait
//!                   for resume
//!  - StateCancelling: task renewal is complete and the task is being canceled
//!                     because it was put into the ready queue for cancellation
//!  - StateProcessing: task is being processed after fetching it either from the ready
//!                     queue (if it was put there for execution) or the sleeping queue
//!  - StateCompleting: task processing is complete and the task is being completed
//!  - StateCompleted: task is completed and is not used anywhere; it may be safely
//!                    destroyed or reused; this is also the initial task state
class ControlTaskQueue : private core::Thread {
public:
    //! Initialize.
    //! @remarks
    //!  Starts background thread.
    ControlTaskQueue();

    //! Destroy.
    //! @remarks
    //!  stop_and_wait() should be called before destructor.
    virtual ~ControlTaskQueue();

    //! Check if the object was successfully constructed.
    bool is_valid() const;

    //! Enqueue a task for asynchronous execution as soon as possible.
    //!
    //! This is like schedule_at(), but the deadline is "as soon as possible".
    void schedule(ControlTask& task,
                  IControlTaskExecutor& executor,
                  IControlTaskCompleter* completer);

    //! Enqueue a task for asynchronous execution at given point of time.
    //!
    //! - If the task is already completed, it's scheduled with given deadline.
    //! - If the task is sleeping and waiting for deadline, its deadline is updated.
    //! - If the task is in processing, completion or cancellation phase, it's scheduled
    //!   to be executed again after completion or cancellation finishes.
    //! - If the task is paused, re-scheduling is postponed until task resumes.
    //!
    //! @p deadline should be in the same domain as core::timestamp().
    //! It can't be negative. Zero deadline means "execute as soon as possible".
    //!
    //! The @p executor is used to invoke the task function. It allows tasks to be
    //! implemented in different classes. If a class T wants to implement tasks, it
    //! should inherit ControlTaskExecutor<T>.
    //!
    //! If @p completer is present, the task should not be destroyed until completer is
    //! invoked. The completer is invoked on the event loop thread once and only once,
    //! after the task completes or is canceled. Completer should never block.
    //!
    //! The event loop thread assumes that the task may be destroyed right after it is
    //! completed and its completer is called (if it's present), and doesn't touch the
    //! task after this, unless the user explicitly reschedules the task.
    void schedule_at(ControlTask& task,
                     core::nanoseconds_t deadline,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    //! Resume task if it's paused.
    //!
    //! - If the task is paused, schedule it for execution.
    //! - If the task is being processed right now (i.e. it's executing or will be
    //!   executing very soon), then postpone decision until task execution ends. After
    //!   the task execution, if the task asked to pause, then immediately resume it.
    //! - Otherwise, do nothing.
    //!
    //! If resume is called one or multiple times before task execution, those calls
    //! are ignored. Only calls made during or after task execution are honored, and
    //! only if the task execution left the task in paused state.
    //!
    //! Subsequent resume calls between task executions are collapsed into one; even if
    //! resume was called multiple times after the task paused and before it's executed
    //! again, the next pause will need a new resume call.
    void resume(ControlTask& task);

    //! Try to cancel scheduled task execution, if it's not executed yet.
    //!
    //! - If the task is already completed or is being completed or canceled, do nothing.
    //! - If the task is sleeping or paused, cancel task execution.
    //! - If the task is being processed right now (i.e. it's executing or will be
    //!   executing very soon), then postpone decision until task execution ends. After
    //!   the task execution, if the task asked to pause or continue, then cancellation
    //!   request is fulfilled and the task is canceled; otherwise cancellation request
    //!   is ignored and the task is completed normally.
    //!
    //! When the task is being canceled instead of completed, if it has completer, the
    //! completer is invoked.
    void async_cancel(ControlTask& task);

    //! Wait until the task is completed.
    //!
    //! Blocks until the task is completed or canceled.
    //! Does NOT wait until the task completer is called.
    //!
    //! Cannot be called concurrently for the same task (will cause crash).
    //! Cannot be called from the task completion handler (will cause deadlock).
    //!
    //! If this method is called, the task should not be destroyed until this method
    //! returns (as well as until the completer is invoked, if it's present).
    void wait(ControlTask& task);

    //! Stop thread and wait until it terminates.
    //!
    //! All tasks should be completed before calling stop_and_wait().
    //! stop_and_wait() should be called before calling destructor.
    void stop_and_wait();

private:
    virtual void run();

    void start_thread_();
    void stop_thread_();

    void setup_task_(ControlTask& task,
                     IControlTaskExecutor& executor,
                     IControlTaskCompleter* completer);

    void request_resume_(ControlTask& task);
    void request_renew_(ControlTask& task, core::nanoseconds_t deadline);
    void request_renew_guarded_(ControlTask& task, core::nanoseconds_t deadline);

    bool try_renew_inplace_(ControlTask& task,
                            core::nanoseconds_t deadline,
                            core::seqlock_version_t version);

    ControlTask::State
    renew_state_(ControlTask& task, unsigned task_flags, core::nanoseconds_t deadline);
    bool renew_scheduling_(ControlTask& task,
                           unsigned task_flags,
                           core::nanoseconds_t deadline,
                           core::seqlock_version_t version);

    bool reschedule_task_(ControlTask& task,
                          core::nanoseconds_t deadline,
                          core::seqlock_version_t version);
    void cancel_task_(ControlTask& task, core::seqlock_version_t version);

    void reborn_task_(ControlTask& task, ControlTask::State from_state);
    void pause_task_(ControlTask& task, ControlTask::State from_state);
    void
    complete_task_(ControlTask& task, unsigned task_flags, ControlTask::State from_state);
    void wait_task_(ControlTask& task);

    void execute_task_(ControlTask& task);

    bool process_tasks_();

    ControlTask* fetch_task_();
    ControlTask* fetch_ready_task_();
    ControlTask* fetch_sleeping_task_();

    void insert_sleeping_task_(ControlTask& task);
    void remove_sleeping_task_(ControlTask& task);

    core::nanoseconds_t update_wakeup_timer_();

    bool started_;
    core::Atomic<int> stop_;
    bool fetch_ready_;

    core::Atomic<int> ready_queue_size_;
    core::MpscQueue<ControlTask, core::NoOwnership> ready_queue_;
    core::List<ControlTask, core::NoOwnership> sleeping_queue_;
    core::List<ControlTask, core::NoOwnership> paused_queue_;

    core::Timer wakeup_timer_;
    core::Mutex task_mutex_;
};

} // namespace ctl
} // namespace roc

#endif // ROC_CTL_CONTROL_TASK_QUEUE_H_
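
The class comment says that every state change is performed with an atomic CAS or exchange, so that concurrent callers cannot both win the same transition. The following is a minimal standalone sketch of that idea, not the Roc implementation: `Task`, `try_transition()`, and the use of `std::atomic` (instead of `core::Atomic`) are assumptions made for illustration, and only the state names are taken from the header.

```cpp
#include <atomic>

// State names copied from the class comment; the numeric values are arbitrary.
enum State {
    StateCompleted,
    StateReady,
    StateSleeping,
    StateCancelling,
    StateProcessing,
    StateCompleting
};

// Hypothetical stand-in for ControlTask: holds only the atomic state field.
struct Task {
    std::atomic<int> state;

    // StateCompleted is documented as the initial task state.
    Task() : state(StateCompleted) {}
};

// Hypothetical helper: move the task from `from` to `to` only if it is still
// in `from`. Returns false if another thread changed the state first.
bool try_transition(Task& task, State from, State to) {
    int expected = from;
    return task.state.compare_exchange_strong(expected, to);
}
```

With this helper, the documented schedule() flow becomes a chain of `try_transition()` calls (Completed to Ready, Ready to Processing, and so on), and a second caller attempting the same step simply gets `false` back instead of corrupting the state.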
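
The rule for choosing the next wakeup time of the background thread (zero if the ready queue is non-empty, otherwise the deadline at the head of the sleeping queue, otherwise -1 for "infinity") can be written down as a small pure function. This is a sketch under stated assumptions: `next_wakeup_deadline()` is a hypothetical helper, the local `nanoseconds_t` typedef stands in for `core::nanoseconds_t`, and a sorted `std::deque` stands in for `sleeping_queue_`; the real class keeps this logic in its private methods and its timer.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for core::nanoseconds_t (documented as int64_t).
typedef std::int64_t nanoseconds_t;

// Hypothetical helper mirroring the wakeup rule from the class comment.
// `ready_count` models the number of tasks in the ready queue, and
// `sleeping_sorted` models the sleeping queue, sorted by ascending deadline.
nanoseconds_t next_wakeup_deadline(std::size_t ready_count,
                                   const std::deque<nanoseconds_t>& sleeping_sorted) {
    if (ready_count != 0) {
        // Pending ready tasks: wake up immediately.
        return 0;
    }
    if (!sleeping_sorted.empty()) {
        // Nearest deadline is at the head of the sorted queue.
        return sleeping_sorted.front();
    }
    // Nothing to do: sleep until another thread updates the timer.
    return -1;
}
```

Keeping the rule as a function of two inputs makes the priority order explicit: ready tasks always win over sleeping ones, which matches the stated design goal of fast scheduling and cancellation at the cost of possible execution delays.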