aboutsummaryrefslogtreecommitdiff
path: root/src/data
diff options
context:
space:
mode:
authorCrozet Sébastien <developer@crozet.re>2020-10-06 15:23:48 +0200
committerCrozet Sébastien <developer@crozet.re>2020-10-06 15:23:48 +0200
commit682ff61f94931ef205a9f81e7d00417ac88537c1 (patch)
tree405e0f7df3858de7387467f963088fce3244ba32 /src/data
parent7c92848383b9f76bde010f00683a44453b2b456a (diff)
downloadrapier-682ff61f94931ef205a9f81e7d00417ac88537c1.tar.gz
rapier-682ff61f94931ef205a9f81e7d00417ac88537c1.tar.bz2
rapier-682ff61f94931ef205a9f81e7d00417ac88537c1.zip
Don't let the PubSub internal offsets overflow + fix some warnings.
Diffstat (limited to 'src/data')
-rw-r--r--src/data/pubsub.rs67
1 files changed, 56 insertions, 11 deletions
diff --git a/src/data/pubsub.rs b/src/data/pubsub.rs
index 1ac7498..0a3432a 100644
--- a/src/data/pubsub.rs
+++ b/src/data/pubsub.rs
@@ -3,16 +3,28 @@
use serde::export::PhantomData;
use std::collections::VecDeque;
-/// The position of a subscriber on a pub-sub queue.
+/// A permanent subscription to a pub-sub queue.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
-pub struct PubSubCursor<T> {
- // Index of the next message to read.
+pub struct Subscription<T> {
+ // Position on the cursor array.
id: u32,
- next: u32,
_phantom: PhantomData<T>,
}
-impl<T> PubSubCursor<T> {
+#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
+struct PubSubCursor {
+ // Position on the offset array.
+ id: u32,
+ // Index of the next message to read.
+ // NOTE: Having this here is not actually necessary because
+ // this value is supposed to be equal to `offsets[self.id]`.
+ // However, we keep it because it lets us avoid one lookup
+ // on the `offsets` array inside of message-polling loops
+ // based on `read_ith`.
+ next: u32,
+}
+
+impl PubSubCursor {
fn id(&self, num_deleted: u32) -> usize {
(self.id - num_deleted) as usize
}
@@ -29,6 +41,7 @@ pub struct PubSub<T> {
deleted_offsets: u32,
messages: VecDeque<T>,
offsets: VecDeque<u32>,
+ cursors: Vec<PubSubCursor>,
}
impl<T> PubSub<T> {
@@ -39,7 +52,22 @@ impl<T> PubSub<T> {
deleted_messages: 0,
messages: VecDeque::new(),
offsets: VecDeque::new(),
+ cursors: Vec::new(),
+ }
+ }
+
+ fn reset_shifts(&mut self) {
+ for offset in &mut self.offsets {
+ *offset -= self.deleted_messages;
}
+
+ for cursor in &mut self.cursors {
+ cursor.id -= self.deleted_offsets;
+ cursor.next -= self.deleted_messages;
+ }
+
+ self.deleted_offsets = 0;
+ self.deleted_messages = 0;
}
/// Publish a new message.
@@ -55,25 +83,33 @@ impl<T> PubSub<T> {
/// Subscribe to the queue.
///
/// A subscription cannot be cancelled.
- pub fn subscribe(&mut self) -> PubSubCursor<T> {
+ #[must_use]
+ pub fn subscribe(&mut self) -> Subscription<T> {
let cursor = PubSubCursor {
next: self.messages.len() as u32 + self.deleted_messages,
id: self.offsets.len() as u32 + self.deleted_offsets,
+ };
+
+ let subscription = Subscription {
+ id: self.cursors.len() as u32,
_phantom: PhantomData,
};
self.offsets.push_back(cursor.next);
- cursor
+ self.cursors.push(cursor);
+ subscription
}
/// Read the i-th message not yet read by the given subsciber.
- pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
+ pub fn read_ith(&self, sub: &Subscription<T>, i: usize) -> Option<&T> {
+ let cursor = &self.cursors[sub.id as usize];
self.messages
.get(cursor.next(self.deleted_messages) as usize + i)
}
/// Get the messages not yet read by the given subscriber.
- pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
+ pub fn read(&self, sub: &Subscription<T>) -> impl Iterator<Item = &T> {
+ let cursor = &self.cursors[sub.id as usize];
let next = cursor.next(self.deleted_messages);
// TODO: use self.queue.range(next..) once it is stabilised.
@@ -86,11 +122,14 @@ impl<T> PubSub<T> {
/// Makes the given subscribe acknowledge all the messages in the queue.
///
/// A subscriber cannot read acknowledged messages any more.
- pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
+ pub fn ack(&mut self, sub: &Subscription<T>) {
// Update the cursor.
- cursor.next = self.messages.len() as u32 + self.deleted_messages;
+ let cursor = &mut self.cursors[sub.id as usize];
+
self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
+
+ cursor.next = self.messages.len() as u32 + self.deleted_messages;
self.offsets.push_back(cursor.next);
// Now clear the messages we don't need to
@@ -110,6 +149,12 @@ impl<T> PubSub<T> {
}
self.deleted_messages += num_to_delete;
+
+ if self.deleted_messages > u32::MAX / 2 || self.deleted_offsets > u32::MAX / 2 {
+ // Don't let the deleted_* shifts grow indefinitely otherwise
+ // they will end up overflowing, breaking everything.
+ self.reset_shifts();
+ }
}
}