diff options
| author | Crozet Sébastien <developer@crozet.re> | 2020-10-05 19:04:18 +0200 |
|---|---|---|
| committer | Crozet Sébastien <developer@crozet.re> | 2020-10-05 19:04:18 +0200 |
| commit | 93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848 (patch) | |
| tree | 5d602450c5b5e1c0c08eeffd3196b373b4312a08 /src/data | |
| parent | 2d0a888484dd296cc785caf978252dd97b58e10a (diff) | |
| download | rapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.tar.gz rapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.tar.bz2 rapier-93aa7b6e1e8cbfd73542ed10ad5c26ae0a8b9848.zip | |
Use the publish-subscribe mechanism to handle collider removals across pipelines.
Diffstat (limited to 'src/data')
| -rw-r--r-- | src/data/mod.rs | 1 | ||||
| -rw-r--r-- | src/data/pubsub.rs | 19 |
2 files changed, 15 insertions, 5 deletions
diff --git a/src/data/mod.rs b/src/data/mod.rs index 7c00442..5d3efa6 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -2,3 +2,4 @@ pub mod arena; pub(crate) mod graph; +pub mod pubsub; diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs index b3b263c..1ac7498 100644 --- a/src/data/pubsub.rs +++ b/src/data/pubsub.rs @@ -1,16 +1,18 @@ //! Publish-subscribe mechanism for internal events. +use serde::export::PhantomData; use std::collections::VecDeque; /// The position of a subscriber on a pub-sub queue. #[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] -pub struct PubSubCursor { +pub struct PubSubCursor<T> { // Index of the next message to read. id: u32, next: u32, + _phantom: PhantomData<T>, } -impl PubSubCursor { +impl<T> PubSubCursor<T> { fn id(&self, num_deleted: u32) -> usize { (self.id - num_deleted) as usize } @@ -53,18 +55,25 @@ impl<T> PubSub<T> { /// Subscribe to the queue. /// /// A subscription cannot be cancelled. - pub fn subscribe(&mut self) -> PubSubCursor { + pub fn subscribe(&mut self) -> PubSubCursor<T> { let cursor = PubSubCursor { next: self.messages.len() as u32 + self.deleted_messages, id: self.offsets.len() as u32 + self.deleted_offsets, + _phantom: PhantomData, }; self.offsets.push_back(cursor.next); cursor } + /// Read the i-th message not yet read by the given subsciber. + pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> { + self.messages + .get(cursor.next(self.deleted_messages) as usize + i) + } + /// Get the messages not yet read by the given subscriber. - pub fn read(&self, cursor: &PubSubCursor) -> impl Iterator<Item = &T> { + pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> { let next = cursor.next(self.deleted_messages); // TODO: use self.queue.range(next..) once it is stabilised. @@ -77,7 +86,7 @@ impl<T> PubSub<T> { /// Makes the given subscribe acknowledge all the messages in the queue. /// /// A subscriber cannot read acknowledged messages any more. - pub fn ack(&mut self, cursor: &mut PubSubCursor) { + pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) { // Update the cursor. cursor.next = self.messages.len() as u32 + self.deleted_messages; self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; |
