stream/
stream.rs

1#![allow(non_camel_case_types)] // https://github.com/rust-embedded/heapless/issues/411
2
3use super::{Format, Target};
4use core::mem::MaybeUninit;
5use heapless::{
6    box_pool,
7    pool::boxed::{Box, BoxBlock},
8    spsc::{Consumer, Producer, Queue},
9};
10use smoltcp_nal::embedded_nal::{UdpClientStack, nb};
11
12// Magic first bytes indicating a UDP frame of straming data
13const MAGIC: u16 = 0x057B;
14
15// The size of the header, calculated in words.
16// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence
17// number, which corresponds to 8 bytes.
18const HEADER_SIZE: usize = 8;
19
20// The number of frames that can be buffered.
21const FRAME_COUNT: usize = 4;
22
23// The size of each frame in bytes.
24// Ensure the resulting ethernet frame is within the MTU:
25// 1500 MTU - 40 IP6 header - 8 UDP header - 32 VPN - 20 IP4
26const FRAME_SIZE: usize = 1500 - 40 - 8 - 32 - 20;
27
28// The size of the frame queue must be at least as large as the number of frame buffers. Every
29// allocated frame buffer should fit in the queue.
30const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;
31
32type Frame = [MaybeUninit<u8>; FRAME_SIZE];
33
34box_pool!(FRAME_POOL: Frame);
35
36/// Configure streaming on a device.
37///
38/// # Args
39/// * `stack` - A reference to the shared network stack.
40///
41/// # Returns
42/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
43/// `stream` is the logically consumer (UDP transmitter) of the enqueued data.
44#[cfg(target_arch = "arm")]
45pub fn setup<N: UdpClientStack<Error = smoltcp_nal::NetworkError>>(
46    stack: N,
47) -> (FrameGenerator, DataStream<N>) {
48    // The queue needs to be at least as large as the frame count to ensure that every allocated
49    // frame can potentially be enqueued for transmission.
50    let queue =
51        cortex_m::singleton!(: Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new())
52            .unwrap();
53    let (producer, consumer) = queue.split();
54
55    #[allow(clippy::declare_interior_mutable_const)]
56    const FRAME: BoxBlock<Frame> = BoxBlock::new();
57    let memory =
58        cortex_m::singleton!(FRAME_DATA: [BoxBlock<Frame>; FRAME_COUNT] =
59    [FRAME; FRAME_COUNT])
60        .unwrap();
61
62    for block in memory.iter_mut() {
63        FRAME_POOL.manage(block);
64    }
65
66    let generator = FrameGenerator::new(producer);
67
68    let stream = DataStream::new(stack, consumer);
69
70    (generator, stream)
71}
72
73#[derive(Debug)]
74struct StreamFrame {
75    buffer: Box<FRAME_POOL>,
76    offset: usize,
77    batches: u8,
78}
79
80impl StreamFrame {
81    pub fn new(
82        mut buffer: Box<FRAME_POOL>,
83        format_id: u8,
84        sequence_number: u32,
85    ) -> Self {
86        for (byte, buf) in MAGIC
87            .to_le_bytes()
88            .iter()
89            .chain(&[format_id, 0])
90            .chain(sequence_number.to_le_bytes().iter())
91            .zip(buffer.iter_mut())
92        {
93            buf.write(*byte);
94        }
95
96        Self {
97            buffer,
98            offset: HEADER_SIZE,
99            batches: 0,
100        }
101    }
102
103    pub fn add_batch<F>(&mut self, mut f: F) -> usize
104    where
105        F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
106    {
107        let len = f(&mut self.buffer[self.offset..]);
108        self.offset += len;
109        self.batches += 1;
110        len
111    }
112
113    pub fn is_full(&self, len: usize) -> bool {
114        self.offset + len > self.buffer.len()
115    }
116
117    pub fn finish(&mut self) -> &[MaybeUninit<u8>] {
118        self.buffer[3].write(self.batches);
119        &self.buffer[..self.offset]
120    }
121}
122
123/// The data generator for a stream.
124pub struct FrameGenerator {
125    queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
126    current_frame: Option<StreamFrame>,
127    sequence_number: u32,
128    format: u8,
129}
130
131impl FrameGenerator {
132    fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self {
133        Self {
134            queue,
135            format: Format::Unknown.into(),
136            current_frame: None,
137            sequence_number: 0,
138        }
139    }
140
141    /// Configure the format of the stream.
142    ///
143    /// # Note:
144    /// This function shall only be called once upon initializing streaming
145    ///
146    /// # Args
147    /// * `format` - The desired format of the stream.
148    pub fn configure(&mut self, format: impl Into<u8>) {
149        self.format = format.into();
150    }
151
152    /// Add a batch to the current stream frame.
153    ///
154    /// # Args
155    /// * `f` - A closure that will be provided the buffer to write batch data into.
156    ///
157    /// Returns the number of bytes written.
158    pub fn add<F>(&mut self, func: F)
159    where
160        F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
161    {
162        let sequence_number = self.sequence_number;
163        self.sequence_number = self.sequence_number.wrapping_add(1);
164
165        let current_frame = match self.current_frame.as_mut() {
166            None => {
167                if let Ok(buffer) =
168                    FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE])
169                {
170                    self.current_frame.insert(StreamFrame::new(
171                        buffer,
172                        self.format,
173                        sequence_number,
174                    ))
175                } else {
176                    return;
177                }
178            }
179            Some(frame) => frame,
180        };
181
182        let len = current_frame.add_batch(func);
183
184        if current_frame.is_full(len) {
185            // Note(unwrap): The queue is designed to be at least as large as the frame buffer
186            // count, so this enqueue should always succeed.
187            if let Some(frame) = self.current_frame.take() {
188                self.queue.enqueue(frame).unwrap();
189            }
190        }
191    }
192}
193
194/// The "consumer" portion of the data stream.
195///
196/// # Note
197/// This is responsible for consuming data and sending it over UDP.
198pub struct DataStream<N: UdpClientStack> {
199    stack: N,
200    socket: Option<<N as UdpClientStack>::UdpSocket>,
201    queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
202    remote: Target,
203}
204
205impl<N: UdpClientStack<Error = smoltcp_nal::NetworkError>> DataStream<N> {
206    /// Construct a new data streamer.
207    ///
208    /// # Args
209    /// * `stack` - A reference to the shared network stack.
210    /// * `consumer` - The read side of the queue containing data to transmit.
211    /// * `frame_pool` - The Pool to return stream frame objects into.
212    fn new(
213        stack: N,
214        consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
215    ) -> Self {
216        Self {
217            stack,
218            socket: None,
219            remote: Target::default(),
220            queue: consumer,
221        }
222    }
223
224    fn close(&mut self) {
225        if let Some(socket) = self.socket.take() {
226            log::info!("Closing stream");
227            // Note(unwrap): We guarantee that the socket is available above.
228            self.stack.close(socket).unwrap();
229        }
230    }
231
232    // Open new socket.
233    fn open(&mut self) -> Result<(), ()> {
234        // If there is already a socket of if remote address is unspecified,
235        // do not open a new socket.
236        if self.socket.is_some() || self.remote.0.ip().is_unspecified() {
237            return Err(());
238        }
239
240        let mut socket = self.stack.socket().or(Err(()))?;
241
242        // We may fail to connect if we don't have an IP address yet.
243        if self.stack.connect(&mut socket, self.remote.0).is_err() {
244            self.stack.close(socket).unwrap();
245            return Err(());
246        }
247
248        self.socket.replace(socket);
249
250        log::info!("Opening stream");
251
252        Ok(())
253    }
254
255    /// Configure the remote endpoint of the stream.
256    ///
257    /// # Args
258    /// * `remote` - The destination to send stream data to.
259    pub fn set_remote(&mut self, remote: Target) {
260        // Close socket to be reopened if the remote has changed.
261        if remote != self.remote {
262            self.close();
263        }
264        self.remote = remote;
265    }
266
267    /// Process any data for transmission.
268    pub fn process(&mut self) {
269        match self.socket.as_mut() {
270            None => {
271                // If there's no socket available, try to connect to our remote.
272                if self.open().is_ok() {
273                    // If we just successfully opened the socket, flush old data from queue.
274                    while let Some(frame) = self.queue.dequeue() {
275                        drop(frame.buffer);
276                    }
277                }
278            }
279            Some(handle) => {
280                if let Some(mut frame) = self.queue.dequeue() {
281                    // Transmit the frame and return it to the pool.
282                    let buf = frame.finish();
283                    let data = unsafe {
284                        core::slice::from_raw_parts(
285                            buf.as_ptr() as *const u8,
286                            size_of_val(buf),
287                        )
288                    };
289
290                    // If we fail to send, it can only be because the socket got closed on us (i.e.
291                    // address update due to DHCP). If this happens, reopen the socket.
292                    match self.stack.send(handle, data) {
293                        Ok(_) => {},
294
295                        // Our IP address may have changedm so handle reopening the UDP stream.
296                        Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::Unaddressable))) => {
297                            log::warn!( "IP address updated during stream. Reopening socket");
298                            let socket = self.socket.take().unwrap();
299                            self.stack.close(socket).unwrap();
300                        }
301
302                        // The buffer should clear up once ICMP resolves the IP address, so ignore
303                        // this error.
304                        Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::BufferFull))) => {}
305
306                        Err(other) => {
307                            log::warn!("Unexpected UDP error during data stream: {other:?}");
308                        }
309                    }
310                    drop(frame.buffer)
311                }
312            }
313        }
314    }
315}