diff options
| author | Crozet Sébastien <developer@crozet.re> | 2020-10-05 16:50:31 +0200 |
|---|---|---|
| committer | Crozet Sébastien <developer@crozet.re> | 2020-10-05 16:50:31 +0200 |
| commit | 9c22e59416bd76b2a6e817e9e29692a22a3113e1 (patch) | |
| tree | 14e247e118bf98872e8b0ecf5974c6f4870ef4de /src/data | |
| parent | c031f96ac548645932c5605bfc17869618e9212b (diff) | |
| download | rapier-9c22e59416bd76b2a6e817e9e29692a22a3113e1.tar.gz rapier-9c22e59416bd76b2a6e817e9e29692a22a3113e1.tar.bz2 rapier-9c22e59416bd76b2a6e817e9e29692a22a3113e1.zip | |
Add a simple publish-subscribe mechanism.
Diffstat (limited to 'src/data')
| -rw-r--r-- | src/data/pubsub.rs | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs new file mode 100644 index 0000000..b3b263c --- /dev/null +++ b/src/data/pubsub.rs @@ -0,0 +1,121 @@ +//! Publish-subscribe mechanism for internal events. + +use std::collections::VecDeque; + +/// The position of a subscriber on a pub-sub queue. +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +pub struct PubSubCursor { + // Index of the next message to read. + id: u32, + next: u32, +} + +impl PubSubCursor { + fn id(&self, num_deleted: u32) -> usize { + (self.id - num_deleted) as usize + } + + fn next(&self, num_deleted: u32) -> usize { + (self.next - num_deleted) as usize + } +} + +/// A pub-sub queue. +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +pub struct PubSub<T> { + deleted_messages: u32, + deleted_offsets: u32, + messages: VecDeque<T>, + offsets: VecDeque<u32>, +} + +impl<T> PubSub<T> { + /// Create a new empty pub-sub queue. + pub fn new() -> Self { + Self { + deleted_offsets: 0, + deleted_messages: 0, + messages: VecDeque::new(), + offsets: VecDeque::new(), + } + } + + /// Publish a new message. + pub fn publish(&mut self, message: T) { + if self.offsets.is_empty() { + // No subscribers, drop the message. + return; + } + + self.messages.push_back(message); + } + + /// Subscribe to the queue. + /// + /// A subscription cannot be cancelled. + pub fn subscribe(&mut self) -> PubSubCursor { + let cursor = PubSubCursor { + next: self.messages.len() as u32 + self.deleted_messages, + id: self.offsets.len() as u32 + self.deleted_offsets, + }; + + self.offsets.push_back(cursor.next); + cursor + } + + /// Get the messages not yet read by the given subscriber. + pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> { + let next = cursor.next(self.deleted_messages); + + // TODO: use self.queue.range(next..) once it is stabilised. + MessageRange { + queue: &self.messages, + next, + } + } + + /// Makes the given subscribe acknowledge all the messages in the queue. + /// + /// A subscriber cannot read acknowledged messages any more. + pub fn ack(&mut self, cursor: &mut PubSubCursor) { + // Update the cursor. + cursor.next = self.messages.len() as u32 + self.deleted_messages; + self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; + cursor.id = self.offsets.len() as u32 + self.deleted_offsets; + self.offsets.push_back(cursor.next); + + // Now clear the messages we don't need to + // maintain in memory anymore. + while self.offsets.front() == Some(&u32::MAX) { + self.offsets.pop_front(); + self.deleted_offsets += 1; + } + + // There must be at least one offset otherwise + // that would mean we have no subscribers. + let next = self.offsets.front().unwrap(); + let num_to_delete = *next - self.deleted_messages; + + for _ in 0..num_to_delete { + self.messages.pop_front(); + } + + self.deleted_messages += num_to_delete; + } +} + +struct MessageRange<'a, T> { + queue: &'a VecDeque<T>, + next: usize, +} + +impl<'a, T> Iterator for MessageRange<'a, T> { + type Item = &'a T; + + #[inline(always)] + fn next(&mut self) -> Option<&'a T> { + let result = self.queue.get(self.next); + self.next += 1; + result + } +} |
