veecle_os_runtime/datastore/
combined_readers.rs1use core::future::{Future, poll_fn};
2use core::pin::pin;
3use core::task::Poll;
4
/// Lets several readers be used together as a single unit.
///
/// Implemented in this file for tuples of `&mut` references to
/// [`CombinableReader`]s.
pub trait CombineReaders {
    /// The combined view that is handed to the closure passed to
    /// [`read`](Self::read).
    type ToBeRead<'a>;

    /// Calls `f` with the combined current values and returns whatever `f`
    /// returns.
    fn read<R>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> R) -> R;

    /// Waits until an update is available, then hands `self` back so that a
    /// call such as `read` can be chained directly onto the awaited result.
    // NOTE(review): the lint warns that auto-trait bounds (e.g. `Send`) cannot
    // be named for the future returned by an `async fn` in a public trait;
    // presumably that is acceptable for this runtime — confirm.
    #[allow(async_fn_in_trait)]
    async fn wait_for_update(&mut self) -> &mut Self;
}
25
26pub(super) trait Sealed {}
27
28#[allow(private_bounds)]
29pub trait CombinableReader: Sealed {
31 type ToBeRead: 'static;
33
34 #[doc(hidden)]
38 fn borrow(&self) -> core::cell::Ref<'_, Self::ToBeRead>;
39
40 #[doc(hidden)]
46 #[allow(async_fn_in_trait)]
47 async fn wait_for_update(&mut self);
48}
49
/// Generates a [`CombineReaders`] implementation for every listed tuple shape.
///
/// Each identifier of a `tuples` entry is used twice in the expansion: as the
/// generic type parameter of one tuple element and as the name of the local
/// variable bound to that element. The latter is why the generated methods
/// carry `#[allow(non_snake_case)]`.
macro_rules! impl_combined_reader_helper {
    (
        tuples: [
            $(($($reader:ident)*),)*
        ],
    ) => {
        $(
            impl<$($reader,)*> CombineReaders for ( $( &mut $reader, )* )
            where
                $($reader: CombinableReader,)*
            {
                type ToBeRead<'r> = (
                    $(&'r <$reader as CombinableReader>::ToBeRead,)*
                );

                #[allow(non_snake_case)]
                #[veecle_telemetry::instrument]
                fn read<Out>(&self, f: impl FnOnce(Self::ToBeRead<'_>) -> Out) -> Out {
                    let ($($reader,)*) = self;
                    // Rebind each name to its `Ref` guard; the guards stay alive
                    // until `f` has returned.
                    let ($($reader,)*) = ($($reader.borrow(),)*);
                    f(($(&*$reader,)*))
                }

                #[allow(non_snake_case)]
                #[veecle_telemetry::instrument]
                async fn wait_for_update(&mut self) -> &mut Self {
                    // Inner scope: the per-reader futures borrow out of `self`
                    // and have to be gone before `self` is returned.
                    {
                        let ($($reader,)*) = self;
                        // `pin!` must stay directly inside this `let` so the
                        // pinned futures live for the rest of the scope.
                        let ($(mut $reader,)*) = ($(pin!($reader.wait_for_update()),)*);
                        poll_fn(move |cx| {
                            // Every future is polled on each call, even when an
                            // earlier one already reported readiness; a single
                            // ready future is enough to finish the wait.
                            let mut outcome: Poll<()> = Poll::Pending;
                            $(
                                if $reader.as_mut().poll(cx).is_ready() {
                                    outcome = Poll::Ready(());
                                }
                            )*
                            outcome
                        }).await;
                    }
                    self
                }
            }
        )*
    };
}
103
104impl_combined_reader_helper!(
105 tuples: [
106 (T U),
108 (T U V),
109 (T U V W),
110 (T U V W X),
111 (T U V W X Y),
112 (T U V W X Y Z),
113 ],
114);
115
116#[cfg(test)]
117#[cfg_attr(coverage_nightly, coverage(off))]
118mod tests {
119 use core::pin::pin;
120 use futures::FutureExt;
121
122 use crate::datastore::{
123 CombineReaders, ExclusiveReader, Reader, Slot, Storable, Writer, generational,
124 };
125
/// Two `ExclusiveReader`s whose slots were never written can be read together.
#[test]
fn read_exclusive_reader() {
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    let first_slot = pin!(Slot::<Sensor0>::new());
    let second_slot = pin!(Slot::<Sensor1>::new());

    let mut first_reader = ExclusiveReader::from_slot(first_slot.as_ref());
    let mut second_reader = ExclusiveReader::from_slot(second_slot.as_ref());

    // Both values must agree on whether anything is present.
    let combined = (&mut first_reader, &mut second_reader);
    combined.read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));
}
143
/// The combined wait on two `ExclusiveReader`s is pending before any write,
/// ready once after both slots were written, and pending again afterwards.
#[test]
fn wait_for_update_exclusive_reader() {
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    let generation_source = pin!(generational::Source::new());
    let first_slot = pin!(Slot::<Sensor0>::new());
    let second_slot = pin!(Slot::<Sensor1>::new());

    let mut first_writer = Writer::new(generation_source.as_ref().waiter(), first_slot.as_ref());
    let mut second_writer = Writer::new(generation_source.as_ref().waiter(), second_slot.as_ref());
    let mut first_reader = ExclusiveReader::from_slot(first_slot.as_ref());
    let mut second_reader = ExclusiveReader::from_slot(second_slot.as_ref());

    // Nothing has been written so far.
    let pending_before_write = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_before_write);

    // Advance the generation, then write to both slots; each write has to
    // complete immediately.
    generation_source.as_ref().increment_generation();
    first_writer.write(Sensor0(2)).now_or_never().unwrap();
    second_writer.write(Sensor1(2)).now_or_never().unwrap();

    let ready_after_write = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_some();
    assert!(ready_after_write);

    // Waiting a second time without a new write must be pending again.
    let pending_after_wait = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_after_wait);
}
186
/// Combined reads work on plain `Reader`s before any write and on the readers
/// returned by `wait_init` after both slots were written.
#[test]
fn read() {
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    let generation_source = pin!(generational::Source::new());
    let first_slot = pin!(Slot::<Sensor0>::new());
    let second_slot = pin!(Slot::<Sensor1>::new());

    let mut first_reader = Reader::from_slot(first_slot.as_ref());
    let mut second_reader = Reader::from_slot(second_slot.as_ref());

    // Before any write both values must agree on whether anything is present.
    {
        let combined = (&mut first_reader, &mut second_reader);
        combined.read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));
    }

    let mut first_writer = Writer::new(generation_source.as_ref().waiter(), first_slot.as_ref());
    let mut second_writer = Writer::new(generation_source.as_ref().waiter(), second_slot.as_ref());
    generation_source.as_ref().increment_generation();
    first_writer.write(Sensor0(2)).now_or_never().unwrap();
    second_writer.write(Sensor1(2)).now_or_never().unwrap();

    // With both slots written, `wait_init` has to resolve immediately.
    let mut first_initialized = first_reader.wait_init().now_or_never().unwrap();
    let mut second_initialized = second_reader.wait_init().now_or_never().unwrap();

    let combined = (&mut first_initialized, &mut second_initialized);
    combined.read(|(first, second)| assert_eq!(first.0, second.0));
}
216
/// Exercises the combined wait first on plain `Reader`s and then on the
/// readers returned by `wait_init`: pending without a write, ready once after
/// a write, pending again afterwards.
#[test]
fn wait_for_update() {
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    let generation_source = pin!(generational::Source::new());
    let first_slot = pin!(Slot::<Sensor0>::new());
    let second_slot = pin!(Slot::<Sensor1>::new());

    let mut first_writer = Writer::new(generation_source.as_ref().waiter(), first_slot.as_ref());
    let mut second_writer = Writer::new(generation_source.as_ref().waiter(), second_slot.as_ref());
    let mut first_reader = Reader::from_slot(first_slot.as_ref());
    let mut second_reader = Reader::from_slot(second_slot.as_ref());

    // Nothing has been written so far.
    let pending_before_write = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_before_write);

    generation_source.as_ref().increment_generation();
    first_writer.write(Sensor0(2)).now_or_never().unwrap();
    second_writer.write(Sensor1(2)).now_or_never().unwrap();

    let ready_after_write = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_some();
    assert!(ready_after_write);

    // Waiting a second time without a new write must be pending again.
    let pending_after_wait = (&mut first_reader, &mut second_reader)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_after_wait);

    // Same sequence again, this time on the readers produced by `wait_init`.
    let mut first_initialized = first_reader.wait_init().now_or_never().unwrap();
    let mut second_initialized = second_reader.wait_init().now_or_never().unwrap();

    let pending_after_init = (&mut first_initialized, &mut second_initialized)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_after_init);

    generation_source.as_ref().increment_generation();
    first_writer.write(Sensor0(3)).now_or_never().unwrap();
    second_writer.write(Sensor1(3)).now_or_never().unwrap();

    // The awaited result hands back the tuple, so `read` can be chained on it.
    (&mut first_initialized, &mut second_initialized)
        .wait_for_update()
        .now_or_never()
        .unwrap()
        .read(|(first, second)| assert_eq!(first.0, second.0));

    let pending_at_end = (&mut first_initialized, &mut second_initialized)
        .wait_for_update()
        .now_or_never()
        .is_none();
    assert!(pending_at_end);
}
285
/// Different reader kinds can be combined in one tuple: a `Reader` (later the
/// reader returned by `wait_init`) together with an `ExclusiveReader`.
#[test]
fn read_mixed() {
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor0(u8);
    #[derive(Clone, Debug, PartialEq, Eq, Storable)]
    #[storable(crate = crate)]
    struct Sensor1(u8);

    let generation_source = pin!(generational::Source::new());
    let first_slot = pin!(Slot::<Sensor0>::new());
    let second_slot = pin!(Slot::<Sensor1>::new());

    let mut plain_reader = Reader::from_slot(first_slot.as_ref());
    let mut exclusive_reader = ExclusiveReader::from_slot(second_slot.as_ref());

    // Before any write both values must agree on whether anything is present.
    {
        let combined = (&mut plain_reader, &mut exclusive_reader);
        combined.read(|(first, second)| assert_eq!(first.is_none(), second.is_none()));
    }

    let mut first_writer = Writer::new(generation_source.as_ref().waiter(), first_slot.as_ref());
    let mut second_writer = Writer::new(generation_source.as_ref().waiter(), second_slot.as_ref());
    generation_source.as_ref().increment_generation();
    first_writer.write(Sensor0(2)).now_or_never().unwrap();
    second_writer.write(Sensor1(2)).now_or_never().unwrap();

    let mut initialized_reader = plain_reader.wait_init().now_or_never().unwrap();

    // The closure's annotation spells out the element types: the reader from
    // `wait_init` yields the value itself, the exclusive reader an `Option`.
    let combined = (&mut initialized_reader, &mut exclusive_reader);
    combined.read(|(first, second): (&Sensor0, &Option<Sensor1>)| {
        assert_eq!(first.0, second.as_ref().unwrap().0)
    });
}
315}