veecle_os_runtime/
actor.rs

1//! Smallest unit of work within a runtime instance.
2use core::convert::Infallible;
3use core::pin::Pin;
4
5#[doc(inline)]
6pub use veecle_os_runtime_macros::actor;
7
8use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
9use crate::datastore::{Slot, generational};
10
/// Private home of the marker trait used to seal the public traits in this file.
mod sealed {
    /// Marker supertrait; because this module is private, only this crate can implement it.
    pub trait Sealed {}
}
14
15/// Actor interface.
16///
17/// The [`Actor`] trait allows writing actors that communicate within a runtime.
18/// It allows to define an initial context, which will be available for the whole life of the actor;
19/// a constructor method, with all the [`StoreRequest`] types it needs to communicate with other actors;
20/// and also the [`Actor::run`] method.
21///
22/// # Usage
23///
24/// Add the `Actor` implementing types to the actor list in [`veecle_os::runtime::execute!`](crate::execute!) when
25/// constructing a runtime instance.
26///
27/// The [`Actor::run`] method implements the actor's event loop.
28/// To yield back to the executor, every event loop must contain at least one `await`.
29/// Otherwise, the endless loop of the actor will block the executor and other actors.
30///
31/// ## Macros
32///
33/// The [`actor`][macro@crate::actor::actor] attribute macro can be used to implement actors.
34/// The function the macro is applied to is converted into the event loop.
35/// See its documentation for more details.
36///
37/// ### Example
38///
39/// ```rust
40/// # use std::convert::Infallible;
41/// # use std::fmt::Debug;
42/// #
43/// # use veecle_os_runtime::{Storable, Reader, Writer};
44/// #
45/// # #[derive(Debug, Default, Storable)]
46/// # pub struct Foo;
47/// #
48/// # #[derive(Debug, Default, Storable)]
49/// # pub struct Bar;
50/// #
51/// # pub struct Ctx;
52///
53/// #[veecle_os_runtime::actor]
54/// async fn my_actor(
55///     reader: Reader<'_, Foo>,
56///     writer: Writer<'_, Bar>,
57///     #[init_context] ctx: Ctx,
58/// ) -> Infallible {
59///     loop {
60///         // Do something here.
61///     }
62/// }
63/// ```
64///
65/// This will create a new struct called `MyActor` which implements [`Actor`], letting you register it into a runtime.
66///
67/// ## Manual
68///
69/// For cases where the macro is not sufficient, the [`Actor`] trait can also be implemented manually:
70///
71/// ```rust
72/// # use std::convert::Infallible;
73/// # use std::fmt::Debug;
74/// #
75/// # use veecle_os_runtime::{Storable, Reader, Writer, Actor};
76/// #
77/// # #[derive(Debug, Default, Storable)]
78/// # pub struct Foo;
79/// #
80/// # #[derive(Debug, Default, Storable)]
81/// # pub struct Bar;
82/// #
83/// # pub struct Ctx;
84///
85/// struct MyActor<'a> {
86///     reader: Reader<'a, Foo>,
87///     writer: Writer<'a, Bar>,
88///     context: Ctx,
89/// }
90///
91/// impl<'a> Actor<'a> for MyActor<'a> {
92///     type StoreRequest = (Reader<'a, Foo>, Writer<'a, Bar>);
93///     type InitContext = Ctx;
94///     type Error = Infallible;
95///
96///     fn new((reader, writer): Self::StoreRequest, context: Self::InitContext) -> Self {
97///         Self {
98///             reader,
99///             writer,
100///             context,
101///         }
102///     }
103///
104///     async fn run(mut self) -> Result<Infallible, Self::Error> {
105///         loop {
106///             // Do something here.
107///         }
108///     }
109/// }
110/// ```
111pub trait Actor<'a> {
112    /// [`Reader`]s and [`Writer`]s this actor requires.
113    type StoreRequest: StoreRequest<'a>;
114
115    /// Context that needs to be passed to the actor at initialisation.
116    type InitContext;
117
118    /// Error that this actor might return while running.
119    ///
120    /// This error is treated as fatal, if any actor returns an error the whole runtime will shutdown.
121    type Error: core::error::Error;
122
123    /// Creates a new instance of the struct implementing [`Actor`].
124    ///
125    /// See the [crate documentation][crate] for examples.
126    fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
127
128    /// Runs the [`Actor`] event loop.
129    ///
130    /// See the [crate documentation][crate] for examples.
131    fn run(
132        self,
133    ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
134}
135
136/// Allows requesting a (nearly) arbitrary amount of [`Reader`]s and [`Writer`]s in an [`Actor`].
137///
138/// This trait is not intended for direct usage by users.
139// Developer notes: This works by using type inference via `Datastore::reader` etc. to request `Reader`s etc. from the
140// `Datastore`.
141pub trait StoreRequest<'a>: sealed::Sealed {
142    /// Requests an instance of `Self` from the [`Datastore`].
143    #[doc(hidden)]
144    #[allow(async_fn_in_trait)] // It's actually private so it's fine.
145    async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
146}
147
// The unit type implements `StoreRequest` (for actors without readers or writers), so it needs the sealing marker.
impl sealed::Sealed for () {}
149
150/// Internal trait to abstract out type-erased and concrete data stores.
151pub trait Datastore {
152    /// Returns a generational source tracking the global datastore generation.
153    ///
154    /// This is used to ensure that every reader has had (or will have) a chance to read a value before a writer may
155    /// overwrite it.
156    fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
157
158    #[expect(rustdoc::private_intra_doc_links)] // `rustdoc` is buggy with links from "pub" but unreachable types.
159    /// Returns a reference to the slot for a specific type.
160    ///
161    /// # Panics
162    ///
163    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
164    #[expect(private_interfaces)] // The methods are internal.
165    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
166    where
167        T: Storable + 'static;
168}
169
170impl<S> Datastore for Pin<&S>
171where
172    S: Datastore,
173{
174    fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
175        Pin::into_inner(self).source()
176    }
177
178    #[expect(private_interfaces)] // The methods are internal.
179    fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
180    where
181        T: Storable + 'static,
182    {
183        Pin::into_inner(self).slot()
184    }
185}
186
187pub(crate) trait DatastoreExt<'a>: Copy {
188    #[cfg(test)]
189    /// Increments the global datastore generation.
190    ///
191    /// Asserts that every reader has had (or will have) a chance to read a value before a writer may overwrite it.
192    fn increment_generation(self);
193
194    /// Returns the [`Reader`] for a specific slot.
195    ///
196    /// # Panics
197    ///
198    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
199    fn reader<T>(self) -> Reader<'a, T>
200    where
201        T: Storable + 'static;
202
203    /// Returns the [`ExclusiveReader`] for a specific slot.
204    ///
205    /// Exclusivity of the reader is not guaranteed by this method and must be ensured via other means (e.g.
206    /// [`crate::execute::validate_actors`]).
207    ///
208    /// # Panics
209    ///
210    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
211    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
212    where
213        T: Storable + 'static;
214
215    /// Returns the [`Writer`] for a specific slot.
216    ///
217    /// # Panics
218    ///
219    /// * If the [`Writer`] for this slot has already been acquired.
220    ///
221    /// * If there is no [`Slot`] for `T` in the [`Datastore`].
222    fn writer<T>(self) -> Writer<'a, T>
223    where
224        T: Storable + 'static;
225}
226
227impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
228where
229    S: Datastore,
230{
231    #[cfg(test)]
232    #[cfg_attr(coverage_nightly, coverage(off))]
233    fn increment_generation(self) {
234        self.source().increment_generation()
235    }
236
237    fn reader<T>(self) -> Reader<'a, T>
238    where
239        T: Storable + 'static,
240    {
241        Reader::from_slot(self.slot::<T>())
242    }
243
244    fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
245    where
246        T: Storable + 'static,
247    {
248        ExclusiveReader::from_slot(self.slot::<T>())
249    }
250
251    fn writer<T>(self) -> Writer<'a, T>
252    where
253        T: Storable + 'static,
254    {
255        Writer::new(self.source().waiter(), self.slot::<T>())
256    }
257}
258
259/// Implements a no-op for Actors that do not read or write any values.
260impl<'a> StoreRequest<'a> for () {
261    async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
262}
263
264impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
265
266impl<'a, T> StoreRequest<'a> for Reader<'a, T>
267where
268    T: Storable + 'static,
269{
270    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
271        datastore.reader()
272    }
273}
274
275impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
276
277impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
278where
279    T: Storable + 'static,
280{
281    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
282        datastore.exclusive_reader()
283    }
284}
285
286impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
287
288impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
289where
290    T: Storable + 'static,
291{
292    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
293        Reader::from_slot(datastore.slot()).wait_init().await
294    }
295}
296
297impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
298
299impl<'a, T> StoreRequest<'a> for Writer<'a, T>
300where
301    T: Storable + 'static,
302{
303    async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
304        datastore.writer()
305    }
306}
307
308/// Implements [`StoreRequest`] for provided types.
309macro_rules! impl_request_helper {
310    ($t:ident) => {
311        #[cfg_attr(docsrs, doc(fake_variadic))]
312        /// This trait is implemented for tuples up to seven items long.
313        impl<'a, $t> sealed::Sealed for ($t,) { }
314
315        #[cfg_attr(docsrs, doc(fake_variadic))]
316        /// This trait is implemented for tuples up to seven items long.
317        impl<'a, $t> StoreRequest<'a> for ($t,)
318        where
319            $t: StoreRequest<'a>,
320        {
321            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
322                (<$t as StoreRequest>::request(datastore).await,)
323            }
324        }
325    };
326
327    (@impl $($t:ident)*) => {
328        #[cfg_attr(docsrs, doc(hidden))]
329        impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
330        where
331            $($t: sealed::Sealed),*
332        { }
333
334        #[cfg_attr(docsrs, doc(hidden))]
335        impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
336        where
337            $($t: StoreRequest<'a>),*
338        {
339            async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
340                // join! is necessary here to avoid argument-order-dependence with the #[actor] macro.
341                // This ensures that any `InitializedReaders` in self correctly track the generation at which they were
342                // first ready, so that the first `wait_for_update` sees the value that caused them to become
343                // initialized.
344                // See `multi_request_order_independence` for the verification of this.
345                futures::join!($( <$t as StoreRequest>::request(datastore), )*)
346            }
347        }
348    };
349
350    ($head:ident $($rest:ident)*) => {
351        impl_request_helper!(@impl $head $($rest)*);
352        impl_request_helper!($($rest)*);
353    };
354}
355
356impl_request_helper!(Z Y X W V U T);
357
358/// Macro helper to allow actors to return either a [`Result`] type or [`Infallible`] (and eventually [`!`]).
359#[diagnostic::on_unimplemented(
360    message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
361    label = "not a valid actor return type"
362)]
363pub trait IsActorResult: sealed::Sealed {
364    /// The error type this result converts into.
365    type Error;
366
367    /// Convert the result into an actual [`Result`] value.
368    fn into_result(self) -> Result<Infallible, Self::Error>;
369}
370
371impl<E> sealed::Sealed for Result<Infallible, E> {}
372
373impl<E> IsActorResult for Result<Infallible, E> {
374    type Error = E;
375
376    fn into_result(self) -> Result<Infallible, E> {
377        self
378    }
379}
380
381impl sealed::Sealed for Infallible {}
382
383impl IsActorResult for Infallible {
384    type Error = Infallible;
385
386    fn into_result(self) -> Result<Infallible, Self::Error> {
387        match self {}
388    }
389}
390
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Two tuple requests for the same pair of `InitializedReader`s, in opposite order, must behave the same.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());
        let store = datastore.as_ref();

        let mut writer_a = store.writer::<A>();
        let mut writer_b = store.writer::<B>();

        // Whatever order the readers are requested in, both requests should resolve during the generation in which
        // the later of the two values is first written.
        let mut forward = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(store));
        let mut reverse = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(store));

        let (forward_waker, forward_wakes) = futures_test::task::new_count_waker();
        let (reverse_waker, reverse_wakes) = futures_test::task::new_count_waker();

        let mut forward_cx = Context::from_waker(&forward_waker);
        let mut reverse_cx = Context::from_waker(&reverse_waker);

        // Nothing has been written yet, so neither request can complete.
        assert!(forward.as_mut().poll(&mut forward_cx).is_pending());
        assert!(reverse.as_mut().poll(&mut reverse_cx).is_pending());

        let forward_wakes_before = forward_wakes.get();
        let reverse_wakes_before = reverse_wakes.get();

        store.increment_generation();

        writer_a.write(A).now_or_never().unwrap();

        // After the first write each future may or may not have been woken; a woken future has to be polled again.
        if forward_wakes.get() > forward_wakes_before {
            assert!(forward.as_mut().poll(&mut forward_cx).is_pending());
        }
        if reverse_wakes.get() > reverse_wakes_before {
            assert!(reverse.as_mut().poll(&mut reverse_cx).is_pending());
        }

        let forward_wakes_before = forward_wakes.get();
        let reverse_wakes_before = reverse_wakes.get();

        store.increment_generation();

        writer_b.write(B).now_or_never().unwrap();

        // After the second write both futures _must_ have been woken and must complete.
        assert!(forward_wakes.get() > forward_wakes_before);
        assert!(reverse_wakes.get() > reverse_wakes_before);

        let Poll::Ready((mut forward_a, mut forward_b)) = forward.as_mut().poll(&mut forward_cx) else {
            panic!("request 1 was not ready")
        };

        let Poll::Ready((mut reverse_b, mut reverse_a)) = reverse.as_mut().poll(&mut reverse_cx) else {
            panic!("request 2 was not ready")
        };

        // Every reader should report an update: each was just initialized and has not been `wait_for_update`d yet.
        assert!(forward_a.wait_for_update().now_or_never().is_some());
        assert!(forward_b.wait_for_update().now_or_never().is_some());

        assert!(reverse_b.wait_for_update().now_or_never().is_some());
        assert!(reverse_a.wait_for_update().now_or_never().is_some());
    }
}