1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
//! Publish-subscribe mechanism for internal events.
use std::collections::VecDeque;
/// The position of a subscriber on a pub-sub queue.
///
/// Both fields are absolute positions: they keep counting entries that the
/// queue has already discarded, and are converted back to deque indices by
/// subtracting the number of discarded entries.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct PubSubCursor {
    /// Absolute index of this subscriber's slot in the queue's offset list.
    id: u32,
    /// Absolute index of the next message to read.
    next: u32,
}

impl PubSubCursor {
    /// Index of this subscriber's slot, relative to a list from which
    /// `num_deleted` leading slots have already been removed.
    fn id(&self, num_deleted: u32) -> usize {
        let relative = self.id - num_deleted;
        relative as usize
    }

    /// Index of the next message to read, relative to a list from which
    /// `num_deleted` leading messages have already been removed.
    fn next(&self, num_deleted: u32) -> usize {
        let relative = self.next - num_deleted;
        relative as usize
    }
}
/// A pub-sub queue.
///
/// Messages stay in memory until every subscriber has acknowledged them.
/// Positions handed out to subscribers are absolute (they keep counting
/// entries that were already discarded), so the queue remembers how many
/// entries were removed from the front of each deque in order to translate
/// those positions back into deque indices.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct PubSub<T> {
    // Number of messages already removed from the front of `messages`.
    deleted_messages: u32,
    // Number of slots already removed from the front of `offsets`.
    deleted_offsets: u32,
    // Messages that have not been trimmed by `ack` yet.
    messages: VecDeque<T>,
    // One slot per subscription: the absolute index of the next message that
    // subscriber will read, or `u32::MAX` once `ack` has retired the slot.
    offsets: VecDeque<u32>,
}

impl<T> PubSub<T> {
    /// Create a new empty pub-sub queue.
    pub fn new() -> Self {
        Self {
            deleted_offsets: 0,
            deleted_messages: 0,
            messages: VecDeque::new(),
            offsets: VecDeque::new(),
        }
    }

    /// Publish a new message.
    ///
    /// The message is silently dropped if the queue has no subscriber.
    pub fn publish(&mut self, message: T) {
        if self.offsets.is_empty() {
            // No subscribers, drop the message.
            return;
        }

        self.messages.push_back(message);
    }

    /// Subscribe to the queue.
    ///
    /// The returned cursor starts after the messages currently stored, so it
    /// only sees messages published after this call.
    ///
    /// A subscription cannot be cancelled.
    pub fn subscribe(&mut self) -> PubSubCursor {
        let cursor = PubSubCursor {
            next: self.messages.len() as u32 + self.deleted_messages,
            id: self.offsets.len() as u32 + self.deleted_offsets,
        };

        self.offsets.push_back(cursor.next);
        cursor
    }

    /// Get the messages not yet read by the given subscriber.
    ///
    /// Reading does not advance the cursor; call [`PubSub::ack`] for that.
    pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> {
        let next = cursor.next(self.deleted_messages);

        // TODO: use `self.messages.range(next..)` once the toolchain in use
        // provides a stable `VecDeque::range`.
        MessageRange {
            queue: &self.messages,
            next,
        }
    }

    /// Makes the given subscriber acknowledge all the messages in the queue.
    ///
    /// A subscriber cannot read acknowledged messages any more. Messages that
    /// no remaining subscription still needs are removed from memory.
    ///
    /// # Panics
    ///
    /// Panics if the cursor's slot is not part of this queue's offset list
    /// (e.g. a cursor that was not obtained from this queue).
    pub fn ack(&mut self, cursor: &mut PubSubCursor) {
        // Move the cursor past every message currently stored.
        cursor.next = self.messages.len() as u32 + self.deleted_messages;

        // Retire the subscriber's previous slot and give it a new one at the
        // back, so that slots stay ordered from least to most advanced.
        let retired_slot = cursor.id(self.deleted_offsets);
        self.offsets[retired_slot] = u32::MAX;
        cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
        self.offsets.push_back(cursor.next);

        // Discard the retired slots that reached the front of the list.
        while self.offsets.front() == Some(&u32::MAX) {
            self.offsets.pop_front();
            self.deleted_offsets += 1;
        }

        // The front slot now belongs to the least advanced subscription:
        // every message before its position is no longer needed by anyone.
        let oldest_unread = *self
            .offsets
            .front()
            .expect("the slot pushed above is never retired, so `offsets` cannot be empty");
        let num_to_delete = oldest_unread - self.deleted_messages;

        // A slot never points past the end of `messages`, so the range is
        // always within bounds.
        self.messages.drain(..num_to_delete as usize);
        self.deleted_messages += num_to_delete;
    }
}

impl<T> Default for PubSub<T> {
    /// Equivalent to [`PubSub::new`]: an empty queue without subscribers.
    fn default() -> Self {
        Self::new()
    }
}
/// Iterator over the messages of a queue, starting at a given index and
/// running up to the end of the deque.
struct MessageRange<'a, T> {
    /// The deque the messages are read from.
    queue: &'a VecDeque<T>,
    /// Deque index of the message yielded by the next call to `next`.
    next: usize,
}

impl<'a, T> Iterator for MessageRange<'a, T> {
    type Item = &'a T;

    /// Yields the message at the current index, if any, and moves one step
    /// forward. The index advances even when nothing is yielded.
    #[inline(always)]
    fn next(&mut self) -> Option<&'a T> {
        let index = self.next;
        self.next = index + 1;
        self.queue.get(index)
    }
}
|