aboutsummaryrefslogtreecommitdiff
path: root/src/data/pubsub.rs
blob: 1ac7498645b6c25641b7065ddd1874e18b88ff0a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//! Publish-subscribe mechanism for internal events.

use serde::export::PhantomData;
use std::collections::VecDeque;

/// The position of a subscriber on a pub-sub queue.
///
/// Both counters are absolute: the queue subtracts the number of entries it
/// has already discarded from its front to turn them into container indices.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct PubSubCursor<T> {
    // Absolute index of this subscriber's entry in the queue's offset list.
    id: u32,
    // Absolute index of the next message to read.
    next: u32,
    // Ties the cursor to the message type `T` without storing a message.
    _phantom: PhantomData<T>,
}

impl<T> PubSubCursor<T> {
    /// Index of this cursor's entry in the offset list, once `num_deleted`
    /// entries have been removed from the front of that list.
    fn id(&self, num_deleted: u32) -> usize {
        let relative = self.id - num_deleted;
        relative as usize
    }

    /// Index of the next message to read, once `num_deleted` messages have
    /// been removed from the front of the message list.
    fn next(&self, num_deleted: u32) -> usize {
        let relative = self.next - num_deleted;
        relative as usize
    }
}

/// A pub-sub queue.
///
/// Messages are kept in memory until the oldest live subscriber offset has
/// moved past them.
#[cfg_attr(feature = "serde-serialize", derive(Serialize, Deserialize))]
pub struct PubSub<T> {
    // Number of messages already removed from the front of `messages`.
    deleted_messages: u32,
    // Number of entries already removed from the front of `offsets`.
    deleted_offsets: u32,
    // Messages that may still be read by at least one subscriber.
    messages: VecDeque<T>,
    // One entry per cursor position handed out: the absolute index of the
    // next message for that cursor, or `u32::MAX` once `ack` has replaced
    // the entry with a newer one at the back.
    offsets: VecDeque<u32>,
}

impl<T> PubSub<T> {
    /// Create a new empty pub-sub queue.
    pub fn new() -> Self {
        Self {
            deleted_offsets: 0,
            deleted_messages: 0,
            messages: VecDeque::new(),
            offsets: VecDeque::new(),
        }
    }

    /// Publish a new message.
    ///
    /// The message is dropped immediately if nobody has subscribed yet.
    pub fn publish(&mut self, message: T) {
        if self.offsets.is_empty() {
            // No subscribers, drop the message.
            return;
        }

        self.messages.push_back(message);
    }

    /// Subscribe to the queue.
    ///
    /// The returned cursor starts after every message published so far, so
    /// it only sees messages published after this call.
    ///
    /// A subscription cannot be cancelled.
    pub fn subscribe(&mut self) -> PubSubCursor<T> {
        let cursor = PubSubCursor {
            next: self.messages.len() as u32 + self.deleted_messages,
            id: self.offsets.len() as u32 + self.deleted_offsets,
            _phantom: PhantomData,
        };

        // Register the cursor's starting position so `ack` knows which
        // messages are still needed.
        self.offsets.push_back(cursor.next);
        cursor
    }

    /// Read the i-th message not yet acknowledged by the given subscriber.
    ///
    /// Returns `None` if fewer than `i + 1` such messages are available.
    pub fn read_ith(&self, cursor: &PubSubCursor<T>, i: usize) -> Option<&T> {
        self.messages.get(cursor.next(self.deleted_messages) + i)
    }

    /// Get the messages not yet acknowledged by the given subscriber, oldest
    /// first.
    pub fn read(&self, cursor: &PubSubCursor<T>) -> impl Iterator<Item = &T> {
        let next = cursor.next(self.deleted_messages);

        // TODO: use self.messages.range(next..) once it is stabilised.
        MessageRange {
            queue: &self.messages,
            next,
        }
    }

    /// Makes the given subscriber acknowledge all the messages in the queue.
    ///
    /// A subscriber cannot read acknowledged messages any more. Messages
    /// that precede the oldest remaining offset are removed from memory.
    ///
    /// # Panics
    ///
    /// May panic if `cursor` does not belong to this queue, because the
    /// cursor's offset entry is looked up by index.
    pub fn ack(&mut self, cursor: &mut PubSubCursor<T>) {
        // Move the cursor past every message currently in the queue.
        cursor.next = self.messages.len() as u32 + self.deleted_messages;

        // Retire the cursor's previous offset entry and register a fresh one
        // at the back of the list.
        self.offsets[cursor.id(self.deleted_offsets)] = u32::MAX;
        cursor.id = self.offsets.len() as u32 + self.deleted_offsets;
        self.offsets.push_back(cursor.next);

        // Discard retired entries sitting at the front of the list.
        while self.offsets.front() == Some(&u32::MAX) {
            self.offsets.pop_front();
            self.deleted_offsets += 1;
        }

        // The front entry is now the oldest offset still in use; every
        // message before it can be released.
        let oldest = *self
            .offsets
            .front()
            .expect("an offset was pushed above, so at least one entry remains");
        let num_to_delete = oldest - self.deleted_messages;

        // Remove the whole prefix in one operation instead of popping the
        // messages one at a time.
        drop(self.messages.drain(..num_to_delete as usize));
        self.deleted_messages += num_to_delete;
    }
}

impl<T> Default for PubSub<T> {
    /// Equivalent to [`PubSub::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Iterator over the elements of a queue from a starting index onwards.
struct MessageRange<'a, T> {
    // The queue being read.
    queue: &'a VecDeque<T>,
    // Index into `queue` of the next element to yield.
    next: usize,
}

impl<'a, T> Iterator for MessageRange<'a, T> {
    type Item = &'a T;

    /// Yields the element at the current index and moves on to the next one.
    ///
    /// The index advances on every call, including calls that return `None`.
    #[inline(always)]
    fn next(&mut self) -> Option<Self::Item> {
        let index = self.next;
        self.next = index + 1;
        self.queue.get(index)
    }
}