Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Toggle main menu visibility
Loading...
Searching...
No Matches
mpsc_queue.h
Go to the documentation of this file.
1
/*
2
* Copyright (c) 2020 Roc Streaming authors
3
*
4
* This Source Code Form is subject to the terms of the Mozilla Public
5
* License, v. 2.0. If a copy of the MPL was not distributed with this
6
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7
*/
8
9
//! @file roc_core/mpsc_queue.h
10
//! @brief Multi-producer single-consumer queue.
11
12
#ifndef ROC_CORE_MPSC_QUEUE_H_
13
#define ROC_CORE_MPSC_QUEUE_H_
14
15
#include "roc_core/atomic_ops.h"
16
#include "
roc_core/mpsc_queue_impl.h
"
17
#include "
roc_core/mpsc_queue_node.h
"
18
#include "
roc_core/noncopyable.h
"
19
#include "
roc_core/ownership_policy.h
"
20
#include "
roc_core/panic.h
"
21
22
namespace
roc
{
23
namespace
core
{
24
25
//! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26
//!
27
//! Provides sequential consistency.
28
//!
29
//! Based on Dmitry Vyukov algorithm:
30
//! - http://tiny.cc/3d3moz
31
//! - https://int08h.com/post/ode-to-a-vyukov-queue/
32
//! - https://github.com/samanbarghi/MPSCQ
33
//!
34
//! @tparam T defines object type, it must inherit MpscQueueNode.
35
//!
36
//! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37
//! element ownership when it's added to the queue and release ownership when it's
38
//! removed from the queue.
39
//!
40
//! @tparam Node defines base class of queue nodes. It is needed if MpscQueueNode
41
//! is used with non-default tag.
42
template
<
class
T,
43
template
<
class
TT>
class
OwnershipPolicy =
RefCountedOwnership
,
44
class
Node =
MpscQueueNode<>
>
45
class
MpscQueue
:
public
NonCopyable<> {
46
public
:
47
//! Pointer type.
48
//! @remarks
49
//! either raw or smart pointer depending on the ownership policy.
50
typedef
typename
OwnershipPolicy<T>::Pointer
Pointer
;
51
52
~MpscQueue
() {
53
// release ownership of all objects
54
while
(
pop_front_exclusive
()) {
55
}
56
}
57
58
//! Add object to the end of the queue.
59
//! Can be called concurrently.
60
//! Acquires ownership of @p elem.
61
//! After this call returns, any thread calling pop_front_exclusive() or
62
//! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
63
//! that the latter can still fail if called concurrently with push_back().
64
//! @note
65
//! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
66
//! and wait-free, i.e. it never waits for sleeping threads and never spins.
67
//! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
68
//! but not wait-free, i.e. it never waits for sleeping threads, but with a low
69
//! probability can spin while there are concurrent non-sleeping push_back()
70
//! calls (because of the spin loop in the implementation of atomic exchange).
71
//! - Concurrent try_pop_front() and pop_front() does not affect this operation.
72
//! Only concurrent push_back() calls can make it spin.
73
void
push_back
(T& elem) {
74
OwnershipPolicy<T>::acquire(elem);
75
76
MpscQueueData
* data = to_node_data_(elem);
77
impl_.push_back(data);
78
}
79
80
//! Try to remove object from the beginning of the queue (non-blocking version).
81
//! Should NOT be called concurrently.
82
//! Releases ownership of the returned object.
83
//! @remarks
84
//! - Returns NULL if the queue is empty.
85
//! - May return NULL even if the queue is actually non-empty, in particular if
86
//! concurrent push_back() call is running, or if the push_back() results were
87
//! not fully published yet.
88
//! @note
89
//! - This operation is both lock-free and wait-free on all architectures, i.e. it
90
//! never waits for sleeping threads and never spins indefinitely.
91
Pointer
try_pop_front_exclusive
() {
92
MpscQueueData
* data = impl_.pop_front(
false
);
93
if
(!data) {
94
return
NULL;
95
}
96
97
Pointer
elem = from_node_data_(data);
98
OwnershipPolicy<T>::release(*elem);
99
100
return
elem;
101
}
102
103
//! Remove object from the beginning of the queue (blocking version).
104
//! Should NOT be called concurrently.
105
//! Releases ownership of the returned object.
106
//! @remarks
107
//! - Returns NULL if the queue is empty.
108
//! - May spin while a concurrent push_back() call is running.
109
//! @remarks
110
//! - This operation is NOT lock-free (or wait-free). It may spin until all
111
//! concurrent push_back() calls are finished.
112
//! - On the "fast-path", however, this operation does not wait for any
113
//! threads and just performs a few atomic reads and writes.
114
Pointer
pop_front_exclusive
() {
115
MpscQueueData
* data = impl_.pop_front(
true
);
116
if
(!data) {
117
return
NULL;
118
}
119
120
Pointer
elem = from_node_data_(data);
121
OwnershipPolicy<T>::release(*elem);
122
123
return
elem;
124
}
125
126
private
:
127
static
MpscQueueData
* to_node_data_(T& elem) {
128
return
static_cast<
Node&
>
(elem).mpsc_queue_data();
129
}
130
131
static
T* from_node_data_(MpscQueueData* data) {
132
return
static_cast<
T*
>
(
static_cast<
Node*
>
(Node::mpsc_queue_node(data)));
133
}
134
135
MpscQueueImpl impl_;
136
};
137
138
}
// namespace core
139
}
// namespace roc
140
141
#endif
// ROC_CORE_MPSC_QUEUE_H_
roc::core::MpscQueueNode
Base class for MpscQueue element.
Definition
mpsc_queue_node.h:43
roc::core::MpscQueue
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition
mpsc_queue.h:45
roc::core::MpscQueue::Pointer
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition
mpsc_queue.h:50
roc::core::MpscQueue::try_pop_front_exclusive
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition
mpsc_queue.h:91
roc::core::MpscQueue::pop_front_exclusive
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition
mpsc_queue.h:114
roc::core::MpscQueue::push_back
void push_back(T &elem)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of elem....
Definition
mpsc_queue.h:73
mpsc_queue_impl.h
Multi-producer single-consumer queue internal implementation.
mpsc_queue_node.h
MpscQueue node.
roc::core
General-purpose building blocks and platform abstraction layer.
roc
Root namespace.
noncopyable.h
Non-copyable object.
ownership_policy.h
Ownership policies.
panic.h
Panic.
roc::core::MpscQueueData
MpscQueue node internal data.
Definition
mpsc_queue_node.h:24
roc::core::RefCountedOwnership
Reference counted object ownership.
Definition
ownership_policy.h:21
roc_core
mpsc_queue.h
Generated by
1.17.0