From 9c22e59416bd76b2a6e817e9e29692a22a3113e1 Mon Sep 17 00:00:00 2001 From: Crozet Sébastien Date: Mon, 5 Oct 2020 16:50:31 +0200 Subject: Add a simple publish-subscribe mechanism. --- src/data/pubsub.rs | 121 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 src/data/pubsub.rs (limited to 'src/data') diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs new file mode 100644 index 0000000..b3b263c --- /dev/null +++ b/src/data/pubsub.rs @@ -0,0 +1,121 @@ +//! Publish-subscribe mechanism for internal events. + +use std::collections::VecDeque; + +/// The position of a subscriber on a pub-sub queue. +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +pub struct PubSubCursor { + // Index of the next message to read. + id: u32, + next: u32, +} + +impl PubSubCursor { + fn id(&self, num_deleted: u32) -> usize { + (self.id - num_deleted) as usize + } + + fn next(&self, num_deleted: u32) -> usize { + (self.next - num_deleted) as usize + } +} + +/// A pub-sub queue. +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +pub struct PubSub { + deleted_messages: u32, + deleted_offsets: u32, + messages: VecDeque, + offsets: VecDeque, +} + +impl PubSub { + /// Create a new empty pub-sub queue. + pub fn new() -> Self { + Self { + deleted_offsets: 0, + deleted_messages: 0, + messages: VecDeque::new(), + offsets: VecDeque::new(), + } + } + + /// Publish a new message. + pub fn publish(&mut self, message: T) { + if self.offsets.is_empty() { + // No subscribers, drop the message. + return; + } + + self.messages.push_back(message); + } + + /// Subscribe to the queue. + /// + /// A subscription cannot be cancelled. + pub fn subscribe(&mut self) -> PubSubCursor { + let cursor = PubSubCursor { + next: self.messages.len() as u32 + self.deleted_messages, + id: self.offsets.len() as u32 + self.deleted_offsets, + }; + + self.offsets.push_back(cursor.next); + cursor + } + + /// Get the messages not yet read by the given subscriber. + pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator { + let next = cursor.next(self.deleted_messages); + + // TODO: use self.queue.range(next..) once it is stabilised. + MessageRange { + queue: &self.messages, + next, + } + } + + /// Makes the given subscribe acknowledge all the messages in the queue. + /// + /// A subscriber cannot read acknowledged messages any more. + pub fn ack(&mut self, cursor: &mut PubSubCursor) { + // Update the cursor. + cursor.next = self.messages.len() as u32 + self.deleted_messages; + self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; + cursor.id = self.offsets.len() as u32 + self.deleted_offsets; + self.offsets.push_back(cursor.next); + + // Now clear the messages we don't need to + // maintain in memory anymore. + while self.offsets.front() == Some(&u32::MAX) { + self.offsets.pop_front(); + self.deleted_offsets += 1; + } + + // There must be at least one offset otherwise + // that would mean we have no subscribers. + let next = self.offsets.front().unwrap(); + let num_to_delete = *next - self.deleted_messages; + + for _ in 0..num_to_delete { + self.messages.pop_front(); + } + + self.deleted_messages += num_to_delete; + } +} + +struct MessageRange<'a, T> { + queue: &'a VecDeque, + next: usize, +} + +impl<'a, T> Iterator for MessageRange<'a, T> { + type Item = &'a T; + + #[inline(always)] + fn next(&mut self) -> Option<&'a T> { + let result = self.queue.get(self.next); + self.next += 1; + result + } +} -- cgit From 93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848 Mon Sep 17 00:00:00 2001 From: Crozet Sébastien Date: Mon, 5 Oct 2020 19:04:18 +0200 Subject: Use the publish-subscribe mechanism to handle collider removals across pipelines. --- src/data/mod.rs | 1 + src/data/pubsub.rs | 19 ++++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) (limited to 'src/data') diff --git a/src/data/mod.rs b/src/data/mod.rs index 7c00442..5d3efa6 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -2,3 +2,4 @@ pub mod arena; pub(crate) mod graph; +pub mod pubsub; diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs index b3b263c..1ac7498 100644 --- a/src/data/pubsub.rs +++ b/src/data/pubsub.rs @@ -1,16 +1,18 @@ //! Publish-subscribe mechanism for internal events. +use serde::export::PhantomData; use std::collections::VecDeque; /// The position of a subscriber on a pub-sub queue. #[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] -pub struct PubSubCursor { +pub struct PubSubCursor { // Index of the next message to read. id: u32, next: u32, + _phantom: PhantomData, } -impl PubSubCursor { +impl PubSubCursor { fn id(&self, num_deleted: u32) -> usize { (self.id - num_deleted) as usize } @@ -53,18 +55,25 @@ impl PubSub { /// Subscribe to the queue. /// /// A subscription cannot be cancelled. - pub fn subscribe(&mut self) -> PubSubCursor { + pub fn subscribe(&mut self) -> PubSubCursor { let cursor = PubSubCursor { next: self.messages.len() as u32 + self.deleted_messages, id: self.offsets.len() as u32 + self.deleted_offsets, + _phantom: PhantomData, }; self.offsets.push_back(cursor.next); cursor } + /// Read the i-th message not yet read by the given subsciber. + pub fn read_ith(&self, cursor: &PubSubCursor, i: usize) -> Option<&T> { + self.messages + .get(cursor.next(self.deleted_messages) as usize + i) + } + /// Get the messages not yet read by the given subscriber. - pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator { + pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator { let next = cursor.next(self.deleted_messages); // TODO: use self.queue.range(next..) once it is stabilised. @@ -77,7 +86,7 @@ impl PubSub { /// Makes the given subscribe acknowledge all the messages in the queue. /// /// A subscriber cannot read acknowledged messages any more. - pub fn ack(&mut self, cursor: &mut PubSubCursor) { + pub fn ack(&mut self, cursor: &mut PubSubCursor) { // Update the cursor. cursor.next = self.messages.len() as u32 + self.deleted_messages; self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; -- cgit From 682ff61f94931ef205a9f81e7d00417ac88537c1 Mon Sep 17 00:00:00 2001 From: Crozet Sébastien Date: Tue, 6 Oct 2020 15:23:48 +0200 Subject: Don't let the PubSub internal offsets overflow + fix some warnings. --- src/data/pubsub.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 11 deletions(-) (limited to 'src/data') diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs index 1ac7498..0a3432a 100644 --- a/src/data/pubsub.rs +++ b/src/data/pubsub.rs @@ -3,16 +3,28 @@ use serde::export::PhantomData; use std::collections::VecDeque; -/// The position of a subscriber on a pub-sub queue. +/// A permanent subscription to a pub-sub queue. #[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] -pub struct PubSubCursor { - // Index of the next message to read. +pub struct Subscription { + // Position on the cursor array. id: u32, - next: u32, _phantom: PhantomData, } -impl PubSubCursor { +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +struct PubSubCursor { + // Position on the offset array. + id: u32, + // Index of the next message to read. + // NOTE: Having this here is not actually necessary because + // this value is supposed to be equal to `offsets[self.id]`. + // However, we keep it because it lets us avoid one lookup + // on the `offsets` array inside of message-polling loops + // based on `read_ith`. + next: u32, +} + +impl PubSubCursor { fn id(&self, num_deleted: u32) -> usize { (self.id - num_deleted) as usize } @@ -29,6 +41,7 @@ pub struct PubSub { deleted_offsets: u32, messages: VecDeque, offsets: VecDeque, + cursors: Vec, } impl PubSub { @@ -39,7 +52,22 @@ impl PubSub { deleted_messages: 0, messages: VecDeque::new(), offsets: VecDeque::new(), + cursors: Vec::new(), + } + } + + fn reset_shifts(&mut self) { + for offset in &mut self.offsets { + *offset -= self.deleted_messages; } + + for cursor in &mut self.cursors { + cursor.id -= self.deleted_offsets; + cursor.next -= self.deleted_messages; + } + + self.deleted_offsets = 0; + self.deleted_messages = 0; } /// Publish a new message. @@ -55,25 +83,33 @@ impl PubSub { /// Subscribe to the queue. /// /// A subscription cannot be cancelled. - pub fn subscribe(&mut self) -> PubSubCursor { + #[must_use] + pub fn subscribe(&mut self) -> Subscription { let cursor = PubSubCursor { next: self.messages.len() as u32 + self.deleted_messages, id: self.offsets.len() as u32 + self.deleted_offsets, + }; + + let subscription = Subscription { + id: self.cursors.len() as u32, _phantom: PhantomData, }; self.offsets.push_back(cursor.next); - cursor + self.cursors.push(cursor); + subscription } /// Read the i-th message not yet read by the given subsciber. - pub fn read_ith(&self, cursor: &PubSubCursor, i: usize) -> Option<&T> { + pub fn read_ith(&self, sub: &Subscription, i: usize) -> Option<&T> { + let cursor = &self.cursors[sub.id as usize]; self.messages .get(cursor.next(self.deleted_messages) as usize + i) } /// Get the messages not yet read by the given subscriber. - pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator { + pub fn read(&self, sub: &Subscription) -> impl Iterator { + let cursor = &self.cursors[sub.id as usize]; let next = cursor.next(self.deleted_messages); // TODO: use self.queue.range(next..) once it is stabilised. @@ -86,11 +122,14 @@ impl PubSub { /// Makes the given subscribe acknowledge all the messages in the queue. /// /// A subscriber cannot read acknowledged messages any more. - pub fn ack(&mut self, cursor: &mut PubSubCursor) { + pub fn ack(&mut self, sub: &Subscription) { // Update the cursor. - cursor.next = self.messages.len() as u32 + self.deleted_messages; + let cursor = &mut self.cursors[sub.id as usize]; + self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; cursor.id = self.offsets.len() as u32 + self.deleted_offsets; + + cursor.next = self.messages.len() as u32 + self.deleted_messages; self.offsets.push_back(cursor.next); // Now clear the messages we don't need to @@ -110,6 +149,12 @@ impl PubSub { } self.deleted_messages += num_to_delete; + + if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 { + // Don't let the deleted_* shifts grow indefinitely otherwise + // they will end up overflowing, breaking everything. + self.reset_shifts(); + } } } -- cgit From 8c388687935b0f09a6a03b9de96fc80d15dd67c9 Mon Sep 17 00:00:00 2001 From: Crozet Sébastien Date: Tue, 6 Oct 2020 16:02:15 +0200 Subject: Fix bogus PhantomData import. --- src/data/pubsub.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/data') diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs index 0a3432a..b2c9e27 100644 --- a/src/data/pubsub.rs +++ b/src/data/pubsub.rs @@ -1,7 +1,7 @@ //! Publish-subscribe mechanism for internal events. -use serde::export::PhantomData; use std::collections::VecDeque; +use std::marker::PhantomData; /// A permanent subscription to a pub-sub queue. #[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] -- cgit