1#![allow(non_camel_case_types)] use super::{Format, Target};
4use core::mem::MaybeUninit;
5use heapless::{
6 box_pool,
7 pool::boxed::{Box, BoxBlock},
8 spsc::{Consumer, Producer, Queue},
9};
10use smoltcp_nal::embedded_nal::{UdpClientStack, nb};
11
/// Marker value written, little-endian, into the first two bytes of every
/// frame header (see `StreamFrame::new`).
const MAGIC: u16 = 0x057B;

/// Number of bytes occupied by the frame header, spelled out field by field in
/// the order `StreamFrame::new` writes them: magic (`u16`), format id (`u8`),
/// batch count (`u8`), sequence number (`u32`).
const HEADER_SIZE: usize = core::mem::size_of::<u16>()
    + 2 * core::mem::size_of::<u8>()
    + core::mem::size_of::<u32>();

/// Number of frame buffers handed to the frame pool by `setup`.
const FRAME_COUNT: usize = 4;

// NOTE(review): the subtracted values look like per-packet overheads taken off
// a 1500-byte MTU; SOURCE does not say which header each of 40/8/32/20 stands
// for -- confirm before changing any of them.
/// Size in bytes of one frame buffer, header included (evaluates to 1400).
const FRAME_SIZE: usize = 1500 - (40 + 8 + 32 + 20);

/// Const size parameter of the frame queue: twice the number of frame buffers.
const FRAME_QUEUE_SIZE: usize = 2 * FRAME_COUNT;
31
32type Frame = [MaybeUninit<u8>; FRAME_SIZE];
33
34box_pool!(FRAME_POOL: Frame);
35
/// Create the two halves of the data stream.
///
/// Allocates the frame queue and the backing storage for `FRAME_COUNT` frames
/// through `cortex_m::singleton!`, registers every storage block with
/// `FRAME_POOL`, and wires a [`FrameGenerator`] (queue producer) to a
/// [`DataStream`] (queue consumer) that sends over `stack`.
///
/// # Panics
///
/// Panics if either `cortex_m::singleton!` invocation yields `None`
/// (presumably when `setup` is called a second time -- confirm against the
/// `cortex_m` documentation).
#[cfg(target_arch = "arm")]
pub fn setup<N: UdpClientStack<Error = smoltcp_nal::NetworkError>>(
    stack: N,
) -> (FrameGenerator, DataStream<N>) {
    // Queue that carries finished frames from the generator to the stream.
    let frame_queue =
        cortex_m::singleton!(: Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new())
            .unwrap();
    let (frame_tx, frame_rx) = frame_queue.split();

    // Template used to fill the storage array; a `const` is needed because the
    // array-repeat expression below copies it `FRAME_COUNT` times.
    #[allow(clippy::declare_interior_mutable_const)]
    const FRAME: BoxBlock<Frame> = BoxBlock::new();
    let storage =
        cortex_m::singleton!(FRAME_DATA: [BoxBlock<Frame>; FRAME_COUNT] =
            [FRAME; FRAME_COUNT])
        .unwrap();

    // Hand every block to the pool so that `FRAME_POOL.alloc` can succeed.
    storage
        .iter_mut()
        .for_each(|block| FRAME_POOL.manage(block));

    (
        FrameGenerator::new(frame_tx),
        DataStream::new(stack, frame_rx),
    )
}
72
73#[derive(Debug)]
74struct StreamFrame {
75 buffer: Box<FRAME_POOL>,
76 offset: usize,
77 batches: u8,
78}
79
80impl StreamFrame {
81 pub fn new(
82 mut buffer: Box<FRAME_POOL>,
83 format_id: u8,
84 sequence_number: u32,
85 ) -> Self {
86 for (byte, buf) in MAGIC
87 .to_le_bytes()
88 .iter()
89 .chain(&[format_id, 0])
90 .chain(sequence_number.to_le_bytes().iter())
91 .zip(buffer.iter_mut())
92 {
93 buf.write(*byte);
94 }
95
96 Self {
97 buffer,
98 offset: HEADER_SIZE,
99 batches: 0,
100 }
101 }
102
103 pub fn add_batch<F>(&mut self, mut f: F) -> usize
104 where
105 F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
106 {
107 let len = f(&mut self.buffer[self.offset..]);
108 self.offset += len;
109 self.batches += 1;
110 len
111 }
112
113 pub fn is_full(&self, len: usize) -> bool {
114 self.offset + len > self.buffer.len()
115 }
116
117 pub fn finish(&mut self) -> &[MaybeUninit<u8>] {
118 self.buffer[3].write(self.batches);
119 &self.buffer[..self.offset]
120 }
121}
122
123pub struct FrameGenerator {
125 queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
126 current_frame: Option<StreamFrame>,
127 sequence_number: u32,
128 format: u8,
129}
130
131impl FrameGenerator {
132 fn new(queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>) -> Self {
133 Self {
134 queue,
135 format: Format::Unknown.into(),
136 current_frame: None,
137 sequence_number: 0,
138 }
139 }
140
141 pub fn configure(&mut self, format: impl Into<u8>) {
149 self.format = format.into();
150 }
151
152 pub fn add<F>(&mut self, func: F)
159 where
160 F: FnMut(&mut [MaybeUninit<u8>]) -> usize,
161 {
162 let sequence_number = self.sequence_number;
163 self.sequence_number = self.sequence_number.wrapping_add(1);
164
165 let current_frame = match self.current_frame.as_mut() {
166 None => {
167 if let Ok(buffer) =
168 FRAME_POOL.alloc([MaybeUninit::uninit(); FRAME_SIZE])
169 {
170 self.current_frame.insert(StreamFrame::new(
171 buffer,
172 self.format,
173 sequence_number,
174 ))
175 } else {
176 return;
177 }
178 }
179 Some(frame) => frame,
180 };
181
182 let len = current_frame.add_batch(func);
183
184 if current_frame.is_full(len) {
185 if let Some(frame) = self.current_frame.take() {
188 self.queue.enqueue(frame).unwrap();
189 }
190 }
191 }
192}
193
194pub struct DataStream<N: UdpClientStack> {
199 stack: N,
200 socket: Option<<N as UdpClientStack>::UdpSocket>,
201 queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
202 remote: Target,
203}
204
205impl<N: UdpClientStack<Error = smoltcp_nal::NetworkError>> DataStream<N> {
206 fn new(
213 stack: N,
214 consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
215 ) -> Self {
216 Self {
217 stack,
218 socket: None,
219 remote: Target::default(),
220 queue: consumer,
221 }
222 }
223
224 fn close(&mut self) {
225 if let Some(socket) = self.socket.take() {
226 log::info!("Closing stream");
227 self.stack.close(socket).unwrap();
229 }
230 }
231
232 fn open(&mut self) -> Result<(), ()> {
234 if self.socket.is_some() || self.remote.0.ip().is_unspecified() {
237 return Err(());
238 }
239
240 let mut socket = self.stack.socket().or(Err(()))?;
241
242 if self.stack.connect(&mut socket, self.remote.0).is_err() {
244 self.stack.close(socket).unwrap();
245 return Err(());
246 }
247
248 self.socket.replace(socket);
249
250 log::info!("Opening stream");
251
252 Ok(())
253 }
254
255 pub fn set_remote(&mut self, remote: Target) {
260 if remote != self.remote {
262 self.close();
263 }
264 self.remote = remote;
265 }
266
267 pub fn process(&mut self) {
269 match self.socket.as_mut() {
270 None => {
271 if self.open().is_ok() {
273 while let Some(frame) = self.queue.dequeue() {
275 drop(frame.buffer);
276 }
277 }
278 }
279 Some(handle) => {
280 if let Some(mut frame) = self.queue.dequeue() {
281 let buf = frame.finish();
283 let data = unsafe {
284 core::slice::from_raw_parts(
285 buf.as_ptr() as *const u8,
286 size_of_val(buf),
287 )
288 };
289
290 match self.stack.send(handle, data) {
293 Ok(_) => {},
294
295 Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::Unaddressable))) => {
297 log::warn!( "IP address updated during stream. Reopening socket");
298 let socket = self.socket.take().unwrap();
299 self.stack.close(socket).unwrap();
300 }
301
302 Err(nb::Error::Other(smoltcp_nal::NetworkError::UdpWriteFailure(smoltcp_nal::smoltcp::socket::udp::SendError::BufferFull))) => {}
305
306 Err(other) => {
307 log::warn!("Unexpected UDP error during data stream: {other:?}");
308 }
309 }
310 drop(frame.buffer)
311 }
312 }
313 }
314 }
315}