diff options
| author | Crozet Sébastien <developer@crozet.re> | 2020-10-06 15:23:48 +0200 |
|---|---|---|
| committer | Crozet Sébastien <developer@crozet.re> | 2020-10-06 15:23:48 +0200 |
| commit | 682ff61f94931ef205a9f81e7d00417ac88537c1 (patch) | |
| tree | 405e0f7df3858de7387467f963088fce3244ba32 /src/data | |
| parent | 7c92848383b9f76bde010f00683a44453b2b456a (diff) | |
| download | rapier-682ff61f94931ef205a9f81e7d00417ac88537c1.tar.gz rapier-682ff61f94931ef205a9f81e7d00417ac88537c1.tar.bz2 rapier-682ff61f94931ef205a9f81e7d00417ac88537c1.zip | |
Don't let the PubSub internal offsets overflow + fix some warnings.
Diffstat (limited to 'src/data')
| -rw-r--r-- | src/data/pubsub.rs | 67 |
1 files changed, 56 insertions, 11 deletions
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs index 1ac7498..0a3432a 100644 --- a/src/data/pubsub.rs +++ b/src/data/pubsub.rs @@ -3,16 +3,28 @@ use serde::export::PhantomData; use std::collections::VecDeque; -/// The position of a subscriber on a pub-sub queue. +/// A permanent subscription to a pub-sub queue. #[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] -pub struct PubSubCursor<T> { - // Index of the next message to read. +pub struct Subscription<T> { + // Position on the cursor array. id: u32, - next: u32, _phantom: PhantomData<T>, } -impl<T> PubSubCursor<T> { +#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))] +struct PubSubCursor { + // Position on the offset array. + id: u32, + // Index of the next message to read. + // NOTE: Having this here is not actually necessary because + // this value is supposed to be equal to `offsets[self.id]`. + // However, we keep it because it lets us avoid one lookup + // on the `offsets` array inside of message-polling loops + // based on `read_ith`. + next: u32, +} + +impl PubSubCursor { fn id(&self, num_deleted: u32) -> usize { (self.id - num_deleted) as usize } @@ -29,6 +41,7 @@ pub struct PubSub<T> { deleted_offsets: u32, messages: VecDeque<T>, offsets: VecDeque<u32>, + cursors: Vec<PubSubCursor>, } impl<T> PubSub<T> { @@ -39,7 +52,22 @@ impl<T> PubSub<T> { deleted_messages: 0, messages: VecDeque::new(), offsets: VecDeque::new(), + cursors: Vec::new(), + } + } + + fn reset_shifts(&mut self) { + for offset in &mut self.offsets { + *offset -= self.deleted_messages; } + + for cursor in &mut self.cursors { + cursor.id -= self.deleted_offsets; + cursor.next -= self.deleted_messages; + } + + self.deleted_offsets = 0; + self.deleted_messages = 0; } /// Publish a new message. @@ -55,25 +83,33 @@ impl<T> PubSub<T> { /// Subscribe to the queue. /// /// A subscription cannot be cancelled. - pub fn subscribe(&mut self) -> PubSubCursor<T> { + #[must_use] + pub fn subscribe(&mut self) -> Subscription<T> { let cursor = PubSubCursor { next: self.messages.len() as u32 + self.deleted_messages, id: self.offsets.len() as u32 + self.deleted_offsets, + }; + + let subscription = Subscription { + id: self.cursors.len() as u32, _phantom: PhantomData, }; self.offsets.push_back(cursor.next); - cursor + self.cursors.push(cursor); + subscription } /// Read the i-th message not yet read by the given subsciber. - pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> { + pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> { + let cursor = &self.cursors[sub.id as usize]; self.messages .get(cursor.next(self.deleted_messages) as usize + i) } /// Get the messages not yet read by the given subscriber. - pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> { + pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> { + let cursor = &self.cursors[sub.id as usize]; let next = cursor.next(self.deleted_messages); // TODO: use self.queue.range(next..) once it is stabilised. @@ -86,11 +122,14 @@ impl<T> PubSub<T> { /// Makes the given subscribe acknowledge all the messages in the queue. /// /// A subscriber cannot read acknowledged messages any more. - pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) { + pub fn ack(&mut self, sub: &Subscription<T>) { // Update the cursor. - cursor.next = self.messages.len() as u32 + self.deleted_messages; + let cursor = &mut self.cursors[sub.id as usize]; + self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX; cursor.id = self.offsets.len() as u32 + self.deleted_offsets; + + cursor.next = self.messages.len() as u32 + self.deleted_messages; self.offsets.push_back(cursor.next); // Now clear the messages we don't need to @@ -110,6 +149,12 @@ impl<T> PubSub<T> { } self.deleted_messages += num_to_delete; + + if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 { + // Don't let the deleted_* shifts grow indefinitely otherwise + // they will end up overflowing, breaking everything. + self.reset_shifts(); + } } } |
