Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::KeyedStream;
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::CycleId;
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::nondet::{NonDet, nondet};
31use crate::prelude::manual_proof;
32use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
33
34pub mod networking;
35
36/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
37#[sealed::sealed]
38pub trait Ordering:
39    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
40{
41    /// The [`StreamOrder`] corresponding to this type.
42    const ORDERING_KIND: StreamOrder;
43}
44
45/// Marks the stream as being totally ordered, which means that there are
46/// no sources of non-determinism (other than intentional ones) that will
47/// affect the order of elements.
48pub enum TotalOrder {}
49
50#[sealed::sealed]
51impl Ordering for TotalOrder {
52    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
53}
54
55/// Marks the stream as having no order, which means that the order of
56/// elements may be affected by non-determinism.
57///
58/// This restricts certain operators, such as `fold` and `reduce`, to only
59/// be used with commutative aggregation functions.
60pub enum NoOrder {}
61
62#[sealed::sealed]
63impl Ordering for NoOrder {
64    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
65}
66
67/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
68/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
69/// have `Self` guarantees instead.
70#[sealed::sealed]
71pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
72#[sealed::sealed]
73impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
74
75/// Helper trait for determining the weakest of two orderings.
76#[sealed::sealed]
77pub trait MinOrder<Other: ?Sized> {
78    /// The weaker of the two orderings.
79    type Min: Ordering;
80}
81
82#[sealed::sealed]
83impl<O: Ordering> MinOrder<O> for TotalOrder {
84    type Min = O;
85}
86
87#[sealed::sealed]
88impl<O: Ordering> MinOrder<O> for NoOrder {
89    type Min = NoOrder;
90}
91
92/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
93#[sealed::sealed]
94pub trait Retries:
95    MinRetries<Self, Min = Self>
96    + MinRetries<ExactlyOnce, Min = Self>
97    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
98{
99    /// The [`StreamRetry`] corresponding to this type.
100    const RETRIES_KIND: StreamRetry;
101}
102
103/// Marks the stream as having deterministic message cardinality, with no
104/// possibility of duplicates.
105pub enum ExactlyOnce {}
106
107#[sealed::sealed]
108impl Retries for ExactlyOnce {
109    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
110}
111
112/// Marks the stream as having non-deterministic message cardinality, which
113/// means that duplicates may occur, but messages will not be dropped.
114pub enum AtLeastOnce {}
115
116#[sealed::sealed]
117impl Retries for AtLeastOnce {
118    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
119}
120
121/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
122/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
123/// have `Self` guarantees instead.
124#[sealed::sealed]
125pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
126#[sealed::sealed]
127impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
128
129/// Helper trait for determining the weakest of two retry guarantees.
130#[sealed::sealed]
131pub trait MinRetries<Other: ?Sized> {
132    /// The weaker of the two retry guarantees.
133    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
134}
135
136#[sealed::sealed]
137impl<R: Retries> MinRetries<R> for ExactlyOnce {
138    type Min = R;
139}
140
141#[sealed::sealed]
142impl<R: Retries> MinRetries<R> for AtLeastOnce {
143    type Min = AtLeastOnce;
144}
145
146#[sealed::sealed]
147#[diagnostic::on_unimplemented(
148    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
149    label = "required here",
150    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
151)]
152/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
153pub trait IsOrdered: Ordering {}
154
155#[sealed::sealed]
156#[diagnostic::do_not_recommend]
157impl IsOrdered for TotalOrder {}
158
159#[sealed::sealed]
160#[diagnostic::on_unimplemented(
161    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
162    label = "required here",
163    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
164)]
165/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
166pub trait IsExactlyOnce: Retries {}
167
168#[sealed::sealed]
169#[diagnostic::do_not_recommend]
170impl IsExactlyOnce for ExactlyOnce {}
171
172/// Streaming sequence of elements with type `Type`.
173///
174/// This live collection represents a growing sequence of elements, with new elements being
175/// asynchronously appended to the end of the sequence. This can be used to model the arrival
176/// of network input, such as API requests, or streaming ingestion.
177///
178/// By default, all streams have deterministic ordering and each element is materialized exactly
179/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
180/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
181/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
182///
183/// Type Parameters:
184/// - `Type`: the type of elements in the stream
185/// - `Loc`: the location where the stream is being materialized
186/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
187/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
188///   (default is [`TotalOrder`])
189/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
190///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
191pub struct Stream<
192    Type,
193    Loc,
194    Bound: Boundedness = Unbounded,
195    Order: Ordering = TotalOrder,
196    Retry: Retries = ExactlyOnce,
197> {
198    pub(crate) location: Loc,
199    pub(crate) ir_node: RefCell<HydroNode>,
200
201    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
202}
203
204impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
205    for Stream<T, L, Unbounded, O, R>
206where
207    L: Location<'a>,
208{
209    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
210        let new_meta = stream
211            .location
212            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
213
214        Stream {
215            location: stream.location,
216            ir_node: RefCell::new(HydroNode::Cast {
217                inner: Box::new(stream.ir_node.into_inner()),
218                metadata: new_meta,
219            }),
220            _phantom: PhantomData,
221        }
222    }
223}
224
225impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
226    for Stream<T, L, B, NoOrder, R>
227where
228    L: Location<'a>,
229{
230    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
231        stream.weaken_ordering()
232    }
233}
234
235impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
236    for Stream<T, L, B, O, AtLeastOnce>
237where
238    L: Location<'a>,
239{
240    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
241        stream.weaken_retries()
242    }
243}
244
245impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
246where
247    L: Location<'a>,
248{
249    fn defer_tick(self) -> Self {
250        Stream::defer_tick(self)
251    }
252}
253
254impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
255    for Stream<T, Tick<L>, Bounded, O, R>
256where
257    L: Location<'a>,
258{
259    type Location = Tick<L>;
260
261    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262        Stream::new(
263            location.clone(),
264            HydroNode::CycleSource {
265                cycle_id,
266                metadata: location.new_node_metadata(Self::collection_kind()),
267            },
268        )
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
280        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
281            location.clone(),
282            HydroNode::DeferTick {
283                input: Box::new(HydroNode::CycleSource {
284                    cycle_id,
285                    metadata: location.new_node_metadata(Self::collection_kind()),
286                }),
287                metadata: location.new_node_metadata(Self::collection_kind()),
288            },
289        );
290
291        from_previous_tick.chain(initial.filter_if_some(location.optional_first_tick(q!(()))))
292    }
293}
294
295impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
296    for Stream<T, Tick<L>, Bounded, O, R>
297where
298    L: Location<'a>,
299{
300    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
301        assert_eq!(
302            Location::id(&self.location),
303            expected_location,
304            "locations do not match"
305        );
306        self.location
307            .flow_state()
308            .borrow_mut()
309            .push_root(HydroRoot::CycleSink {
310                cycle_id,
311                input: Box::new(self.ir_node.into_inner()),
312                op_metadata: HydroIrOpMetadata::new(),
313            });
314    }
315}
316
317impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
318    for Stream<T, L, B, O, R>
319where
320    L: Location<'a> + NoTick,
321{
322    type Location = L;
323
324    fn create_source(cycle_id: CycleId, location: L) -> Self {
325        Stream::new(
326            location.clone(),
327            HydroNode::CycleSource {
328                cycle_id,
329                metadata: location.new_node_metadata(Self::collection_kind()),
330            },
331        )
332    }
333}
334
335impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
336    for Stream<T, L, B, O, R>
337where
338    L: Location<'a> + NoTick,
339{
340    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
341        assert_eq!(
342            Location::id(&self.location),
343            expected_location,
344            "locations do not match"
345        );
346        self.location
347            .flow_state()
348            .borrow_mut()
349            .push_root(HydroRoot::CycleSink {
350                cycle_id,
351                input: Box::new(self.ir_node.into_inner()),
352                op_metadata: HydroIrOpMetadata::new(),
353            });
354    }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
358where
359    T: Clone,
360    L: Location<'a>,
361{
362    fn clone(&self) -> Self {
363        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
364            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
365            *self.ir_node.borrow_mut() = HydroNode::Tee {
366                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
367                metadata: self.location.new_node_metadata(Self::collection_kind()),
368            };
369        }
370
371        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
372            Stream {
373                location: self.location.clone(),
374                ir_node: HydroNode::Tee {
375                    inner: SharedNode(inner.0.clone()),
376                    metadata: metadata.clone(),
377                }
378                .into(),
379                _phantom: PhantomData,
380            }
381        } else {
382            unreachable!()
383        }
384    }
385}
386
387impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
388where
389    L: Location<'a>,
390{
391    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
392        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
393        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
394
395        Stream {
396            location,
397            ir_node: RefCell::new(ir_node),
398            _phantom: PhantomData,
399        }
400    }
401
402    /// Returns the [`Location`] where this stream is being materialized.
403    pub fn location(&self) -> &L {
404        &self.location
405    }
406
407    pub(crate) fn collection_kind() -> CollectionKind {
408        CollectionKind::Stream {
409            bound: B::BOUND_KIND,
410            order: O::ORDERING_KIND,
411            retry: R::RETRIES_KIND,
412            element_type: quote_type::<T>().into(),
413        }
414    }
415
416    /// Produces a stream based on invoking `f` on each element.
417    /// If you do not want to modify the stream and instead only want to view
418    /// each item use [`Stream::inspect`] instead.
419    ///
420    /// # Example
421    /// ```rust
422    /// # #[cfg(feature = "deploy")] {
423    /// # use hydro_lang::prelude::*;
424    /// # use futures::StreamExt;
425    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
426    /// let words = process.source_iter(q!(vec!["hello", "world"]));
427    /// words.map(q!(|x| x.to_uppercase()))
428    /// # }, |mut stream| async move {
429    /// # for w in vec!["HELLO", "WORLD"] {
430    /// #     assert_eq!(stream.next().await.unwrap(), w);
431    /// # }
432    /// # }));
433    /// # }
434    /// ```
435    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
436    where
437        F: Fn(T) -> U + 'a,
438    {
439        let f = f.splice_fn1_ctx(&self.location).into();
440        Stream::new(
441            self.location.clone(),
442            HydroNode::Map {
443                f,
444                input: Box::new(self.ir_node.into_inner()),
445                metadata: self
446                    .location
447                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
448            },
449        )
450    }
451
452    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
453    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
454    /// for the output type `U` must produce items in a **deterministic** order.
455    ///
456    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
457    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// process
466    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
467    ///     .flat_map_ordered(q!(|x| x))
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3, 4
470    /// # for w in (1..5) {
471    /// #     assert_eq!(stream.next().await.unwrap(), w);
472    /// # }
473    /// # }));
474    /// # }
475    /// ```
476    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
477    where
478        I: IntoIterator<Item = U>,
479        F: Fn(T) -> I + 'a,
480    {
481        let f = f.splice_fn1_ctx(&self.location).into();
482        Stream::new(
483            self.location.clone(),
484            HydroNode::FlatMap {
485                f,
486                input: Box::new(self.ir_node.into_inner()),
487                metadata: self
488                    .location
489                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
490            },
491        )
492    }
493
494    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
495    /// for the output type `U` to produce items in any order.
496    ///
497    /// # Example
498    /// ```rust
499    /// # #[cfg(feature = "deploy")] {
500    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
501    /// # use futures::StreamExt;
502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
503    /// process
504    ///     .source_iter(q!(vec![
505    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
506    ///         std::collections::HashSet::from_iter(vec![3, 4]),
507    ///     ]))
508    ///     .flat_map_unordered(q!(|x| x))
509    /// # }, |mut stream| async move {
510    /// // 1, 2, 3, 4, but in no particular order
511    /// # let mut results = Vec::new();
512    /// # for w in (1..5) {
513    /// #     results.push(stream.next().await.unwrap());
514    /// # }
515    /// # results.sort();
516    /// # assert_eq!(results, vec![1, 2, 3, 4]);
517    /// # }));
518    /// # }
519    /// ```
520    pub fn flat_map_unordered<U, I, F>(
521        self,
522        f: impl IntoQuotedMut<'a, F, L>,
523    ) -> Stream<U, L, B, NoOrder, R>
524    where
525        I: IntoIterator<Item = U>,
526        F: Fn(T) -> I + 'a,
527    {
528        let f = f.splice_fn1_ctx(&self.location).into();
529        Stream::new(
530            self.location.clone(),
531            HydroNode::FlatMap {
532                f,
533                input: Box::new(self.ir_node.into_inner()),
534                metadata: self
535                    .location
536                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
537            },
538        )
539    }
540
541    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
542    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
543    ///
544    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
545    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
548    /// # #[cfg(feature = "deploy")] {
549    /// # use hydro_lang::prelude::*;
550    /// # use futures::StreamExt;
551    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552    /// process
553    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
554    ///     .flatten_ordered()
555    /// # }, |mut stream| async move {
556    /// // 1, 2, 3, 4
557    /// # for w in (1..5) {
558    /// #     assert_eq!(stream.next().await.unwrap(), w);
559    /// # }
560    /// # }));
561    /// # }
562    /// ```
563    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
564    where
565        T: IntoIterator<Item = U>,
566    {
567        self.flat_map_ordered(q!(|d| d))
568    }
569
570    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
571    /// for the element type `T` to produce items in any order.
572    ///
573    /// # Example
574    /// ```rust
575    /// # #[cfg(feature = "deploy")] {
576    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
577    /// # use futures::StreamExt;
578    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
579    /// process
580    ///     .source_iter(q!(vec![
581    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
582    ///         std::collections::HashSet::from_iter(vec![3, 4]),
583    ///     ]))
584    ///     .flatten_unordered()
585    /// # }, |mut stream| async move {
586    /// // 1, 2, 3, 4, but in no particular order
587    /// # let mut results = Vec::new();
588    /// # for w in (1..5) {
589    /// #     results.push(stream.next().await.unwrap());
590    /// # }
591    /// # results.sort();
592    /// # assert_eq!(results, vec![1, 2, 3, 4]);
593    /// # }));
594    /// # }
595    /// ```
596    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
597    where
598        T: IntoIterator<Item = U>,
599    {
600        self.flat_map_unordered(q!(|d| d))
601    }
602
603    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
604    /// `f`, preserving the order of the elements.
605    ///
606    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
607    /// not modify or take ownership of the values. If you need to modify the values while filtering
608    /// use [`Stream::filter_map`] instead.
609    ///
610    /// # Example
611    /// ```rust
612    /// # #[cfg(feature = "deploy")] {
613    /// # use hydro_lang::prelude::*;
614    /// # use futures::StreamExt;
615    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
616    /// process
617    ///     .source_iter(q!(vec![1, 2, 3, 4]))
618    ///     .filter(q!(|&x| x > 2))
619    /// # }, |mut stream| async move {
620    /// // 3, 4
621    /// # for w in (3..5) {
622    /// #     assert_eq!(stream.next().await.unwrap(), w);
623    /// # }
624    /// # }));
625    /// # }
626    /// ```
627    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
628    where
629        F: Fn(&T) -> bool + 'a,
630    {
631        let f = f.splice_fn1_borrow_ctx(&self.location).into();
632        Stream::new(
633            self.location.clone(),
634            HydroNode::Filter {
635                f,
636                input: Box::new(self.ir_node.into_inner()),
637                metadata: self.location.new_node_metadata(Self::collection_kind()),
638            },
639        )
640    }
641
642    /// Splits the stream into two streams based on a predicate, without cloning elements.
643    ///
644    /// Elements for which `f` returns `true` are sent to the first output stream,
645    /// and elements for which `f` returns `false` are sent to the second output stream.
646    ///
647    /// Unlike using `filter` twice, this only evaluates the predicate once per element
648    /// and does not require `T: Clone`.
649    ///
650    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
651    /// the predicate is only used for routing; the element itself is moved to the
652    /// appropriate output stream.
653    ///
654    /// # Example
655    /// ```rust
656    /// # #[cfg(feature = "deploy")] {
657    /// # use hydro_lang::prelude::*;
658    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
659    /// # use futures::StreamExt;
660    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
661    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
662    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
663    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
664    /// evens.map(q!(|x| (x, true)))
665    ///     .interleave(odds.map(q!(|x| (x, false))))
666    /// # }, |mut stream| async move {
667    /// # let mut results = Vec::new();
668    /// # for _ in 0..6 {
669    /// #     results.push(stream.next().await.unwrap());
670    /// # }
671    /// # results.sort();
672    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
673    /// # }));
674    /// # }
675    /// ```
676    #[expect(
677        clippy::type_complexity,
678        reason = "return type mirrors the input stream type"
679    )]
680    pub fn partition<F>(
681        self,
682        f: impl IntoQuotedMut<'a, F, L>,
683    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
684    where
685        F: Fn(&T) -> bool + 'a,
686    {
687        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
688        let shared = SharedNode(Rc::new(RefCell::new(self.ir_node.into_inner())));
689
690        let true_stream = Stream::new(
691            self.location.clone(),
692            HydroNode::Partition {
693                inner: SharedNode(shared.0.clone()),
694                f: f.clone(),
695                is_true: true,
696                metadata: self.location.new_node_metadata(Self::collection_kind()),
697            },
698        );
699
700        let false_stream = Stream::new(
701            self.location.clone(),
702            HydroNode::Partition {
703                inner: SharedNode(shared.0),
704                f,
705                is_true: false,
706                metadata: self.location.new_node_metadata(Self::collection_kind()),
707            },
708        );
709
710        (true_stream, false_stream)
711    }
712
713    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
714    ///
715    /// # Example
716    /// ```rust
717    /// # #[cfg(feature = "deploy")] {
718    /// # use hydro_lang::prelude::*;
719    /// # use futures::StreamExt;
720    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
721    /// process
722    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
723    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
724    /// # }, |mut stream| async move {
725    /// // 1, 2
726    /// # for w in (1..3) {
727    /// #     assert_eq!(stream.next().await.unwrap(), w);
728    /// # }
729    /// # }));
730    /// # }
731    /// ```
732    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
733    where
734        F: Fn(T) -> Option<U> + 'a,
735    {
736        let f = f.splice_fn1_ctx(&self.location).into();
737        Stream::new(
738            self.location.clone(),
739            HydroNode::FilterMap {
740                f,
741                input: Box::new(self.ir_node.into_inner()),
742                metadata: self
743                    .location
744                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
745            },
746        )
747    }
748
749    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
750    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
751    /// If `other` is an empty [`Optional`], no values will be produced.
752    ///
753    /// # Example
754    /// ```rust
755    /// # #[cfg(feature = "deploy")] {
756    /// # use hydro_lang::prelude::*;
757    /// # use futures::StreamExt;
758    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
759    /// let tick = process.tick();
760    /// let batch = process
761    ///   .source_iter(q!(vec![1, 2, 3, 4]))
762    ///   .batch(&tick, nondet!(/** test */));
763    /// let count = batch.clone().count(); // `count()` returns a singleton
764    /// batch.cross_singleton(count).all_ticks()
765    /// # }, |mut stream| async move {
766    /// // (1, 4), (2, 4), (3, 4), (4, 4)
767    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
768    /// #     assert_eq!(stream.next().await.unwrap(), w);
769    /// # }
770    /// # }));
771    /// # }
772    /// ```
773    pub fn cross_singleton<O2>(
774        self,
775        other: impl Into<Optional<O2, L, Bounded>>,
776    ) -> Stream<(T, O2), L, B, O, R>
777    where
778        O2: Clone,
779    {
780        let other: Optional<O2, L, Bounded> = other.into();
781        check_matching_location(&self.location, &other.location);
782
783        Stream::new(
784            self.location.clone(),
785            HydroNode::CrossSingleton {
786                left: Box::new(self.ir_node.into_inner()),
787                right: Box::new(other.ir_node.into_inner()),
788                metadata: self
789                    .location
790                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
791            },
792        )
793    }
794
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
796    ///
797    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
798    /// leader of a cluster.
799    ///
800    /// # Example
801    /// ```rust
802    /// # #[cfg(feature = "deploy")] {
803    /// # use hydro_lang::prelude::*;
804    /// # use futures::StreamExt;
805    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
806    /// let tick = process.tick();
807    /// // ticks are lazy by default, forces the second tick to run
808    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
809    ///
810    /// let batch_first_tick = process
811    ///   .source_iter(q!(vec![1, 2, 3, 4]))
812    ///   .batch(&tick, nondet!(/** test */));
813    /// let batch_second_tick = process
814    ///   .source_iter(q!(vec![5, 6, 7, 8]))
815    ///   .batch(&tick, nondet!(/** test */))
816    ///   .defer_tick(); // appears on the second tick
817    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
818    /// batch_first_tick.chain(batch_second_tick)
819    ///   .filter_if_some(some_on_first_tick)
820    ///   .all_ticks()
821    /// # }, |mut stream| async move {
822    /// // [1, 2, 3, 4]
823    /// # for w in vec![1, 2, 3, 4] {
824    /// #     assert_eq!(stream.next().await.unwrap(), w);
825    /// # }
826    /// # }));
827    /// # }
828    /// ```
829    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
830        self.cross_singleton(signal.map(q!(|_u| ())))
831            .map(q!(|(d, _signal)| d))
832    }
833
834    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
835    ///
836    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
837    /// some local state.
838    ///
839    /// # Example
840    /// ```rust
841    /// # #[cfg(feature = "deploy")] {
842    /// # use hydro_lang::prelude::*;
843    /// # use futures::StreamExt;
844    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845    /// let tick = process.tick();
846    /// // ticks are lazy by default, forces the second tick to run
847    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848    ///
849    /// let batch_first_tick = process
850    ///   .source_iter(q!(vec![1, 2, 3, 4]))
851    ///   .batch(&tick, nondet!(/** test */));
852    /// let batch_second_tick = process
853    ///   .source_iter(q!(vec![5, 6, 7, 8]))
854    ///   .batch(&tick, nondet!(/** test */))
855    ///   .defer_tick(); // appears on the second tick
856    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
857    /// batch_first_tick.chain(batch_second_tick)
858    ///   .filter_if_none(some_on_first_tick)
859    ///   .all_ticks()
860    /// # }, |mut stream| async move {
861    /// // [5, 6, 7, 8]
862    /// # for w in vec![5, 6, 7, 8] {
863    /// #     assert_eq!(stream.next().await.unwrap(), w);
864    /// # }
865    /// # }));
866    /// # }
867    /// ```
868    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
869        self.filter_if_some(
870            other
871                .map(q!(|_| ()))
872                .into_singleton()
873                .filter(q!(|o| o.is_none())),
874        )
875    }
876
877    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
878    /// tupled pairs in a non-deterministic order.
879    ///
880    /// # Example
881    /// ```rust
882    /// # #[cfg(feature = "deploy")] {
883    /// # use hydro_lang::prelude::*;
884    /// # use std::collections::HashSet;
885    /// # use futures::StreamExt;
886    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887    /// let tick = process.tick();
888    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
889    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
890    /// stream1.cross_product(stream2)
891    /// # }, |mut stream| async move {
892    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
893    /// # stream.map(|i| assert!(expected.contains(&i)));
894    /// # }));
895    /// # }
896    /// ```
897    pub fn cross_product<T2, O2: Ordering>(
898        self,
899        other: Stream<T2, L, B, O2, R>,
900    ) -> Stream<(T, T2), L, B, NoOrder, R>
901    where
902        T: Clone,
903        T2: Clone,
904    {
905        check_matching_location(&self.location, &other.location);
906
907        Stream::new(
908            self.location.clone(),
909            HydroNode::CrossProduct {
910                left: Box::new(self.ir_node.into_inner()),
911                right: Box::new(other.ir_node.into_inner()),
912                metadata: self
913                    .location
914                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
915            },
916        )
917    }
918
919    /// Takes one stream as input and filters out any duplicate occurrences. The output
920    /// contains all unique values from the input.
921    ///
922    /// # Example
923    /// ```rust
924    /// # #[cfg(feature = "deploy")] {
925    /// # use hydro_lang::prelude::*;
926    /// # use futures::StreamExt;
927    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
928    /// let tick = process.tick();
929    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
930    /// # }, |mut stream| async move {
931    /// # for w in vec![1, 2, 3, 4] {
932    /// #     assert_eq!(stream.next().await.unwrap(), w);
933    /// # }
934    /// # }));
935    /// # }
936    /// ```
937    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
938    where
939        T: Eq + Hash,
940    {
941        Stream::new(
942            self.location.clone(),
943            HydroNode::Unique {
944                input: Box::new(self.ir_node.into_inner()),
945                metadata: self
946                    .location
947                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
948            },
949        )
950    }
951
952    /// Outputs everything in this stream that is *not* contained in the `other` stream.
953    ///
954    /// The `other` stream must be [`Bounded`], since this function will wait until
955    /// all its elements are available before producing any output.
956    /// # Example
957    /// ```rust
958    /// # #[cfg(feature = "deploy")] {
959    /// # use hydro_lang::prelude::*;
960    /// # use futures::StreamExt;
961    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
962    /// let tick = process.tick();
963    /// let stream = process
964    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
965    ///   .batch(&tick, nondet!(/** test */));
966    /// let batch = process
967    ///   .source_iter(q!(vec![1, 2]))
968    ///   .batch(&tick, nondet!(/** test */));
969    /// stream.filter_not_in(batch).all_ticks()
970    /// # }, |mut stream| async move {
971    /// # for w in vec![3, 4] {
972    /// #     assert_eq!(stream.next().await.unwrap(), w);
973    /// # }
974    /// # }));
975    /// # }
976    /// ```
977    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
978    where
979        T: Eq + Hash,
980        B2: IsBounded,
981    {
982        check_matching_location(&self.location, &other.location);
983
984        Stream::new(
985            self.location.clone(),
986            HydroNode::Difference {
987                pos: Box::new(self.ir_node.into_inner()),
988                neg: Box::new(other.ir_node.into_inner()),
989                metadata: self
990                    .location
991                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
992            },
993        )
994    }
995
996    /// An operator which allows you to "inspect" each element of a stream without
997    /// modifying it. The closure `f` is called on a reference to each item. This is
998    /// mainly useful for debugging, and should not be used to generate side-effects.
999    ///
1000    /// # Example
1001    /// ```rust
1002    /// # #[cfg(feature = "deploy")] {
1003    /// # use hydro_lang::prelude::*;
1004    /// # use futures::StreamExt;
1005    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1006    /// let nums = process.source_iter(q!(vec![1, 2]));
1007    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1008    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1009    /// # }, |mut stream| async move {
1010    /// # for w in vec![1, 2] {
1011    /// #     assert_eq!(stream.next().await.unwrap(), w);
1012    /// # }
1013    /// # }));
1014    /// # }
1015    /// ```
1016    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1017    where
1018        F: Fn(&T) + 'a,
1019    {
1020        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1021
1022        Stream::new(
1023            self.location.clone(),
1024            HydroNode::Inspect {
1025                f,
1026                input: Box::new(self.ir_node.into_inner()),
1027                metadata: self.location.new_node_metadata(Self::collection_kind()),
1028            },
1029        )
1030    }
1031
1032    /// Executes the provided closure for every element in this stream.
1033    ///
1034    /// Because the closure may have side effects, the stream must have deterministic order
1035    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1036    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1037    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1038    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1039    where
1040        O: IsOrdered,
1041        R: IsExactlyOnce,
1042    {
1043        let f = f.splice_fn1_ctx(&self.location).into();
1044        self.location
1045            .flow_state()
1046            .borrow_mut()
1047            .push_root(HydroRoot::ForEach {
1048                input: Box::new(self.ir_node.into_inner()),
1049                f,
1050                op_metadata: HydroIrOpMetadata::new(),
1051            });
1052    }
1053
1054    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1055    /// TCP socket to some other server. You should _not_ use this API for interacting with
1056    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1057    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1058    /// interaction with asynchronous sinks.
1059    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1060    where
1061        O: IsOrdered,
1062        R: IsExactlyOnce,
1063        S: 'a + futures::Sink<T> + Unpin,
1064    {
1065        self.location
1066            .flow_state()
1067            .borrow_mut()
1068            .push_root(HydroRoot::DestSink {
1069                sink: sink.splice_typed_ctx(&self.location).into(),
1070                input: Box::new(self.ir_node.into_inner()),
1071                op_metadata: HydroIrOpMetadata::new(),
1072            });
1073    }
1074
1075    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1076    ///
1077    /// # Example
1078    /// ```rust
1079    /// # #[cfg(feature = "deploy")] {
1080    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1081    /// # use futures::StreamExt;
1082    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1083    /// let tick = process.tick();
1084    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1085    /// numbers.enumerate()
1086    /// # }, |mut stream| async move {
1087    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1088    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1089    /// #     assert_eq!(stream.next().await.unwrap(), w);
1090    /// # }
1091    /// # }));
1092    /// # }
1093    /// ```
1094    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1095    where
1096        O: IsOrdered,
1097        R: IsExactlyOnce,
1098    {
1099        Stream::new(
1100            self.location.clone(),
1101            HydroNode::Enumerate {
1102                input: Box::new(self.ir_node.into_inner()),
1103                metadata: self.location.new_node_metadata(Stream::<
1104                    (usize, T),
1105                    L,
1106                    B,
1107                    TotalOrder,
1108                    ExactlyOnce,
1109                >::collection_kind()),
1110            },
1111        )
1112    }
1113
1114    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1115    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1116    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1117    ///
1118    /// Depending on the input stream guarantees, the closure may need to be commutative
1119    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1120    ///
1121    /// # Example
1122    /// ```rust
1123    /// # #[cfg(feature = "deploy")] {
1124    /// # use hydro_lang::prelude::*;
1125    /// # use futures::StreamExt;
1126    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1127    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1128    /// words
1129    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1130    ///     .into_stream()
1131    /// # }, |mut stream| async move {
1132    /// // "HELLOWORLD"
1133    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1134    /// # }));
1135    /// # }
1136    /// ```
1137    pub fn fold<A, I, F, C, Idemp>(
1138        self,
1139        init: impl IntoQuotedMut<'a, I, L>,
1140        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1141    ) -> Singleton<A, L, B>
1142    where
1143        I: Fn() -> A + 'a,
1144        F: Fn(&mut A, T),
1145        C: ValidCommutativityFor<O>,
1146        Idemp: ValidIdempotenceFor<R>,
1147    {
1148        let init = init.splice_fn0_ctx(&self.location).into();
1149        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1150        proof.register_proof(&comb);
1151
1152        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1153        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1154
1155        let core = HydroNode::Fold {
1156            init,
1157            acc: comb.into(),
1158            input: Box::new(ordered_etc.ir_node.into_inner()),
1159            metadata: ordered_etc
1160                .location
1161                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1162        };
1163
1164        Singleton::new(ordered_etc.location, core)
1165    }
1166
1167    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1168    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1169    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1170    /// reference, so that it can be modified in place.
1171    ///
1172    /// Depending on the input stream guarantees, the closure may need to be commutative
1173    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1174    ///
1175    /// # Example
1176    /// ```rust
1177    /// # #[cfg(feature = "deploy")] {
1178    /// # use hydro_lang::prelude::*;
1179    /// # use futures::StreamExt;
1180    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1181    /// let bools = process.source_iter(q!(vec![false, true, false]));
1182    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1183    /// # }, |mut stream| async move {
1184    /// // true
1185    /// # assert_eq!(stream.next().await.unwrap(), true);
1186    /// # }));
1187    /// # }
1188    /// ```
1189    pub fn reduce<F, C, Idemp>(
1190        self,
1191        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1192    ) -> Optional<T, L, B>
1193    where
1194        F: Fn(&mut T, T) + 'a,
1195        C: ValidCommutativityFor<O>,
1196        Idemp: ValidIdempotenceFor<R>,
1197    {
1198        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1199        proof.register_proof(&f);
1200
1201        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1202        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1203
1204        let core = HydroNode::Reduce {
1205            f: f.into(),
1206            input: Box::new(ordered_etc.ir_node.into_inner()),
1207            metadata: ordered_etc
1208                .location
1209                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1210        };
1211
1212        Optional::new(ordered_etc.location, core)
1213    }
1214
1215    /// Computes the maximum element in the stream as an [`Optional`], which
1216    /// will be empty until the first element in the input arrives.
1217    ///
1218    /// # Example
1219    /// ```rust
1220    /// # #[cfg(feature = "deploy")] {
1221    /// # use hydro_lang::prelude::*;
1222    /// # use futures::StreamExt;
1223    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1224    /// let tick = process.tick();
1225    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1226    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1227    /// batch.max().all_ticks()
1228    /// # }, |mut stream| async move {
1229    /// // 4
1230    /// # assert_eq!(stream.next().await.unwrap(), 4);
1231    /// # }));
1232    /// # }
1233    /// ```
1234    pub fn max(self) -> Optional<T, L, B>
1235    where
1236        T: Ord,
1237    {
1238        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1239            .assume_ordering_trusted_bounded::<TotalOrder>(
1240                nondet!(/** max is commutative, but order affects intermediates */),
1241            )
1242            .reduce(q!(|curr, new| {
1243                if new > *curr {
1244                    *curr = new;
1245                }
1246            }))
1247    }
1248
1249    /// Computes the minimum element in the stream as an [`Optional`], which
1250    /// will be empty until the first element in the input arrives.
1251    ///
1252    /// # Example
1253    /// ```rust
1254    /// # #[cfg(feature = "deploy")] {
1255    /// # use hydro_lang::prelude::*;
1256    /// # use futures::StreamExt;
1257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1258    /// let tick = process.tick();
1259    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1260    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1261    /// batch.min().all_ticks()
1262    /// # }, |mut stream| async move {
1263    /// // 1
1264    /// # assert_eq!(stream.next().await.unwrap(), 1);
1265    /// # }));
1266    /// # }
1267    /// ```
1268    pub fn min(self) -> Optional<T, L, B>
1269    where
1270        T: Ord,
1271    {
1272        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1273            .assume_ordering_trusted_bounded::<TotalOrder>(
1274                nondet!(/** max is commutative, but order affects intermediates */),
1275            )
1276            .reduce(q!(|curr, new| {
1277                if new < *curr {
1278                    *curr = new;
1279                }
1280            }))
1281    }
1282
1283    /// Computes the first element in the stream as an [`Optional`], which
1284    /// will be empty until the first element in the input arrives.
1285    ///
1286    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1287    /// re-ordering of elements may cause the first element to change.
1288    ///
1289    /// # Example
1290    /// ```rust
1291    /// # #[cfg(feature = "deploy")] {
1292    /// # use hydro_lang::prelude::*;
1293    /// # use futures::StreamExt;
1294    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1295    /// let tick = process.tick();
1296    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1297    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1298    /// batch.first().all_ticks()
1299    /// # }, |mut stream| async move {
1300    /// // 1
1301    /// # assert_eq!(stream.next().await.unwrap(), 1);
1302    /// # }));
1303    /// # }
1304    /// ```
1305    pub fn first(self) -> Optional<T, L, B>
1306    where
1307        O: IsOrdered,
1308    {
1309        self.make_totally_ordered()
1310            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1311            .reduce(q!(|_, _| {}))
1312    }
1313
1314    /// Computes the last element in the stream as an [`Optional`], which
1315    /// will be empty until an element in the input arrives.
1316    ///
1317    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1318    /// re-ordering of elements may cause the last element to change.
1319    ///
1320    /// # Example
1321    /// ```rust
1322    /// # #[cfg(feature = "deploy")] {
1323    /// # use hydro_lang::prelude::*;
1324    /// # use futures::StreamExt;
1325    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1326    /// let tick = process.tick();
1327    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1328    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1329    /// batch.last().all_ticks()
1330    /// # }, |mut stream| async move {
1331    /// // 4
1332    /// # assert_eq!(stream.next().await.unwrap(), 4);
1333    /// # }));
1334    /// # }
1335    /// ```
1336    pub fn last(self) -> Optional<T, L, B>
1337    where
1338        O: IsOrdered,
1339    {
1340        self.make_totally_ordered()
1341            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1342            .reduce(q!(|curr, new| *curr = new))
1343    }
1344
1345    /// Collects all the elements of this stream into a single [`Vec`] element.
1346    ///
1347    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1348    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1349    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1350    /// the vector at an arbitrary point in time.
1351    ///
1352    /// # Example
1353    /// ```rust
1354    /// # #[cfg(feature = "deploy")] {
1355    /// # use hydro_lang::prelude::*;
1356    /// # use futures::StreamExt;
1357    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1358    /// let tick = process.tick();
1359    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1360    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1361    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1362    /// # }, |mut stream| async move {
1363    /// // [ vec![1, 2, 3, 4] ]
1364    /// # for w in vec![vec![1, 2, 3, 4]] {
1365    /// #     assert_eq!(stream.next().await.unwrap(), w);
1366    /// # }
1367    /// # }));
1368    /// # }
1369    /// ```
1370    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1371    where
1372        O: IsOrdered,
1373        R: IsExactlyOnce,
1374    {
1375        self.make_totally_ordered().make_exactly_once().fold(
1376            q!(|| vec![]),
1377            q!(|acc, v| {
1378                acc.push(v);
1379            }),
1380        )
1381    }
1382
1383    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1384    /// and emitting each intermediate result.
1385    ///
1386    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1387    /// containing all intermediate accumulated values. The scan operation can also terminate early
1388    /// by returning `None`.
1389    ///
1390    /// The function takes a mutable reference to the accumulator and the current element, and returns
1391    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1392    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1393    ///
1394    /// # Examples
1395    ///
1396    /// Basic usage - running sum:
1397    /// ```rust
1398    /// # #[cfg(feature = "deploy")] {
1399    /// # use hydro_lang::prelude::*;
1400    /// # use futures::StreamExt;
1401    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1402    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1403    ///     q!(|| 0),
1404    ///     q!(|acc, x| {
1405    ///         *acc += x;
1406    ///         Some(*acc)
1407    ///     }),
1408    /// )
1409    /// # }, |mut stream| async move {
1410    /// // Output: 1, 3, 6, 10
1411    /// # for w in vec![1, 3, 6, 10] {
1412    /// #     assert_eq!(stream.next().await.unwrap(), w);
1413    /// # }
1414    /// # }));
1415    /// # }
1416    /// ```
1417    ///
1418    /// Early termination example:
1419    /// ```rust
1420    /// # #[cfg(feature = "deploy")] {
1421    /// # use hydro_lang::prelude::*;
1422    /// # use futures::StreamExt;
1423    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1424    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1425    ///     q!(|| 1),
1426    ///     q!(|state, x| {
1427    ///         *state = *state * x;
1428    ///         if *state > 6 {
1429    ///             None // Terminate the stream
1430    ///         } else {
1431    ///             Some(-*state)
1432    ///         }
1433    ///     }),
1434    /// )
1435    /// # }, |mut stream| async move {
1436    /// // Output: -1, -2, -6
1437    /// # for w in vec![-1, -2, -6] {
1438    /// #     assert_eq!(stream.next().await.unwrap(), w);
1439    /// # }
1440    /// # }));
1441    /// # }
1442    /// ```
1443    pub fn scan<A, U, I, F>(
1444        self,
1445        init: impl IntoQuotedMut<'a, I, L>,
1446        f: impl IntoQuotedMut<'a, F, L>,
1447    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1448    where
1449        O: IsOrdered,
1450        R: IsExactlyOnce,
1451        I: Fn() -> A + 'a,
1452        F: Fn(&mut A, T) -> Option<U> + 'a,
1453    {
1454        let init = init.splice_fn0_ctx(&self.location).into();
1455        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1456
1457        Stream::new(
1458            self.location.clone(),
1459            HydroNode::Scan {
1460                init,
1461                acc: f,
1462                input: Box::new(self.ir_node.into_inner()),
1463                metadata: self.location.new_node_metadata(
1464                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1465                ),
1466            },
1467        )
1468    }
1469
1470    /// Given a time interval, returns a stream corresponding to samples taken from the
1471    /// stream roughly at that interval. The output will have elements in the same order
1472    /// as the input, but with arbitrary elements skipped between samples. There is also
1473    /// no guarantee on the exact timing of the samples.
1474    ///
1475    /// # Non-Determinism
1476    /// The output stream is non-deterministic in which elements are sampled, since this
1477    /// is controlled by a clock.
1478    pub fn sample_every(
1479        self,
1480        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1481        nondet: NonDet,
1482    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1483    where
1484        L: NoTick + NoAtomic,
1485    {
1486        let samples = self.location.source_interval(interval, nondet);
1487
1488        let tick = self.location.tick();
1489        self.batch(&tick, nondet)
1490            .filter_if_some(samples.batch(&tick, nondet).first())
1491            .all_ticks()
1492            .weaken_retries()
1493    }
1494
1495    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1496    /// stream has not emitted a value since that duration.
1497    ///
1498    /// # Non-Determinism
1499    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1500    /// samples take place, timeouts may be non-deterministically generated or missed,
1501    /// and the notification of the timeout may be delayed as well. There is also no
1502    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1503    /// detected based on when the next sample is taken.
1504    pub fn timeout(
1505        self,
1506        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1507        nondet: NonDet,
1508    ) -> Optional<(), L, Unbounded>
1509    where
1510        L: NoTick + NoAtomic,
1511    {
1512        let tick = self.location.tick();
1513
1514        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1515            q!(|| None),
1516            q!(
1517                |latest, _| {
1518                    *latest = Some(Instant::now());
1519                },
1520                commutative = manual_proof!(/** TODO */)
1521            ),
1522        );
1523
1524        latest_received
1525            .snapshot(&tick, nondet)
1526            .filter_map(q!(move |latest_received| {
1527                if let Some(latest_received) = latest_received {
1528                    if Instant::now().duration_since(latest_received) > duration {
1529                        Some(())
1530                    } else {
1531                        None
1532                    }
1533                } else {
1534                    Some(())
1535                }
1536            }))
1537            .latest()
1538    }
1539
1540    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1541    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1542    ///
1543    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1544    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1545    /// argument that declares where the stream will be atomically processed. Batching a stream into
1546    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1547    /// [`Tick`] will introduce asynchrony.
1548    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1549        let out_location = Atomic { tick: tick.clone() };
1550        Stream::new(
1551            out_location.clone(),
1552            HydroNode::BeginAtomic {
1553                inner: Box::new(self.ir_node.into_inner()),
1554                metadata: out_location
1555                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1556            },
1557        )
1558    }
1559
1560    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1561    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1562    /// the order of the input. The output stream will execute in the [`Tick`] that was
1563    /// used to create the atomic section.
1564    ///
1565    /// # Non-Determinism
1566    /// The batch boundaries are non-deterministic and may change across executions.
1567    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1568        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1569        Stream::new(
1570            tick.clone(),
1571            HydroNode::Batch {
1572                inner: Box::new(self.ir_node.into_inner()),
1573                metadata: tick
1574                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1575            },
1576        )
1577    }
1578
1579    /// An operator which allows you to "name" a `HydroNode`.
1580    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1581    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1582        {
1583            let mut node = self.ir_node.borrow_mut();
1584            let metadata = node.metadata_mut();
1585            metadata.tag = Some(name.to_owned());
1586        }
1587        self
1588    }
1589
1590    /// Explicitly "casts" the stream to a type with a different ordering
1591    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1592    /// by the type-system.
1593    ///
1594    /// # Non-Determinism
1595    /// This function is used as an escape hatch, and any mistakes in the
1596    /// provided ordering guarantee will propagate into the guarantees
1597    /// for the rest of the program.
1598    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1599        if O::ORDERING_KIND == O2::ORDERING_KIND {
1600            Stream::new(self.location, self.ir_node.into_inner())
1601        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1602            // We can always weaken the ordering guarantee
1603            Stream::new(
1604                self.location.clone(),
1605                HydroNode::Cast {
1606                    inner: Box::new(self.ir_node.into_inner()),
1607                    metadata: self
1608                        .location
1609                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1610                },
1611            )
1612        } else {
1613            Stream::new(
1614                self.location.clone(),
1615                HydroNode::ObserveNonDet {
1616                    inner: Box::new(self.ir_node.into_inner()),
1617                    trusted: false,
1618                    metadata: self
1619                        .location
1620                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1621                },
1622            )
1623        }
1624    }
1625
1626    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1627    // intermediate states will not be revealed
1628    fn assume_ordering_trusted_bounded<O2: Ordering>(
1629        self,
1630        nondet: NonDet,
1631    ) -> Stream<T, L, B, O2, R> {
1632        if B::BOUNDED {
1633            self.assume_ordering_trusted(nondet)
1634        } else {
1635            self.assume_ordering(nondet)
1636        }
1637    }
1638
1639    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1640    // is not observable
1641    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1642        self,
1643        _nondet: NonDet,
1644    ) -> Stream<T, L, B, O2, R> {
1645        if O::ORDERING_KIND == O2::ORDERING_KIND {
1646            Stream::new(self.location, self.ir_node.into_inner())
1647        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1648            // We can always weaken the ordering guarantee
1649            Stream::new(
1650                self.location.clone(),
1651                HydroNode::Cast {
1652                    inner: Box::new(self.ir_node.into_inner()),
1653                    metadata: self
1654                        .location
1655                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1656                },
1657            )
1658        } else {
1659            Stream::new(
1660                self.location.clone(),
1661                HydroNode::ObserveNonDet {
1662                    inner: Box::new(self.ir_node.into_inner()),
1663                    trusted: true,
1664                    metadata: self
1665                        .location
1666                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1667                },
1668            )
1669        }
1670    }
1671
1672    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1673    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1674    /// which is always safe because that is the weakest possible guarantee.
1675    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1676        self.weaken_ordering::<NoOrder>()
1677    }
1678
1679    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1680    /// enforcing that `O2` is weaker than the input ordering guarantee.
1681    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1682        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1683        self.assume_ordering::<O2>(nondet)
1684    }
1685
1686    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1687    /// implies that `O == TotalOrder`.
1688    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1689    where
1690        O: IsOrdered,
1691    {
1692        self.assume_ordering(nondet!(/** no-op */))
1693    }
1694
1695    /// Explicitly "casts" the stream to a type with a different retries
1696    /// guarantee. Useful in unsafe code where the lack of retries cannot
1697    /// be proven by the type-system.
1698    ///
1699    /// # Non-Determinism
1700    /// This function is used as an escape hatch, and any mistakes in the
1701    /// provided retries guarantee will propagate into the guarantees
1702    /// for the rest of the program.
1703    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1704        if R::RETRIES_KIND == R2::RETRIES_KIND {
1705            Stream::new(self.location, self.ir_node.into_inner())
1706        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1707            // We can always weaken the retries guarantee
1708            Stream::new(
1709                self.location.clone(),
1710                HydroNode::Cast {
1711                    inner: Box::new(self.ir_node.into_inner()),
1712                    metadata: self
1713                        .location
1714                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1715                },
1716            )
1717        } else {
1718            Stream::new(
1719                self.location.clone(),
1720                HydroNode::ObserveNonDet {
1721                    inner: Box::new(self.ir_node.into_inner()),
1722                    trusted: false,
1723                    metadata: self
1724                        .location
1725                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1726                },
1727            )
1728        }
1729    }
1730
1731    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1732    // is not observable
1733    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1734        if R::RETRIES_KIND == R2::RETRIES_KIND {
1735            Stream::new(self.location, self.ir_node.into_inner())
1736        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1737            // We can always weaken the retries guarantee
1738            Stream::new(
1739                self.location.clone(),
1740                HydroNode::Cast {
1741                    inner: Box::new(self.ir_node.into_inner()),
1742                    metadata: self
1743                        .location
1744                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1745                },
1746            )
1747        } else {
1748            Stream::new(
1749                self.location.clone(),
1750                HydroNode::ObserveNonDet {
1751                    inner: Box::new(self.ir_node.into_inner()),
1752                    trusted: true,
1753                    metadata: self
1754                        .location
1755                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1756                },
1757            )
1758        }
1759    }
1760
1761    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1762    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1763    /// which is always safe because that is the weakest possible guarantee.
1764    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1765        self.weaken_retries::<AtLeastOnce>()
1766    }
1767
1768    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1769    /// enforcing that `R2` is weaker than the input retries guarantee.
1770    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1771        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1772        self.assume_retries::<R2>(nondet)
1773    }
1774
1775    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1776    /// implies that `R == ExactlyOnce`.
1777    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1778    where
1779        R: IsExactlyOnce,
1780    {
1781        self.assume_retries(nondet!(/** no-op */))
1782    }
1783
1784    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1785    /// implies that `B == Bounded`.
1786    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1787    where
1788        B: IsBounded,
1789    {
1790        Stream::new(self.location, self.ir_node.into_inner())
1791    }
1792}
1793
1794impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1795where
1796    L: Location<'a>,
1797{
1798    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1799    ///
1800    /// # Example
1801    /// ```rust
1802    /// # #[cfg(feature = "deploy")] {
1803    /// # use hydro_lang::prelude::*;
1804    /// # use futures::StreamExt;
1805    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1806    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1807    /// # }, |mut stream| async move {
1808    /// // 1, 2, 3
1809    /// # for w in vec![1, 2, 3] {
1810    /// #     assert_eq!(stream.next().await.unwrap(), w);
1811    /// # }
1812    /// # }));
1813    /// # }
1814    /// ```
1815    pub fn cloned(self) -> Stream<T, L, B, O, R>
1816    where
1817        T: Clone,
1818    {
1819        self.map(q!(|d| d.clone()))
1820    }
1821}
1822
1823impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
1824where
1825    L: Location<'a>,
1826{
1827    /// Computes the number of elements in the stream as a [`Singleton`].
1828    ///
1829    /// # Example
1830    /// ```rust
1831    /// # #[cfg(feature = "deploy")] {
1832    /// # use hydro_lang::prelude::*;
1833    /// # use futures::StreamExt;
1834    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1835    /// let tick = process.tick();
1836    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1837    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1838    /// batch.count().all_ticks()
1839    /// # }, |mut stream| async move {
1840    /// // 4
1841    /// # assert_eq!(stream.next().await.unwrap(), 4);
1842    /// # }));
1843    /// # }
1844    /// ```
1845    pub fn count(self) -> Singleton<usize, L, B> {
1846        self.assume_ordering_trusted::<TotalOrder>(nondet!(
1847            /// Order does not affect eventual count, and also does not affect intermediate states.
1848        ))
1849        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
1850    }
1851}
1852
1853impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
1854    /// Produces a new stream that interleaves the elements of the two input streams.
1855    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
1856    ///
1857    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1858    /// [`Bounded`], you can use [`Stream::chain`] instead.
1859    ///
1860    /// # Example
1861    /// ```rust
1862    /// # #[cfg(feature = "deploy")] {
1863    /// # use hydro_lang::prelude::*;
1864    /// # use futures::StreamExt;
1865    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1866    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
1867    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
1868    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
1869    /// # }, |mut stream| async move {
1870    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
1871    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
1872    /// #     assert_eq!(stream.next().await.unwrap(), w);
1873    /// # }
1874    /// # }));
1875    /// # }
1876    /// ```
1877    pub fn interleave<O2: Ordering, R2: Retries>(
1878        self,
1879        other: Stream<T, L, Unbounded, O2, R2>,
1880    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
1881    where
1882        R: MinRetries<R2>,
1883    {
1884        Stream::new(
1885            self.location.clone(),
1886            HydroNode::Chain {
1887                first: Box::new(self.ir_node.into_inner()),
1888                second: Box::new(other.ir_node.into_inner()),
1889                metadata: self.location.new_node_metadata(Stream::<
1890                    T,
1891                    L,
1892                    Unbounded,
1893                    NoOrder,
1894                    <R as MinRetries<R2>>::Min,
1895                >::collection_kind()),
1896            },
1897        )
1898    }
1899}
1900
1901impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
1902    /// Produces a new stream that combines the elements of the two input streams,
1903    /// preserving the relative order of elements within each input.
1904    ///
1905    /// Currently, both input streams must be [`Unbounded`]. When the streams are
1906    /// [`Bounded`], you can use [`Stream::chain`] instead.
1907    ///
1908    /// # Non-Determinism
1909    /// The order in which elements *across* the two streams will be interleaved is
1910    /// non-deterministic, so the order of elements will vary across runs. If the output order
1911    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
1912    /// unordered stream.
1913    ///
1914    /// # Example
1915    /// ```rust
1916    /// # #[cfg(feature = "deploy")] {
1917    /// # use hydro_lang::prelude::*;
1918    /// # use futures::StreamExt;
1919    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1920    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
1921    /// # process.source_iter(q!(vec![1, 3])).into();
1922    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
1923    /// # }, |mut stream| async move {
1924    /// // 1, 3 and 2, 4 in some order, preserving the original local order
1925    /// # for w in vec![1, 3, 2, 4] {
1926    /// #     assert_eq!(stream.next().await.unwrap(), w);
1927    /// # }
1928    /// # }));
1929    /// # }
1930    /// ```
1931    pub fn merge_ordered<R2: Retries>(
1932        self,
1933        other: Stream<T, L, Unbounded, TotalOrder, R2>,
1934        _nondet: NonDet,
1935    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
1936    where
1937        R: MinRetries<R2>,
1938    {
1939        Stream::new(
1940            self.location.clone(),
1941            HydroNode::Chain {
1942                first: Box::new(self.ir_node.into_inner()),
1943                second: Box::new(other.ir_node.into_inner()),
1944                metadata: self.location.new_node_metadata(Stream::<
1945                    T,
1946                    L,
1947                    Unbounded,
1948                    TotalOrder,
1949                    <R as MinRetries<R2>>::Min,
1950                >::collection_kind()),
1951            },
1952        )
1953    }
1954}
1955
1956impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
1957where
1958    L: Location<'a>,
1959{
1960    /// Produces a new stream that emits the input elements in sorted order.
1961    ///
1962    /// The input stream can have any ordering guarantee, but the output stream
1963    /// will have a [`TotalOrder`] guarantee. This operator will block until all
1964    /// elements in the input stream are available, so it requires the input stream
1965    /// to be [`Bounded`].
1966    ///
1967    /// # Example
1968    /// ```rust
1969    /// # #[cfg(feature = "deploy")] {
1970    /// # use hydro_lang::prelude::*;
1971    /// # use futures::StreamExt;
1972    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1973    /// let tick = process.tick();
1974    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
1975    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1976    /// batch.sort().all_ticks()
1977    /// # }, |mut stream| async move {
1978    /// // 1, 2, 3, 4
1979    /// # for w in (1..5) {
1980    /// #     assert_eq!(stream.next().await.unwrap(), w);
1981    /// # }
1982    /// # }));
1983    /// # }
1984    /// ```
1985    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
1986    where
1987        B: IsBounded,
1988        T: Ord,
1989    {
1990        let this = self.make_bounded();
1991        Stream::new(
1992            this.location.clone(),
1993            HydroNode::Sort {
1994                input: Box::new(this.ir_node.into_inner()),
1995                metadata: this
1996                    .location
1997                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
1998            },
1999        )
2000    }
2001
2002    /// Produces a new stream that first emits the elements of the `self` stream,
2003    /// and then emits the elements of the `other` stream. The output stream has
2004    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2005    /// [`TotalOrder`] guarantee.
2006    ///
2007    /// Currently, both input streams must be [`Bounded`]. This operator will block
2008    /// on the first stream until all its elements are available. In a future version,
2009    /// we will relax the requirement on the `other` stream.
2010    ///
2011    /// # Example
2012    /// ```rust
2013    /// # #[cfg(feature = "deploy")] {
2014    /// # use hydro_lang::prelude::*;
2015    /// # use futures::StreamExt;
2016    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2017    /// let tick = process.tick();
2018    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2019    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2020    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2021    /// # }, |mut stream| async move {
2022    /// // 2, 3, 4, 5, 1, 2, 3, 4
2023    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2024    /// #     assert_eq!(stream.next().await.unwrap(), w);
2025    /// # }
2026    /// # }));
2027    /// # }
2028    /// ```
2029    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2030        self,
2031        other: Stream<T, L, B2, O2, R2>,
2032    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2033    where
2034        B: IsBounded,
2035        O: MinOrder<O2>,
2036        R: MinRetries<R2>,
2037    {
2038        check_matching_location(&self.location, &other.location);
2039
2040        Stream::new(
2041            self.location.clone(),
2042            HydroNode::Chain {
2043                first: Box::new(self.ir_node.into_inner()),
2044                second: Box::new(other.ir_node.into_inner()),
2045                metadata: self.location.new_node_metadata(Stream::<
2046                    T,
2047                    L,
2048                    B2,
2049                    <O as MinOrder<O2>>::Min,
2050                    <R as MinRetries<R2>>::Min,
2051                >::collection_kind()),
2052            },
2053        )
2054    }
2055
2056    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2057    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2058    /// because this is compiled into a nested loop.
2059    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2060        self,
2061        other: Stream<T2, L, Bounded, O2, R>,
2062    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2063    where
2064        B: IsBounded,
2065        T: Clone,
2066        T2: Clone,
2067    {
2068        let this = self.make_bounded();
2069        check_matching_location(&this.location, &other.location);
2070
2071        Stream::new(
2072            this.location.clone(),
2073            HydroNode::CrossProduct {
2074                left: Box::new(this.ir_node.into_inner()),
2075                right: Box::new(other.ir_node.into_inner()),
2076                metadata: this.location.new_node_metadata(Stream::<
2077                    (T, T2),
2078                    L,
2079                    Bounded,
2080                    <O2 as MinOrder<O>>::Min,
2081                    R,
2082                >::collection_kind()),
2083            },
2084        )
2085    }
2086
2087    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2088    /// `self` used as the values for *each* key.
2089    ///
2090    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2091    /// values. For example, it can be used to send the same set of elements to several cluster
2092    /// members, if the membership information is available as a [`KeyedSingleton`].
2093    ///
2094    /// # Example
2095    /// ```rust
2096    /// # #[cfg(feature = "deploy")] {
2097    /// # use hydro_lang::prelude::*;
2098    /// # use futures::StreamExt;
2099    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2100    /// # let tick = process.tick();
2101    /// let keyed_singleton = // { 1: (), 2: () }
2102    /// # process
2103    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2104    /// #     .into_keyed()
2105    /// #     .batch(&tick, nondet!(/** test */))
2106    /// #     .first();
2107    /// let stream = // [ "a", "b" ]
2108    /// # process
2109    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2110    /// #     .batch(&tick, nondet!(/** test */));
2111    /// stream.repeat_with_keys(keyed_singleton)
2112    /// # .entries().all_ticks()
2113    /// # }, |mut stream| async move {
2114    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2115    /// # let mut results = Vec::new();
2116    /// # for _ in 0..4 {
2117    /// #     results.push(stream.next().await.unwrap());
2118    /// # }
2119    /// # results.sort();
2120    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2121    /// # }));
2122    /// # }
2123    /// ```
2124    pub fn repeat_with_keys<K, V2>(
2125        self,
2126        keys: KeyedSingleton<K, V2, L, Bounded>,
2127    ) -> KeyedStream<K, T, L, Bounded, O, R>
2128    where
2129        B: IsBounded,
2130        K: Clone,
2131        T: Clone,
2132    {
2133        keys.keys()
2134            .weaken_retries()
2135            .assume_ordering_trusted::<TotalOrder>(
2136                nondet!(/** keyed stream does not depend on ordering of keys */),
2137            )
2138            .cross_product_nested_loop(self.make_bounded())
2139            .into_keyed()
2140    }
2141}
2142
2143impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2144where
2145    L: Location<'a>,
2146{
2147    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2148    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2149    /// by equi-joining the two streams on the key attribute `K`.
2150    ///
2151    /// # Example
2152    /// ```rust
2153    /// # #[cfg(feature = "deploy")] {
2154    /// # use hydro_lang::prelude::*;
2155    /// # use std::collections::HashSet;
2156    /// # use futures::StreamExt;
2157    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2158    /// let tick = process.tick();
2159    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2160    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2161    /// stream1.join(stream2)
2162    /// # }, |mut stream| async move {
2163    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2164    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2165    /// # stream.map(|i| assert!(expected.contains(&i)));
2166    /// # }));
2167    /// # }
2168    pub fn join<V2, O2: Ordering, R2: Retries>(
2169        self,
2170        n: Stream<(K, V2), L, B, O2, R2>,
2171    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2172    where
2173        K: Eq + Hash,
2174        R: MinRetries<R2>,
2175    {
2176        check_matching_location(&self.location, &n.location);
2177
2178        Stream::new(
2179            self.location.clone(),
2180            HydroNode::Join {
2181                left: Box::new(self.ir_node.into_inner()),
2182                right: Box::new(n.ir_node.into_inner()),
2183                metadata: self.location.new_node_metadata(Stream::<
2184                    (K, (V1, V2)),
2185                    L,
2186                    B,
2187                    NoOrder,
2188                    <R as MinRetries<R2>>::Min,
2189                >::collection_kind()),
2190            },
2191        )
2192    }
2193
2194    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2195    /// computes the anti-join of the items in the input -- i.e. returns
2196    /// unique items in the first input that do not have a matching key
2197    /// in the second input.
2198    ///
2199    /// # Example
2200    /// ```rust
2201    /// # #[cfg(feature = "deploy")] {
2202    /// # use hydro_lang::prelude::*;
2203    /// # use futures::StreamExt;
2204    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2205    /// let tick = process.tick();
2206    /// let stream = process
2207    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2208    ///   .batch(&tick, nondet!(/** test */));
2209    /// let batch = process
2210    ///   .source_iter(q!(vec![1, 2]))
2211    ///   .batch(&tick, nondet!(/** test */));
2212    /// stream.anti_join(batch).all_ticks()
2213    /// # }, |mut stream| async move {
2214    /// # for w in vec![(3, 'c'), (4, 'd')] {
2215    /// #     assert_eq!(stream.next().await.unwrap(), w);
2216    /// # }
2217    /// # }));
2218    /// # }
2219    pub fn anti_join<O2: Ordering, R2: Retries>(
2220        self,
2221        n: Stream<K, L, Bounded, O2, R2>,
2222    ) -> Stream<(K, V1), L, B, O, R>
2223    where
2224        K: Eq + Hash,
2225    {
2226        check_matching_location(&self.location, &n.location);
2227
2228        Stream::new(
2229            self.location.clone(),
2230            HydroNode::AntiJoin {
2231                pos: Box::new(self.ir_node.into_inner()),
2232                neg: Box::new(n.ir_node.into_inner()),
2233                metadata: self
2234                    .location
2235                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2236            },
2237        )
2238    }
2239}
2240
2241impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2242    Stream<(K, V), L, B, O, R>
2243{
2244    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2245    /// is used as the key and the second element is added to the entries associated with that key.
2246    ///
2247    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2248    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2249    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2250    /// total ordering _within_ each group but no ordering _across_ groups.
2251    ///
2252    /// # Example
2253    /// ```rust
2254    /// # #[cfg(feature = "deploy")] {
2255    /// # use hydro_lang::prelude::*;
2256    /// # use futures::StreamExt;
2257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2258    /// process
2259    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2260    ///     .into_keyed()
2261    /// #   .entries()
2262    /// # }, |mut stream| async move {
2263    /// // { 1: [2, 3], 2: [4] }
2264    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2265    /// #     assert_eq!(stream.next().await.unwrap(), w);
2266    /// # }
2267    /// # }));
2268    /// # }
2269    /// ```
2270    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2271        KeyedStream::new(
2272            self.location.clone(),
2273            HydroNode::Cast {
2274                inner: Box::new(self.ir_node.into_inner()),
2275                metadata: self
2276                    .location
2277                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2278            },
2279        )
2280    }
2281}
2282
2283impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2284where
2285    K: Eq + Hash,
2286    L: Location<'a>,
2287{
2288    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2289    /// # Example
2290    /// ```rust
2291    /// # #[cfg(feature = "deploy")] {
2292    /// # use hydro_lang::prelude::*;
2293    /// # use futures::StreamExt;
2294    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2295    /// let tick = process.tick();
2296    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2297    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2298    /// batch.keys().all_ticks()
2299    /// # }, |mut stream| async move {
2300    /// // 1, 2
2301    /// # assert_eq!(stream.next().await.unwrap(), 1);
2302    /// # assert_eq!(stream.next().await.unwrap(), 2);
2303    /// # }));
2304    /// # }
2305    /// ```
2306    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2307        self.into_keyed()
2308            .fold(
2309                q!(|| ()),
2310                q!(
2311                    |_, _| {},
2312                    commutative = manual_proof!(/** values are ignored */),
2313                    idempotent = manual_proof!(/** values are ignored */)
2314                ),
2315            )
2316            .keys()
2317    }
2318}
2319
2320impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2321where
2322    L: Location<'a> + NoTick,
2323{
2324    /// Returns a stream corresponding to the latest batch of elements being atomically
2325    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2326    /// the order of the input.
2327    ///
2328    /// # Non-Determinism
2329    /// The batch boundaries are non-deterministic and may change across executions.
2330    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2331        Stream::new(
2332            self.location.clone().tick,
2333            HydroNode::Batch {
2334                inner: Box::new(self.ir_node.into_inner()),
2335                metadata: self
2336                    .location
2337                    .tick
2338                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2339            },
2340        )
2341    }
2342
2343    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2344    /// See [`Stream::atomic`] for more details.
2345    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2346        Stream::new(
2347            self.location.tick.l.clone(),
2348            HydroNode::EndAtomic {
2349                inner: Box::new(self.ir_node.into_inner()),
2350                metadata: self
2351                    .location
2352                    .tick
2353                    .l
2354                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2355            },
2356        )
2357    }
2358}
2359
2360impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2361where
2362    L: Location<'a> + NoTick + NoAtomic,
2363    F: Future<Output = T>,
2364{
2365    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2366    /// Future outputs are produced as available, regardless of input arrival order.
2367    ///
2368    /// # Example
2369    /// ```rust
2370    /// # #[cfg(feature = "deploy")] {
2371    /// # use std::collections::HashSet;
2372    /// # use futures::StreamExt;
2373    /// # use hydro_lang::prelude::*;
2374    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2375    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2376    ///     .map(q!(|x| async move {
2377    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2378    ///         x
2379    ///     }))
2380    ///     .resolve_futures()
2381    /// #   },
2382    /// #   |mut stream| async move {
2383    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2384    /// #       let mut output = HashSet::new();
2385    /// #       for _ in 1..10 {
2386    /// #           output.insert(stream.next().await.unwrap());
2387    /// #       }
2388    /// #       assert_eq!(
2389    /// #           output,
2390    /// #           HashSet::<i32>::from_iter(1..10)
2391    /// #       );
2392    /// #   },
2393    /// # ));
2394    /// # }
2395    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2396        Stream::new(
2397            self.location.clone(),
2398            HydroNode::ResolveFutures {
2399                input: Box::new(self.ir_node.into_inner()),
2400                metadata: self
2401                    .location
2402                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2403            },
2404        )
2405    }
2406
2407    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2408    /// Future outputs are produced in the same order as the input stream.
2409    ///
2410    /// # Example
2411    /// ```rust
2412    /// # #[cfg(feature = "deploy")] {
2413    /// # use std::collections::HashSet;
2414    /// # use futures::StreamExt;
2415    /// # use hydro_lang::prelude::*;
2416    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2417    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2418    ///     .map(q!(|x| async move {
2419    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2420    ///         x
2421    ///     }))
2422    ///     .resolve_futures_ordered()
2423    /// #   },
2424    /// #   |mut stream| async move {
2425    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2426    /// #       let mut output = Vec::new();
2427    /// #       for _ in 1..10 {
2428    /// #           output.push(stream.next().await.unwrap());
2429    /// #       }
2430    /// #       assert_eq!(
2431    /// #           output,
2432    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2433    /// #       );
2434    /// #   },
2435    /// # ));
2436    /// # }
2437    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2438        Stream::new(
2439            self.location.clone(),
2440            HydroNode::ResolveFuturesOrdered {
2441                input: Box::new(self.ir_node.into_inner()),
2442                metadata: self
2443                    .location
2444                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2445            },
2446        )
2447    }
2448}
2449
2450impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2451where
2452    L: Location<'a>,
2453{
2454    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2455    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2456    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2457        Stream::new(
2458            self.location.outer().clone(),
2459            HydroNode::YieldConcat {
2460                inner: Box::new(self.ir_node.into_inner()),
2461                metadata: self
2462                    .location
2463                    .outer()
2464                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2465            },
2466        )
2467    }
2468
2469    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2470    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2471    ///
2472    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2473    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2474    /// stream's [`Tick`] context.
2475    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2476        let out_location = Atomic {
2477            tick: self.location.clone(),
2478        };
2479
2480        Stream::new(
2481            out_location.clone(),
2482            HydroNode::YieldConcat {
2483                inner: Box::new(self.ir_node.into_inner()),
2484                metadata: out_location
2485                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2486            },
2487        )
2488    }
2489
2490    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2491    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2492    /// input.
2493    ///
2494    /// This API is particularly useful for stateful computation on batches of data, such as
2495    /// maintaining an accumulated state that is up to date with the current batch.
2496    ///
2497    /// # Example
2498    /// ```rust
2499    /// # #[cfg(feature = "deploy")] {
2500    /// # use hydro_lang::prelude::*;
2501    /// # use futures::StreamExt;
2502    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2503    /// let tick = process.tick();
2504    /// # // ticks are lazy by default, forces the second tick to run
2505    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2506    /// # let batch_first_tick = process
2507    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2508    /// #  .batch(&tick, nondet!(/** test */));
2509    /// # let batch_second_tick = process
2510    /// #   .source_iter(q!(vec![5, 6, 7]))
2511    /// #   .batch(&tick, nondet!(/** test */))
2512    /// #   .defer_tick(); // appears on the second tick
2513    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2514    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2515    ///
2516    /// input.batch(&tick, nondet!(/** test */))
2517    ///     .across_ticks(|s| s.count()).all_ticks()
2518    /// # }, |mut stream| async move {
2519    /// // [4, 7]
2520    /// assert_eq!(stream.next().await.unwrap(), 4);
2521    /// assert_eq!(stream.next().await.unwrap(), 7);
2522    /// # }));
2523    /// # }
2524    /// ```
2525    pub fn across_ticks<Out: BatchAtomic>(
2526        self,
2527        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2528    ) -> Out::Batched {
2529        thunk(self.all_ticks_atomic()).batched_atomic()
2530    }
2531
2532    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2533    /// always has the elements of `self` at tick `T - 1`.
2534    ///
2535    /// At tick `0`, the output stream is empty, since there is no previous tick.
2536    ///
2537    /// This operator enables stateful iterative processing with ticks, by sending data from one
2538    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2539    ///
2540    /// # Example
2541    /// ```rust
2542    /// # #[cfg(feature = "deploy")] {
2543    /// # use hydro_lang::prelude::*;
2544    /// # use futures::StreamExt;
2545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2546    /// let tick = process.tick();
2547    /// // ticks are lazy by default, forces the second tick to run
2548    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2549    ///
2550    /// let batch_first_tick = process
2551    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2552    ///   .batch(&tick, nondet!(/** test */));
2553    /// let batch_second_tick = process
2554    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2555    ///   .batch(&tick, nondet!(/** test */))
2556    ///   .defer_tick(); // appears on the second tick
2557    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2558    ///
2559    /// changes_across_ticks.clone().filter_not_in(
2560    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2561    /// ).all_ticks()
2562    /// # }, |mut stream| async move {
2563    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2564    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2565    /// #     assert_eq!(stream.next().await.unwrap(), w);
2566    /// # }
2567    /// # }));
2568    /// # }
2569    /// ```
2570    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2571        Stream::new(
2572            self.location.clone(),
2573            HydroNode::DeferTick {
2574                input: Box::new(self.ir_node.into_inner()),
2575                metadata: self
2576                    .location
2577                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2578            },
2579        )
2580    }
2581}
2582
2583#[cfg(test)]
2584mod tests {
2585    #[cfg(feature = "deploy")]
2586    use futures::{SinkExt, StreamExt};
2587    #[cfg(feature = "deploy")]
2588    use hydro_deploy::Deployment;
2589    #[cfg(feature = "deploy")]
2590    use serde::{Deserialize, Serialize};
2591    #[cfg(any(feature = "deploy", feature = "sim"))]
2592    use stageleft::q;
2593
2594    #[cfg(any(feature = "deploy", feature = "sim"))]
2595    use crate::compile::builder::FlowBuilder;
2596    #[cfg(feature = "deploy")]
2597    use crate::live_collections::sliced::sliced;
2598    #[cfg(feature = "deploy")]
2599    use crate::live_collections::stream::ExactlyOnce;
2600    #[cfg(feature = "sim")]
2601    use crate::live_collections::stream::NoOrder;
2602    #[cfg(any(feature = "deploy", feature = "sim"))]
2603    use crate::live_collections::stream::TotalOrder;
2604    #[cfg(any(feature = "deploy", feature = "sim"))]
2605    use crate::location::Location;
2606    #[cfg(any(feature = "deploy", feature = "sim"))]
2607    use crate::nondet::nondet;
2608
2609    mod backtrace_chained_ops;
2610
    /// Type tag used in `first_ten_distributed` to name the first process of the flow.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Type tag used in `first_ten_distributed` for the second process and for the external
    /// endpoint of the flow.
    #[cfg(feature = "deploy")]
    struct P2 {}
2615
    /// Serializable payload that `first_ten_distributed` sends from one process to another.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        /// The number carried by this message.
        n: u32,
    }
2621
2622    #[cfg(feature = "deploy")]
2623    #[tokio::test]
2624    async fn first_ten_distributed() {
2625        use crate::networking::TCP;
2626
2627        let mut deployment = Deployment::new();
2628
2629        let mut flow = FlowBuilder::new();
2630        let first_node = flow.process::<P1>();
2631        let second_node = flow.process::<P2>();
2632        let external = flow.external::<P2>();
2633
2634        let numbers = first_node.source_iter(q!(0..10));
2635        let out_port = numbers
2636            .map(q!(|n| SendOverNetwork { n }))
2637            .send(&second_node, TCP.fail_stop().bincode())
2638            .send_bincode_external(&external);
2639
2640        let nodes = flow
2641            .with_process(&first_node, deployment.Localhost())
2642            .with_process(&second_node, deployment.Localhost())
2643            .with_external(&external, deployment.Localhost())
2644            .deploy(&mut deployment);
2645
2646        deployment.deploy().await.unwrap();
2647
2648        let mut external_out = nodes.connect(out_port).await;
2649
2650        deployment.start().await.unwrap();
2651
2652        for i in 0..10 {
2653            assert_eq!(external_out.next().await.unwrap().n, i);
2654        }
2655    }
2656
2657    #[cfg(feature = "deploy")]
2658    #[tokio::test]
2659    async fn first_cardinality() {
2660        let mut deployment = Deployment::new();
2661
2662        let mut flow = FlowBuilder::new();
2663        let node = flow.process::<()>();
2664        let external = flow.external::<()>();
2665
2666        let node_tick = node.tick();
2667        let count = node_tick
2668            .singleton(q!([1, 2, 3]))
2669            .into_stream()
2670            .flatten_ordered()
2671            .first()
2672            .into_stream()
2673            .count()
2674            .all_ticks()
2675            .send_bincode_external(&external);
2676
2677        let nodes = flow
2678            .with_process(&node, deployment.Localhost())
2679            .with_external(&external, deployment.Localhost())
2680            .deploy(&mut deployment);
2681
2682        deployment.deploy().await.unwrap();
2683
2684        let mut external_out = nodes.connect(count).await;
2685
2686        deployment.start().await.unwrap();
2687
2688        assert_eq!(external_out.next().await.unwrap(), 1);
2689    }
2690
2691    #[cfg(feature = "deploy")]
2692    #[tokio::test]
2693    async fn unbounded_reduce_remembers_state() {
2694        let mut deployment = Deployment::new();
2695
2696        let mut flow = FlowBuilder::new();
2697        let node = flow.process::<()>();
2698        let external = flow.external::<()>();
2699
2700        let (input_port, input) = node.source_external_bincode(&external);
2701        let out = input
2702            .reduce(q!(|acc, v| *acc += v))
2703            .sample_eager(nondet!(/** test */))
2704            .send_bincode_external(&external);
2705
2706        let nodes = flow
2707            .with_process(&node, deployment.Localhost())
2708            .with_external(&external, deployment.Localhost())
2709            .deploy(&mut deployment);
2710
2711        deployment.deploy().await.unwrap();
2712
2713        let mut external_in = nodes.connect(input_port).await;
2714        let mut external_out = nodes.connect(out).await;
2715
2716        deployment.start().await.unwrap();
2717
2718        external_in.send(1).await.unwrap();
2719        assert_eq!(external_out.next().await.unwrap(), 1);
2720
2721        external_in.send(2).await.unwrap();
2722        assert_eq!(external_out.next().await.unwrap(), 3);
2723    }
2724
2725    #[cfg(feature = "deploy")]
2726    #[tokio::test]
2727    async fn top_level_bounded_cross_singleton() {
2728        let mut deployment = Deployment::new();
2729
2730        let mut flow = FlowBuilder::new();
2731        let node = flow.process::<()>();
2732        let external = flow.external::<()>();
2733
2734        let (input_port, input) =
2735            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2736
2737        let out = input
2738            .cross_singleton(
2739                node.source_iter(q!(vec![1, 2, 3]))
2740                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2741            )
2742            .send_bincode_external(&external);
2743
2744        let nodes = flow
2745            .with_process(&node, deployment.Localhost())
2746            .with_external(&external, deployment.Localhost())
2747            .deploy(&mut deployment);
2748
2749        deployment.deploy().await.unwrap();
2750
2751        let mut external_in = nodes.connect(input_port).await;
2752        let mut external_out = nodes.connect(out).await;
2753
2754        deployment.start().await.unwrap();
2755
2756        external_in.send(1).await.unwrap();
2757        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2758
2759        external_in.send(2).await.unwrap();
2760        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2761    }
2762
2763    #[cfg(feature = "deploy")]
2764    #[tokio::test]
2765    async fn top_level_bounded_reduce_cardinality() {
2766        let mut deployment = Deployment::new();
2767
2768        let mut flow = FlowBuilder::new();
2769        let node = flow.process::<()>();
2770        let external = flow.external::<()>();
2771
2772        let (input_port, input) =
2773            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2774
2775        let out = sliced! {
2776            let input = use(input, nondet!(/** test */));
2777            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
2778            input.cross_singleton(v.into_stream().count())
2779        }
2780        .send_bincode_external(&external);
2781
2782        let nodes = flow
2783            .with_process(&node, deployment.Localhost())
2784            .with_external(&external, deployment.Localhost())
2785            .deploy(&mut deployment);
2786
2787        deployment.deploy().await.unwrap();
2788
2789        let mut external_in = nodes.connect(input_port).await;
2790        let mut external_out = nodes.connect(out).await;
2791
2792        deployment.start().await.unwrap();
2793
2794        external_in.send(1).await.unwrap();
2795        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2796
2797        external_in.send(2).await.unwrap();
2798        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2799    }
2800
2801    #[cfg(feature = "deploy")]
2802    #[tokio::test]
2803    async fn top_level_bounded_into_singleton_cardinality() {
2804        let mut deployment = Deployment::new();
2805
2806        let mut flow = FlowBuilder::new();
2807        let node = flow.process::<()>();
2808        let external = flow.external::<()>();
2809
2810        let (input_port, input) =
2811            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2812
2813        let out = sliced! {
2814            let input = use(input, nondet!(/** test */));
2815            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
2816            input.cross_singleton(v.into_stream().count())
2817        }
2818        .send_bincode_external(&external);
2819
2820        let nodes = flow
2821            .with_process(&node, deployment.Localhost())
2822            .with_external(&external, deployment.Localhost())
2823            .deploy(&mut deployment);
2824
2825        deployment.deploy().await.unwrap();
2826
2827        let mut external_in = nodes.connect(input_port).await;
2828        let mut external_out = nodes.connect(out).await;
2829
2830        deployment.start().await.unwrap();
2831
2832        external_in.send(1).await.unwrap();
2833        assert_eq!(external_out.next().await.unwrap(), (1, 1));
2834
2835        external_in.send(2).await.unwrap();
2836        assert_eq!(external_out.next().await.unwrap(), (2, 1));
2837    }
2838
2839    #[cfg(feature = "deploy")]
2840    #[tokio::test]
2841    async fn atomic_fold_replays_each_tick() {
2842        let mut deployment = Deployment::new();
2843
2844        let mut flow = FlowBuilder::new();
2845        let node = flow.process::<()>();
2846        let external = flow.external::<()>();
2847
2848        let (input_port, input) =
2849            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2850        let tick = node.tick();
2851
2852        let out = input
2853            .batch(&tick, nondet!(/** test */))
2854            .cross_singleton(
2855                node.source_iter(q!(vec![1, 2, 3]))
2856                    .atomic(&tick)
2857                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
2858                    .snapshot_atomic(nondet!(/** test */)),
2859            )
2860            .all_ticks()
2861            .send_bincode_external(&external);
2862
2863        let nodes = flow
2864            .with_process(&node, deployment.Localhost())
2865            .with_external(&external, deployment.Localhost())
2866            .deploy(&mut deployment);
2867
2868        deployment.deploy().await.unwrap();
2869
2870        let mut external_in = nodes.connect(input_port).await;
2871        let mut external_out = nodes.connect(out).await;
2872
2873        deployment.start().await.unwrap();
2874
2875        external_in.send(1).await.unwrap();
2876        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2877
2878        external_in.send(2).await.unwrap();
2879        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2880    }
2881
2882    #[cfg(feature = "deploy")]
2883    #[tokio::test]
2884    async fn unbounded_scan_remembers_state() {
2885        let mut deployment = Deployment::new();
2886
2887        let mut flow = FlowBuilder::new();
2888        let node = flow.process::<()>();
2889        let external = flow.external::<()>();
2890
2891        let (input_port, input) = node.source_external_bincode(&external);
2892        let out = input
2893            .scan(
2894                q!(|| 0),
2895                q!(|acc, v| {
2896                    *acc += v;
2897                    Some(*acc)
2898                }),
2899            )
2900            .send_bincode_external(&external);
2901
2902        let nodes = flow
2903            .with_process(&node, deployment.Localhost())
2904            .with_external(&external, deployment.Localhost())
2905            .deploy(&mut deployment);
2906
2907        deployment.deploy().await.unwrap();
2908
2909        let mut external_in = nodes.connect(input_port).await;
2910        let mut external_out = nodes.connect(out).await;
2911
2912        deployment.start().await.unwrap();
2913
2914        external_in.send(1).await.unwrap();
2915        assert_eq!(external_out.next().await.unwrap(), 1);
2916
2917        external_in.send(2).await.unwrap();
2918        assert_eq!(external_out.next().await.unwrap(), 3);
2919    }
2920
2921    #[cfg(feature = "deploy")]
2922    #[tokio::test]
2923    async fn unbounded_enumerate_remembers_state() {
2924        let mut deployment = Deployment::new();
2925
2926        let mut flow = FlowBuilder::new();
2927        let node = flow.process::<()>();
2928        let external = flow.external::<()>();
2929
2930        let (input_port, input) = node.source_external_bincode(&external);
2931        let out = input.enumerate().send_bincode_external(&external);
2932
2933        let nodes = flow
2934            .with_process(&node, deployment.Localhost())
2935            .with_external(&external, deployment.Localhost())
2936            .deploy(&mut deployment);
2937
2938        deployment.deploy().await.unwrap();
2939
2940        let mut external_in = nodes.connect(input_port).await;
2941        let mut external_out = nodes.connect(out).await;
2942
2943        deployment.start().await.unwrap();
2944
2945        external_in.send(1).await.unwrap();
2946        assert_eq!(external_out.next().await.unwrap(), (0, 1));
2947
2948        external_in.send(2).await.unwrap();
2949        assert_eq!(external_out.next().await.unwrap(), (1, 2));
2950    }
2951
2952    #[cfg(feature = "deploy")]
2953    #[tokio::test]
2954    async fn unbounded_unique_remembers_state() {
2955        let mut deployment = Deployment::new();
2956
2957        let mut flow = FlowBuilder::new();
2958        let node = flow.process::<()>();
2959        let external = flow.external::<()>();
2960
2961        let (input_port, input) =
2962            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2963        let out = input.unique().send_bincode_external(&external);
2964
2965        let nodes = flow
2966            .with_process(&node, deployment.Localhost())
2967            .with_external(&external, deployment.Localhost())
2968            .deploy(&mut deployment);
2969
2970        deployment.deploy().await.unwrap();
2971
2972        let mut external_in = nodes.connect(input_port).await;
2973        let mut external_out = nodes.connect(out).await;
2974
2975        deployment.start().await.unwrap();
2976
2977        external_in.send(1).await.unwrap();
2978        assert_eq!(external_out.next().await.unwrap(), 1);
2979
2980        external_in.send(2).await.unwrap();
2981        assert_eq!(external_out.next().await.unwrap(), 2);
2982
2983        external_in.send(1).await.unwrap();
2984        external_in.send(3).await.unwrap();
2985        assert_eq!(external_out.next().await.unwrap(), 3);
2986    }
2987
2988    #[cfg(feature = "sim")]
2989    #[test]
2990    #[should_panic]
2991    fn sim_batch_nondet_size() {
2992        let mut flow = FlowBuilder::new();
2993        let node = flow.process::<()>();
2994
2995        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
2996
2997        let tick = node.tick();
2998        let out_recv = input
2999            .batch(&tick, nondet!(/** test */))
3000            .count()
3001            .all_ticks()
3002            .sim_output();
3003
3004        flow.sim().exhaustive(async || {
3005            in_send.send(());
3006            in_send.send(());
3007            in_send.send(());
3008
3009            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3010        });
3011    }
3012
3013    #[cfg(feature = "sim")]
3014    #[test]
3015    fn sim_batch_preserves_order() {
3016        let mut flow = FlowBuilder::new();
3017        let node = flow.process::<()>();
3018
3019        let (in_send, input) = node.sim_input();
3020
3021        let tick = node.tick();
3022        let out_recv = input
3023            .batch(&tick, nondet!(/** test */))
3024            .all_ticks()
3025            .sim_output();
3026
3027        flow.sim().exhaustive(async || {
3028            in_send.send(1);
3029            in_send.send(2);
3030            in_send.send(3);
3031
3032            out_recv.assert_yields_only([1, 2, 3]).await;
3033        });
3034    }
3035
3036    #[cfg(feature = "sim")]
3037    #[test]
3038    #[should_panic]
3039    fn sim_batch_unordered_shuffles() {
3040        let mut flow = FlowBuilder::new();
3041        let node = flow.process::<()>();
3042
3043        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3044
3045        let tick = node.tick();
3046        let batch = input.batch(&tick, nondet!(/** test */));
3047        let out_recv = batch
3048            .clone()
3049            .min()
3050            .zip(batch.max())
3051            .all_ticks()
3052            .sim_output();
3053
3054        flow.sim().exhaustive(async || {
3055            in_send.send_many_unordered([1, 2, 3]);
3056
3057            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3058                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3059            }
3060        });
3061    }
3062
3063    #[cfg(feature = "sim")]
3064    #[test]
3065    fn sim_batch_unordered_shuffles_count() {
3066        let mut flow = FlowBuilder::new();
3067        let node = flow.process::<()>();
3068
3069        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3070
3071        let tick = node.tick();
3072        let batch = input.batch(&tick, nondet!(/** test */));
3073        let out_recv = batch.all_ticks().sim_output();
3074
3075        let instance_count = flow.sim().exhaustive(async || {
3076            in_send.send_many_unordered([1, 2, 3, 4]);
3077            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3078        });
3079
3080        assert_eq!(
3081            instance_count,
3082            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3083        )
3084    }
3085
3086    #[cfg(feature = "sim")]
3087    #[test]
3088    #[should_panic]
3089    fn sim_observe_order_batched() {
3090        let mut flow = FlowBuilder::new();
3091        let node = flow.process::<()>();
3092
3093        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3094
3095        let tick = node.tick();
3096        let batch = input.batch(&tick, nondet!(/** test */));
3097        let out_recv = batch
3098            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3099            .all_ticks()
3100            .sim_output();
3101
3102        flow.sim().exhaustive(async || {
3103            in_send.send_many_unordered([1, 2, 3, 4]);
3104            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3105        });
3106    }
3107
3108    #[cfg(feature = "sim")]
3109    #[test]
3110    fn sim_observe_order_batched_count() {
3111        let mut flow = FlowBuilder::new();
3112        let node = flow.process::<()>();
3113
3114        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3115
3116        let tick = node.tick();
3117        let batch = input.batch(&tick, nondet!(/** test */));
3118        let out_recv = batch
3119            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3120            .all_ticks()
3121            .sim_output();
3122
3123        let instance_count = flow.sim().exhaustive(async || {
3124            in_send.send_many_unordered([1, 2, 3, 4]);
3125            let _ = out_recv.collect::<Vec<_>>().await;
3126        });
3127
3128        assert_eq!(
3129            instance_count,
3130            192 // 4! * 2^{4 - 1}
3131        )
3132    }
3133
3134    #[cfg(feature = "sim")]
3135    #[test]
3136    fn sim_unordered_count_instance_count() {
3137        let mut flow = FlowBuilder::new();
3138        let node = flow.process::<()>();
3139
3140        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3141
3142        let tick = node.tick();
3143        let out_recv = input
3144            .count()
3145            .snapshot(&tick, nondet!(/** test */))
3146            .all_ticks()
3147            .sim_output();
3148
3149        let instance_count = flow.sim().exhaustive(async || {
3150            in_send.send_many_unordered([1, 2, 3, 4]);
3151            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3152        });
3153
3154        assert_eq!(
3155            instance_count,
3156            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3157        )
3158    }
3159
3160    #[cfg(feature = "sim")]
3161    #[test]
3162    fn sim_top_level_assume_ordering() {
3163        let mut flow = FlowBuilder::new();
3164        let node = flow.process::<()>();
3165
3166        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3167
3168        let out_recv = input
3169            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3170            .sim_output();
3171
3172        let instance_count = flow.sim().exhaustive(async || {
3173            in_send.send_many_unordered([1, 2, 3]);
3174            let mut out = out_recv.collect::<Vec<_>>().await;
3175            out.sort();
3176            assert_eq!(out, vec![1, 2, 3]);
3177        });
3178
3179        assert_eq!(instance_count, 6)
3180    }
3181
3182    #[cfg(feature = "sim")]
3183    #[test]
3184    fn sim_top_level_assume_ordering_cycle_back() {
3185        let mut flow = FlowBuilder::new();
3186        let node = flow.process::<()>();
3187
3188        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3189
3190        let (complete_cycle_back, cycle_back) =
3191            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3192        let ordered = input
3193            .interleave(cycle_back)
3194            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3195        complete_cycle_back.complete(
3196            ordered
3197                .clone()
3198                .map(q!(|v| v + 1))
3199                .filter(q!(|v| v % 2 == 1)),
3200        );
3201
3202        let out_recv = ordered.sim_output();
3203
3204        let mut saw = false;
3205        let instance_count = flow.sim().exhaustive(async || {
3206            in_send.send_many_unordered([0, 2]);
3207            let out = out_recv.collect::<Vec<_>>().await;
3208
3209            if out.starts_with(&[0, 1, 2]) {
3210                saw = true;
3211            }
3212        });
3213
3214        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3215        assert_eq!(instance_count, 6)
3216    }
3217
3218    #[cfg(feature = "sim")]
3219    #[test]
3220    fn sim_top_level_assume_ordering_cycle_back_tick() {
3221        let mut flow = FlowBuilder::new();
3222        let node = flow.process::<()>();
3223
3224        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3225
3226        let (complete_cycle_back, cycle_back) =
3227            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3228        let ordered = input
3229            .interleave(cycle_back)
3230            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3231        complete_cycle_back.complete(
3232            ordered
3233                .clone()
3234                .batch(&node.tick(), nondet!(/** test */))
3235                .all_ticks()
3236                .map(q!(|v| v + 1))
3237                .filter(q!(|v| v % 2 == 1)),
3238        );
3239
3240        let out_recv = ordered.sim_output();
3241
3242        let mut saw = false;
3243        let instance_count = flow.sim().exhaustive(async || {
3244            in_send.send_many_unordered([0, 2]);
3245            let out = out_recv.collect::<Vec<_>>().await;
3246
3247            if out.starts_with(&[0, 1, 2]) {
3248                saw = true;
3249            }
3250        });
3251
3252        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3253        assert_eq!(instance_count, 58)
3254    }
3255
3256    #[cfg(feature = "sim")]
3257    #[test]
3258    fn sim_top_level_assume_ordering_multiple() {
3259        let mut flow = FlowBuilder::new();
3260        let node = flow.process::<()>();
3261
3262        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3263        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3264
3265        let (complete_cycle_back, cycle_back) =
3266            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3267        let input1_ordered = input
3268            .clone()
3269            .interleave(cycle_back)
3270            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3271        let foo = input1_ordered
3272            .clone()
3273            .map(q!(|v| v + 3))
3274            .weaken_ordering::<NoOrder>()
3275            .interleave(input2)
3276            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3277
3278        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3279
3280        let out_recv = input1_ordered.sim_output();
3281
3282        let mut saw = false;
3283        let instance_count = flow.sim().exhaustive(async || {
3284            in_send.send_many_unordered([0, 1]);
3285            let out = out_recv.collect::<Vec<_>>().await;
3286
3287            if out.starts_with(&[0, 3, 1]) {
3288                saw = true;
3289            }
3290        });
3291
3292        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3293        assert_eq!(instance_count, 24)
3294    }
3295
3296    #[cfg(feature = "sim")]
3297    #[test]
3298    fn sim_atomic_assume_ordering_cycle_back() {
3299        let mut flow = FlowBuilder::new();
3300        let node = flow.process::<()>();
3301
3302        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3303
3304        let (complete_cycle_back, cycle_back) =
3305            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3306        let ordered = input
3307            .interleave(cycle_back)
3308            .atomic(&node.tick())
3309            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3310            .end_atomic();
3311        complete_cycle_back.complete(
3312            ordered
3313                .clone()
3314                .map(q!(|v| v + 1))
3315                .filter(q!(|v| v % 2 == 1)),
3316        );
3317
3318        let out_recv = ordered.sim_output();
3319
3320        let instance_count = flow.sim().exhaustive(async || {
3321            in_send.send_many_unordered([0, 2]);
3322            let out = out_recv.collect::<Vec<_>>().await;
3323            assert_eq!(out.len(), 4);
3324        });
3325
3326        assert_eq!(instance_count, 22)
3327    }
3328
3329    #[cfg(feature = "deploy")]
3330    #[tokio::test]
3331    async fn partition_evens_odds() {
3332        let mut deployment = Deployment::new();
3333
3334        let mut flow = FlowBuilder::new();
3335        let node = flow.process::<()>();
3336        let external = flow.external::<()>();
3337
3338        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3339        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3340        let evens_port = evens.send_bincode_external(&external);
3341        let odds_port = odds.send_bincode_external(&external);
3342
3343        let nodes = flow
3344            .with_process(&node, deployment.Localhost())
3345            .with_external(&external, deployment.Localhost())
3346            .deploy(&mut deployment);
3347
3348        deployment.deploy().await.unwrap();
3349
3350        let mut evens_out = nodes.connect(evens_port).await;
3351        let mut odds_out = nodes.connect(odds_port).await;
3352
3353        deployment.start().await.unwrap();
3354
3355        let mut even_results = Vec::new();
3356        for _ in 0..3 {
3357            even_results.push(evens_out.next().await.unwrap());
3358        }
3359        even_results.sort();
3360        assert_eq!(even_results, vec![2, 4, 6]);
3361
3362        let mut odd_results = Vec::new();
3363        for _ in 0..3 {
3364            odd_results.push(odds_out.next().await.unwrap());
3365        }
3366        odd_results.sort();
3367        assert_eq!(odd_results, vec![1, 3, 5]);
3368    }
3369}