Skip to main content

hydro_lang/live_collections/stream/mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
35pub mod networking;
36
37/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
38#[sealed::sealed]
39pub trait Ordering:
40    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
41{
42    /// The [`StreamOrder`] corresponding to this type.
43    const ORDERING_KIND: StreamOrder;
44}
45
46/// Marks the stream as being totally ordered, which means that there are
47/// no sources of non-determinism (other than intentional ones) that will
48/// affect the order of elements.
49pub enum TotalOrder {}
50
51#[sealed::sealed]
52impl Ordering for TotalOrder {
53    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
54}
55
56/// Marks the stream as having no order, which means that the order of
57/// elements may be affected by non-determinism.
58///
59/// This restricts certain operators, such as `fold` and `reduce`, to only
60/// be used with commutative aggregation functions.
61pub enum NoOrder {}
62
63#[sealed::sealed]
64impl Ordering for NoOrder {
65    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
66}
67
68/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
69/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
70/// have `Self` guarantees instead.
71#[sealed::sealed]
72pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
73#[sealed::sealed]
74impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
75
76/// Helper trait for determining the weakest of two orderings.
77#[sealed::sealed]
78pub trait MinOrder<Other: ?Sized> {
79    /// The weaker of the two orderings.
80    type Min: Ordering;
81}
82
83#[sealed::sealed]
84impl<O: Ordering> MinOrder<O> for TotalOrder {
85    type Min = O;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for NoOrder {
90    type Min = NoOrder;
91}
92
93/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
94#[sealed::sealed]
95pub trait Retries:
96    MinRetries<Self, Min = Self>
97    + MinRetries<ExactlyOnce, Min = Self>
98    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
99{
100    /// The [`StreamRetry`] corresponding to this type.
101    const RETRIES_KIND: StreamRetry;
102}
103
104/// Marks the stream as having deterministic message cardinality, with no
105/// possibility of duplicates.
106pub enum ExactlyOnce {}
107
108#[sealed::sealed]
109impl Retries for ExactlyOnce {
110    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
111}
112
113/// Marks the stream as having non-deterministic message cardinality, which
114/// means that duplicates may occur, but messages will not be dropped.
115pub enum AtLeastOnce {}
116
117#[sealed::sealed]
118impl Retries for AtLeastOnce {
119    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
120}
121
122/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
123/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
124/// have `Self` guarantees instead.
125#[sealed::sealed]
126pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
127#[sealed::sealed]
128impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
129
130/// Helper trait for determining the weakest of two retry guarantees.
131#[sealed::sealed]
132pub trait MinRetries<Other: ?Sized> {
133    /// The weaker of the two retry guarantees.
134    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
135}
136
137#[sealed::sealed]
138impl<R: Retries> MinRetries<R> for ExactlyOnce {
139    type Min = R;
140}
141
142#[sealed::sealed]
143impl<R: Retries> MinRetries<R> for AtLeastOnce {
144    type Min = AtLeastOnce;
145}
146
147#[sealed::sealed]
148#[diagnostic::on_unimplemented(
149    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
150    label = "required here",
151    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
152)]
153/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
154pub trait IsOrdered: Ordering {}
155
156#[sealed::sealed]
157#[diagnostic::do_not_recommend]
158impl IsOrdered for TotalOrder {}
159
160#[sealed::sealed]
161#[diagnostic::on_unimplemented(
162    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
163    label = "required here",
164    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
165)]
166/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
167pub trait IsExactlyOnce: Retries {}
168
169#[sealed::sealed]
170#[diagnostic::do_not_recommend]
171impl IsExactlyOnce for ExactlyOnce {}
172
173/// Streaming sequence of elements with type `Type`.
174///
175/// This live collection represents a growing sequence of elements, with new elements being
176/// asynchronously appended to the end of the sequence. This can be used to model the arrival
177/// of network input, such as API requests, or streaming ingestion.
178///
179/// By default, all streams have deterministic ordering and each element is materialized exactly
180/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
181/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
182/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
183///
184/// Type Parameters:
185/// - `Type`: the type of elements in the stream
186/// - `Loc`: the location where the stream is being materialized
187/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
188/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
189///   (default is [`TotalOrder`])
190/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
191///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
192pub struct Stream<
193    Type,
194    Loc,
195    Bound: Boundedness = Unbounded,
196    Order: Ordering = TotalOrder,
197    Retry: Retries = ExactlyOnce,
198> {
199    pub(crate) location: Loc,
200    pub(crate) ir_node: RefCell<HydroNode>,
201    pub(crate) flow_state: FlowState,
202
203    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
204}
205
206impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
207    fn drop(&mut self) {
208        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
209        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
210            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
211                input: Box::new(ir_node),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214        }
215    }
216}
217
218impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
219    for Stream<T, L, Unbounded, O, R>
220where
221    L: Location<'a>,
222{
223    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
224        let new_meta = stream
225            .location
226            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
227
228        Stream {
229            location: stream.location.clone(),
230            flow_state: stream.flow_state.clone(),
231            ir_node: RefCell::new(HydroNode::Cast {
232                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
233                metadata: new_meta,
234            }),
235            _phantom: PhantomData,
236        }
237    }
238}
239
240impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
241    for Stream<T, L, B, NoOrder, R>
242where
243    L: Location<'a>,
244{
245    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
246        stream.weaken_ordering()
247    }
248}
249
250impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
251    for Stream<T, L, B, O, AtLeastOnce>
252where
253    L: Location<'a>,
254{
255    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
256        stream.weaken_retries()
257    }
258}
259
260impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
261where
262    L: Location<'a>,
263{
264    fn defer_tick(self) -> Self {
265        Stream::defer_tick(self)
266    }
267}
268
269impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
270    for Stream<T, Tick<L>, Bounded, O, R>
271where
272    L: Location<'a>,
273{
274    type Location = Tick<L>;
275
276    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
277        Stream::new(
278            location.clone(),
279            HydroNode::CycleSource {
280                cycle_id,
281                metadata: location.new_node_metadata(Self::collection_kind()),
282            },
283        )
284    }
285}
286
287impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
288    for Stream<T, Tick<L>, Bounded, O, R>
289where
290    L: Location<'a>,
291{
292    type Location = Tick<L>;
293
294    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
295        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
296            location.clone(),
297            HydroNode::DeferTick {
298                input: Box::new(HydroNode::CycleSource {
299                    cycle_id,
300                    metadata: location.new_node_metadata(Self::collection_kind()),
301                }),
302                metadata: location.new_node_metadata(Self::collection_kind()),
303            },
304        );
305
306        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
307    }
308}
309
310impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
311    for Stream<T, Tick<L>, Bounded, O, R>
312where
313    L: Location<'a>,
314{
315    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
316        assert_eq!(
317            Location::id(&self.location),
318            expected_location,
319            "locations do not match"
320        );
321        self.location
322            .flow_state()
323            .borrow_mut()
324            .push_root(HydroRoot::CycleSink {
325                cycle_id,
326                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
327                op_metadata: HydroIrOpMetadata::new(),
328            });
329    }
330}
331
332impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
333    for Stream<T, L, B, O, R>
334where
335    L: Location<'a> + NoTick,
336{
337    type Location = L;
338
339    fn create_source(cycle_id: CycleId, location: L) -> Self {
340        Stream::new(
341            location.clone(),
342            HydroNode::CycleSource {
343                cycle_id,
344                metadata: location.new_node_metadata(Self::collection_kind()),
345            },
346        )
347    }
348}
349
350impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
351    for Stream<T, L, B, O, R>
352where
353    L: Location<'a> + NoTick,
354{
355    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
356        assert_eq!(
357            Location::id(&self.location),
358            expected_location,
359            "locations do not match"
360        );
361        self.location
362            .flow_state()
363            .borrow_mut()
364            .push_root(HydroRoot::CycleSink {
365                cycle_id,
366                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367                op_metadata: HydroIrOpMetadata::new(),
368            });
369    }
370}
371
372impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
373where
374    T: Clone,
375    L: Location<'a>,
376{
377    fn clone(&self) -> Self {
378        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
379            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
380            *self.ir_node.borrow_mut() = HydroNode::Tee {
381                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
382                metadata: self.location.new_node_metadata(Self::collection_kind()),
383            };
384        }
385
386        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
387            Stream {
388                location: self.location.clone(),
389                flow_state: self.flow_state.clone(),
390                ir_node: HydroNode::Tee {
391                    inner: SharedNode(inner.0.clone()),
392                    metadata: metadata.clone(),
393                }
394                .into(),
395                _phantom: PhantomData,
396            }
397        } else {
398            unreachable!()
399        }
400    }
401}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405    L: Location<'a>,
406{
407    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
408        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
409        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
410
411        let flow_state = location.flow_state().clone();
412        Stream {
413            location,
414            flow_state,
415            ir_node: RefCell::new(ir_node),
416            _phantom: PhantomData,
417        }
418    }
419
420    /// Returns the [`Location`] where this stream is being materialized.
421    pub fn location(&self) -> &L {
422        &self.location
423    }
424
425    pub(crate) fn collection_kind() -> CollectionKind {
426        CollectionKind::Stream {
427            bound: B::BOUND_KIND,
428            order: O::ORDERING_KIND,
429            retry: R::RETRIES_KIND,
430            element_type: quote_type::<T>().into(),
431        }
432    }
433
434    /// Produces a stream based on invoking `f` on each element.
435    /// If you do not want to modify the stream and instead only want to view
436    /// each item use [`Stream::inspect`] instead.
437    ///
438    /// # Example
439    /// ```rust
440    /// # #[cfg(feature = "deploy")] {
441    /// # use hydro_lang::prelude::*;
442    /// # use futures::StreamExt;
443    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
444    /// let words = process.source_iter(q!(vec!["hello", "world"]));
445    /// words.map(q!(|x| x.to_uppercase()))
446    /// # }, |mut stream| async move {
447    /// # for w in vec!["HELLO", "WORLD"] {
448    /// #     assert_eq!(stream.next().await.unwrap(), w);
449    /// # }
450    /// # }));
451    /// # }
452    /// ```
453    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
454    where
455        F: Fn(T) -> U + 'a,
456    {
457        let f = f.splice_fn1_ctx(&self.location).into();
458        Stream::new(
459            self.location.clone(),
460            HydroNode::Map {
461                f,
462                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
463                metadata: self
464                    .location
465                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
466            },
467        )
468    }
469
470    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
471    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
472    /// for the output type `U` must produce items in a **deterministic** order.
473    ///
474    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
475    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
476    ///
477    /// # Example
478    /// ```rust
479    /// # #[cfg(feature = "deploy")] {
480    /// # use hydro_lang::prelude::*;
481    /// # use futures::StreamExt;
482    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
483    /// process
484    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
485    ///     .flat_map_ordered(q!(|x| x))
486    /// # }, |mut stream| async move {
487    /// // 1, 2, 3, 4
488    /// # for w in (1..5) {
489    /// #     assert_eq!(stream.next().await.unwrap(), w);
490    /// # }
491    /// # }));
492    /// # }
493    /// ```
494    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
495    where
496        I: IntoIterator<Item = U>,
497        F: Fn(T) -> I + 'a,
498    {
499        let f = f.splice_fn1_ctx(&self.location).into();
500        Stream::new(
501            self.location.clone(),
502            HydroNode::FlatMap {
503                f,
504                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
505                metadata: self
506                    .location
507                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
508            },
509        )
510    }
511
512    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
513    /// for the output type `U` to produce items in any order.
514    ///
515    /// # Example
516    /// ```rust
517    /// # #[cfg(feature = "deploy")] {
518    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
519    /// # use futures::StreamExt;
520    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
521    /// process
522    ///     .source_iter(q!(vec![
523    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
524    ///         std::collections::HashSet::from_iter(vec![3, 4]),
525    ///     ]))
526    ///     .flat_map_unordered(q!(|x| x))
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, 4, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for w in (1..5) {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3, 4]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flat_map_unordered<U, I, F>(
539        self,
540        f: impl IntoQuotedMut<'a, F, L>,
541    ) -> Stream<U, L, B, NoOrder, R>
542    where
543        I: IntoIterator<Item = U>,
544        F: Fn(T) -> I + 'a,
545    {
546        let f = f.splice_fn1_ctx(&self.location).into();
547        Stream::new(
548            self.location.clone(),
549            HydroNode::FlatMap {
550                f,
551                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
552                metadata: self
553                    .location
554                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
555            },
556        )
557    }
558
559    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
560    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
561    ///
562    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
563    /// not deterministic, use [`Stream::flatten_unordered`] instead.
564    ///
565    /// ```rust
566    /// # #[cfg(feature = "deploy")] {
567    /// # use hydro_lang::prelude::*;
568    /// # use futures::StreamExt;
569    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
570    /// process
571    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
572    ///     .flatten_ordered()
573    /// # }, |mut stream| async move {
574    /// // 1, 2, 3, 4
575    /// # for w in (1..5) {
576    /// #     assert_eq!(stream.next().await.unwrap(), w);
577    /// # }
578    /// # }));
579    /// # }
580    /// ```
581    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
582    where
583        T: IntoIterator<Item = U>,
584    {
585        self.flat_map_ordered(q!(|d| d))
586    }
587
588    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
589    /// for the element type `T` to produce items in any order.
590    ///
591    /// # Example
592    /// ```rust
593    /// # #[cfg(feature = "deploy")] {
594    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
595    /// # use futures::StreamExt;
596    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
597    /// process
598    ///     .source_iter(q!(vec![
599    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
600    ///         std::collections::HashSet::from_iter(vec![3, 4]),
601    ///     ]))
602    ///     .flatten_unordered()
603    /// # }, |mut stream| async move {
604    /// // 1, 2, 3, 4, but in no particular order
605    /// # let mut results = Vec::new();
606    /// # for w in (1..5) {
607    /// #     results.push(stream.next().await.unwrap());
608    /// # }
609    /// # results.sort();
610    /// # assert_eq!(results, vec![1, 2, 3, 4]);
611    /// # }));
612    /// # }
613    /// ```
614    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
615    where
616        T: IntoIterator<Item = U>,
617    {
618        self.flat_map_unordered(q!(|d| d))
619    }
620
621    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
622    /// `f`, preserving the order of the elements.
623    ///
624    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
625    /// not modify or take ownership of the values. If you need to modify the values while filtering
626    /// use [`Stream::filter_map`] instead.
627    ///
628    /// # Example
629    /// ```rust
630    /// # #[cfg(feature = "deploy")] {
631    /// # use hydro_lang::prelude::*;
632    /// # use futures::StreamExt;
633    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
634    /// process
635    ///     .source_iter(q!(vec![1, 2, 3, 4]))
636    ///     .filter(q!(|&x| x > 2))
637    /// # }, |mut stream| async move {
638    /// // 3, 4
639    /// # for w in (3..5) {
640    /// #     assert_eq!(stream.next().await.unwrap(), w);
641    /// # }
642    /// # }));
643    /// # }
644    /// ```
645    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
646    where
647        F: Fn(&T) -> bool + 'a,
648    {
649        let f = f.splice_fn1_borrow_ctx(&self.location).into();
650        Stream::new(
651            self.location.clone(),
652            HydroNode::Filter {
653                f,
654                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
655                metadata: self.location.new_node_metadata(Self::collection_kind()),
656            },
657        )
658    }
659
660    /// Splits the stream into two streams based on a predicate, without cloning elements.
661    ///
662    /// Elements for which `f` returns `true` are sent to the first output stream,
663    /// and elements for which `f` returns `false` are sent to the second output stream.
664    ///
665    /// Unlike using `filter` twice, this only evaluates the predicate once per element
666    /// and does not require `T: Clone`.
667    ///
668    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
669    /// the predicate is only used for routing; the element itself is moved to the
670    /// appropriate output stream.
671    ///
672    /// # Example
673    /// ```rust
674    /// # #[cfg(feature = "deploy")] {
675    /// # use hydro_lang::prelude::*;
676    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
677    /// # use futures::StreamExt;
678    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
679    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
680    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
681    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
682    /// evens.map(q!(|x| (x, true)))
683    ///     .interleave(odds.map(q!(|x| (x, false))))
684    /// # }, |mut stream| async move {
685    /// # let mut results = Vec::new();
686    /// # for _ in 0..6 {
687    /// #     results.push(stream.next().await.unwrap());
688    /// # }
689    /// # results.sort();
690    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
691    /// # }));
692    /// # }
693    /// ```
694    #[expect(
695        clippy::type_complexity,
696        reason = "return type mirrors the input stream type"
697    )]
698    pub fn partition<F>(
699        self,
700        f: impl IntoQuotedMut<'a, F, L>,
701    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
702    where
703        F: Fn(&T) -> bool + 'a,
704    {
705        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
706        let shared = SharedNode(Rc::new(RefCell::new(
707            self.ir_node.replace(HydroNode::Placeholder),
708        )));
709
710        let true_stream = Stream::new(
711            self.location.clone(),
712            HydroNode::Partition {
713                inner: SharedNode(shared.0.clone()),
714                f: f.clone(),
715                is_true: true,
716                metadata: self.location.new_node_metadata(Self::collection_kind()),
717            },
718        );
719
720        let false_stream = Stream::new(
721            self.location.clone(),
722            HydroNode::Partition {
723                inner: SharedNode(shared.0),
724                f,
725                is_true: false,
726                metadata: self.location.new_node_metadata(Self::collection_kind()),
727            },
728        );
729
730        (true_stream, false_stream)
731    }
732
733    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
734    ///
735    /// # Example
736    /// ```rust
737    /// # #[cfg(feature = "deploy")] {
738    /// # use hydro_lang::prelude::*;
739    /// # use futures::StreamExt;
740    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
741    /// process
742    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
743    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
744    /// # }, |mut stream| async move {
745    /// // 1, 2
746    /// # for w in (1..3) {
747    /// #     assert_eq!(stream.next().await.unwrap(), w);
748    /// # }
749    /// # }));
750    /// # }
751    /// ```
752    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
753    where
754        F: Fn(T) -> Option<U> + 'a,
755    {
756        let f = f.splice_fn1_ctx(&self.location).into();
757        Stream::new(
758            self.location.clone(),
759            HydroNode::FilterMap {
760                f,
761                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
762                metadata: self
763                    .location
764                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
765            },
766        )
767    }
768
769    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
770    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
771    /// If `other` is an empty [`Optional`], no values will be produced.
772    ///
773    /// # Example
774    /// ```rust
775    /// # #[cfg(feature = "deploy")] {
776    /// # use hydro_lang::prelude::*;
777    /// # use futures::StreamExt;
778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
779    /// let tick = process.tick();
780    /// let batch = process
781    ///   .source_iter(q!(vec![1, 2, 3, 4]))
782    ///   .batch(&tick, nondet!(/** test */));
783    /// let count = batch.clone().count(); // `count()` returns a singleton
784    /// batch.cross_singleton(count).all_ticks()
785    /// # }, |mut stream| async move {
786    /// // (1, 4), (2, 4), (3, 4), (4, 4)
787    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
788    /// #     assert_eq!(stream.next().await.unwrap(), w);
789    /// # }
790    /// # }));
791    /// # }
792    /// ```
793    pub fn cross_singleton<O2>(
794        self,
795        other: impl Into<Optional<O2, L, Bounded>>,
796    ) -> Stream<(T, O2), L, B, O, R>
797    where
798        O2: Clone,
799    {
800        let other: Optional<O2, L, Bounded> = other.into();
801        check_matching_location(&self.location, &other.location);
802
803        Stream::new(
804            self.location.clone(),
805            HydroNode::CrossSingleton {
806                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
807                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
808                metadata: self
809                    .location
810                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
811            },
812        )
813    }
814
815    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
816    ///
817    /// # Example
818    /// ```rust
819    /// # #[cfg(feature = "deploy")] {
820    /// # use hydro_lang::prelude::*;
821    /// # use futures::StreamExt;
822    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
823    /// let tick = process.tick();
824    /// // ticks are lazy by default, forces the second tick to run
825    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
826    ///
827    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
828    /// let batch_first_tick = process
829    ///   .source_iter(q!(vec![1, 2, 3, 4]))
830    ///   .batch(&tick, nondet!(/** test */));
831    /// let batch_second_tick = process
832    ///   .source_iter(q!(vec![5, 6, 7, 8]))
833    ///   .batch(&tick, nondet!(/** test */))
834    ///   .defer_tick();
835    /// batch_first_tick.chain(batch_second_tick)
836    ///   .filter_if(signal)
837    ///   .all_ticks()
838    /// # }, |mut stream| async move {
839    /// // [1, 2, 3, 4]
840    /// # for w in vec![1, 2, 3, 4] {
841    /// #     assert_eq!(stream.next().await.unwrap(), w);
842    /// # }
843    /// # }));
844    /// # }
845    /// ```
846    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
847        self.cross_singleton(signal.filter(q!(|b| *b)))
848            .map(q!(|(d, _)| d))
849    }
850
851    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
852    ///
853    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
854    /// leader of a cluster.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// // ticks are lazy by default, forces the second tick to run
864    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865    ///
866    /// let batch_first_tick = process
867    ///   .source_iter(q!(vec![1, 2, 3, 4]))
868    ///   .batch(&tick, nondet!(/** test */));
869    /// let batch_second_tick = process
870    ///   .source_iter(q!(vec![5, 6, 7, 8]))
871    ///   .batch(&tick, nondet!(/** test */))
872    ///   .defer_tick(); // appears on the second tick
873    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
874    /// batch_first_tick.chain(batch_second_tick)
875    ///   .filter_if_some(some_on_first_tick)
876    ///   .all_ticks()
877    /// # }, |mut stream| async move {
878    /// // [1, 2, 3, 4]
879    /// # for w in vec![1, 2, 3, 4] {
880    /// #     assert_eq!(stream.next().await.unwrap(), w);
881    /// # }
882    /// # }));
883    /// # }
884    /// ```
885    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
886    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
887        self.filter_if(signal.is_some())
888    }
889
890    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
891    ///
892    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
893    /// some local state.
894    ///
895    /// # Example
896    /// ```rust
897    /// # #[cfg(feature = "deploy")] {
898    /// # use hydro_lang::prelude::*;
899    /// # use futures::StreamExt;
900    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
901    /// let tick = process.tick();
902    /// // ticks are lazy by default, forces the second tick to run
903    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
904    ///
905    /// let batch_first_tick = process
906    ///   .source_iter(q!(vec![1, 2, 3, 4]))
907    ///   .batch(&tick, nondet!(/** test */));
908    /// let batch_second_tick = process
909    ///   .source_iter(q!(vec![5, 6, 7, 8]))
910    ///   .batch(&tick, nondet!(/** test */))
911    ///   .defer_tick(); // appears on the second tick
912    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
913    /// batch_first_tick.chain(batch_second_tick)
914    ///   .filter_if_none(some_on_first_tick)
915    ///   .all_ticks()
916    /// # }, |mut stream| async move {
917    /// // [5, 6, 7, 8]
918    /// # for w in vec![5, 6, 7, 8] {
919    /// #     assert_eq!(stream.next().await.unwrap(), w);
920    /// # }
921    /// # }));
922    /// # }
923    /// ```
924    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
925    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
926        self.filter_if(other.is_none())
927    }
928
929    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
930    /// tupled pairs in a non-deterministic order.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use std::collections::HashSet;
937    /// # use futures::StreamExt;
938    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939    /// let tick = process.tick();
940    /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
941    /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
942    /// stream1.cross_product(stream2)
943    /// # }, |mut stream| async move {
944    /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
945    /// # stream.map(|i| assert!(expected.contains(&i)));
946    /// # }));
947    /// # }
948    /// ```
949    pub fn cross_product<T2, O2: Ordering>(
950        self,
951        other: Stream<T2, L, B, O2, R>,
952    ) -> Stream<(T, T2), L, B, NoOrder, R>
953    where
954        T: Clone,
955        T2: Clone,
956    {
957        check_matching_location(&self.location, &other.location);
958
959        Stream::new(
960            self.location.clone(),
961            HydroNode::CrossProduct {
962                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
963                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
964                metadata: self
965                    .location
966                    .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
967            },
968        )
969    }
970
971    /// Takes one stream as input and filters out any duplicate occurrences. The output
972    /// contains all unique values from the input.
973    ///
974    /// # Example
975    /// ```rust
976    /// # #[cfg(feature = "deploy")] {
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// let tick = process.tick();
981    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
982    /// # }, |mut stream| async move {
983    /// # for w in vec![1, 2, 3, 4] {
984    /// #     assert_eq!(stream.next().await.unwrap(), w);
985    /// # }
986    /// # }));
987    /// # }
988    /// ```
989    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
990    where
991        T: Eq + Hash,
992    {
993        Stream::new(
994            self.location.clone(),
995            HydroNode::Unique {
996                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
997                metadata: self
998                    .location
999                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1000            },
1001        )
1002    }
1003
1004    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1005    ///
1006    /// The `other` stream must be [`Bounded`], since this function will wait until
1007    /// all its elements are available before producing any output.
1008    /// # Example
1009    /// ```rust
1010    /// # #[cfg(feature = "deploy")] {
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// let stream = process
1016    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1017    ///   .batch(&tick, nondet!(/** test */));
1018    /// let batch = process
1019    ///   .source_iter(q!(vec![1, 2]))
1020    ///   .batch(&tick, nondet!(/** test */));
1021    /// stream.filter_not_in(batch).all_ticks()
1022    /// # }, |mut stream| async move {
1023    /// # for w in vec![3, 4] {
1024    /// #     assert_eq!(stream.next().await.unwrap(), w);
1025    /// # }
1026    /// # }));
1027    /// # }
1028    /// ```
1029    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1030    where
1031        T: Eq + Hash,
1032        B2: IsBounded,
1033    {
1034        check_matching_location(&self.location, &other.location);
1035
1036        Stream::new(
1037            self.location.clone(),
1038            HydroNode::Difference {
1039                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1040                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1041                metadata: self
1042                    .location
1043                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1044            },
1045        )
1046    }
1047
1048    /// An operator which allows you to "inspect" each element of a stream without
1049    /// modifying it. The closure `f` is called on a reference to each item. This is
1050    /// mainly useful for debugging, and should not be used to generate side-effects.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// let nums = process.source_iter(q!(vec![1, 2]));
1059    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1060    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1061    /// # }, |mut stream| async move {
1062    /// # for w in vec![1, 2] {
1063    /// #     assert_eq!(stream.next().await.unwrap(), w);
1064    /// # }
1065    /// # }));
1066    /// # }
1067    /// ```
1068    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1069    where
1070        F: Fn(&T) + 'a,
1071    {
1072        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1073
1074        Stream::new(
1075            self.location.clone(),
1076            HydroNode::Inspect {
1077                f,
1078                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079                metadata: self.location.new_node_metadata(Self::collection_kind()),
1080            },
1081        )
1082    }
1083
1084    /// Executes the provided closure for every element in this stream.
1085    ///
1086    /// Because the closure may have side effects, the stream must have deterministic order
1087    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1088    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1089    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1090    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1091    where
1092        O: IsOrdered,
1093        R: IsExactlyOnce,
1094    {
1095        let f = f.splice_fn1_ctx(&self.location).into();
1096        self.location
1097            .flow_state()
1098            .borrow_mut()
1099            .push_root(HydroRoot::ForEach {
1100                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1101                f,
1102                op_metadata: HydroIrOpMetadata::new(),
1103            });
1104    }
1105
1106    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1107    /// TCP socket to some other server. You should _not_ use this API for interacting with
1108    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1109    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1110    /// interaction with asynchronous sinks.
1111    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1112    where
1113        O: IsOrdered,
1114        R: IsExactlyOnce,
1115        S: 'a + futures::Sink<T> + Unpin,
1116    {
1117        self.location
1118            .flow_state()
1119            .borrow_mut()
1120            .push_root(HydroRoot::DestSink {
1121                sink: sink.splice_typed_ctx(&self.location).into(),
1122                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1123                op_metadata: HydroIrOpMetadata::new(),
1124            });
1125    }
1126
1127    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1128    ///
1129    /// # Example
1130    /// ```rust
1131    /// # #[cfg(feature = "deploy")] {
1132    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1133    /// # use futures::StreamExt;
1134    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1135    /// let tick = process.tick();
1136    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137    /// numbers.enumerate()
1138    /// # }, |mut stream| async move {
1139    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1140    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1141    /// #     assert_eq!(stream.next().await.unwrap(), w);
1142    /// # }
1143    /// # }));
1144    /// # }
1145    /// ```
1146    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1147    where
1148        O: IsOrdered,
1149        R: IsExactlyOnce,
1150    {
1151        Stream::new(
1152            self.location.clone(),
1153            HydroNode::Enumerate {
1154                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155                metadata: self.location.new_node_metadata(Stream::<
1156                    (usize, T),
1157                    L,
1158                    B,
1159                    TotalOrder,
1160                    ExactlyOnce,
1161                >::collection_kind()),
1162            },
1163        )
1164    }
1165
1166    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1167    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1168    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1169    ///
1170    /// Depending on the input stream guarantees, the closure may need to be commutative
1171    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1172    ///
1173    /// # Example
1174    /// ```rust
1175    /// # #[cfg(feature = "deploy")] {
1176    /// # use hydro_lang::prelude::*;
1177    /// # use futures::StreamExt;
1178    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1179    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1180    /// words
1181    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1182    ///     .into_stream()
1183    /// # }, |mut stream| async move {
1184    /// // "HELLOWORLD"
1185    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1186    /// # }));
1187    /// # }
1188    /// ```
1189    pub fn fold<A, I, F, C, Idemp>(
1190        self,
1191        init: impl IntoQuotedMut<'a, I, L>,
1192        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1193    ) -> Singleton<A, L, B>
1194    where
1195        I: Fn() -> A + 'a,
1196        F: Fn(&mut A, T),
1197        C: ValidCommutativityFor<O>,
1198        Idemp: ValidIdempotenceFor<R>,
1199    {
1200        let init = init.splice_fn0_ctx(&self.location).into();
1201        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1202        proof.register_proof(&comb);
1203
1204        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1206
1207        let core = HydroNode::Fold {
1208            init,
1209            acc: comb.into(),
1210            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1211            metadata: ordered_etc
1212                .location
1213                .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1214        };
1215
1216        Singleton::new(ordered_etc.location.clone(), core)
1217    }
1218
1219    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1220    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1221    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1222    /// reference, so that it can be modified in place.
1223    ///
1224    /// Depending on the input stream guarantees, the closure may need to be commutative
1225    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # #[cfg(feature = "deploy")] {
1230    /// # use hydro_lang::prelude::*;
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233    /// let bools = process.source_iter(q!(vec![false, true, false]));
1234    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1235    /// # }, |mut stream| async move {
1236    /// // true
1237    /// # assert_eq!(stream.next().await.unwrap(), true);
1238    /// # }));
1239    /// # }
1240    /// ```
1241    pub fn reduce<F, C, Idemp>(
1242        self,
1243        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1244    ) -> Optional<T, L, B>
1245    where
1246        F: Fn(&mut T, T) + 'a,
1247        C: ValidCommutativityFor<O>,
1248        Idemp: ValidIdempotenceFor<R>,
1249    {
1250        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1251        proof.register_proof(&f);
1252
1253        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1254        let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1255
1256        let core = HydroNode::Reduce {
1257            f: f.into(),
1258            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1259            metadata: ordered_etc
1260                .location
1261                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1262        };
1263
1264        Optional::new(ordered_etc.location.clone(), core)
1265    }
1266
1267    /// Computes the maximum element in the stream as an [`Optional`], which
1268    /// will be empty until the first element in the input arrives.
1269    ///
1270    /// # Example
1271    /// ```rust
1272    /// # #[cfg(feature = "deploy")] {
1273    /// # use hydro_lang::prelude::*;
1274    /// # use futures::StreamExt;
1275    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276    /// let tick = process.tick();
1277    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279    /// batch.max().all_ticks()
1280    /// # }, |mut stream| async move {
1281    /// // 4
1282    /// # assert_eq!(stream.next().await.unwrap(), 4);
1283    /// # }));
1284    /// # }
1285    /// ```
1286    pub fn max(self) -> Optional<T, L, B>
1287    where
1288        T: Ord,
1289    {
1290        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1291            .assume_ordering_trusted_bounded::<TotalOrder>(
1292                nondet!(/** max is commutative, but order affects intermediates */),
1293            )
1294            .reduce(q!(|curr, new| {
1295                if new > *curr {
1296                    *curr = new;
1297                }
1298            }))
1299    }
1300
1301    /// Computes the minimum element in the stream as an [`Optional`], which
1302    /// will be empty until the first element in the input arrives.
1303    ///
1304    /// # Example
1305    /// ```rust
1306    /// # #[cfg(feature = "deploy")] {
1307    /// # use hydro_lang::prelude::*;
1308    /// # use futures::StreamExt;
1309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1310    /// let tick = process.tick();
1311    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1312    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1313    /// batch.min().all_ticks()
1314    /// # }, |mut stream| async move {
1315    /// // 1
1316    /// # assert_eq!(stream.next().await.unwrap(), 1);
1317    /// # }));
1318    /// # }
1319    /// ```
1320    pub fn min(self) -> Optional<T, L, B>
1321    where
1322        T: Ord,
1323    {
1324        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1325            .assume_ordering_trusted_bounded::<TotalOrder>(
1326                nondet!(/** max is commutative, but order affects intermediates */),
1327            )
1328            .reduce(q!(|curr, new| {
1329                if new < *curr {
1330                    *curr = new;
1331                }
1332            }))
1333    }
1334
1335    /// Computes the first element in the stream as an [`Optional`], which
1336    /// will be empty until the first element in the input arrives.
1337    ///
1338    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1339    /// re-ordering of elements may cause the first element to change.
1340    ///
1341    /// # Example
1342    /// ```rust
1343    /// # #[cfg(feature = "deploy")] {
1344    /// # use hydro_lang::prelude::*;
1345    /// # use futures::StreamExt;
1346    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347    /// let tick = process.tick();
1348    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350    /// batch.first().all_ticks()
1351    /// # }, |mut stream| async move {
1352    /// // 1
1353    /// # assert_eq!(stream.next().await.unwrap(), 1);
1354    /// # }));
1355    /// # }
1356    /// ```
1357    pub fn first(self) -> Optional<T, L, B>
1358    where
1359        O: IsOrdered,
1360    {
1361        self.make_totally_ordered()
1362            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1363            .reduce(q!(|_, _| {}))
1364    }
1365
1366    /// Computes the last element in the stream as an [`Optional`], which
1367    /// will be empty until an element in the input arrives.
1368    ///
1369    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1370    /// re-ordering of elements may cause the last element to change.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let tick = process.tick();
1379    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1380    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1381    /// batch.last().all_ticks()
1382    /// # }, |mut stream| async move {
1383    /// // 4
1384    /// # assert_eq!(stream.next().await.unwrap(), 4);
1385    /// # }));
1386    /// # }
1387    /// ```
1388    pub fn last(self) -> Optional<T, L, B>
1389    where
1390        O: IsOrdered,
1391    {
1392        self.make_totally_ordered()
1393            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1394            .reduce(q!(|curr, new| *curr = new))
1395    }
1396
1397    /// Collects all the elements of this stream into a single [`Vec`] element.
1398    ///
1399    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1400    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1401    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1402    /// the vector at an arbitrary point in time.
1403    ///
1404    /// # Example
1405    /// ```rust
1406    /// # #[cfg(feature = "deploy")] {
1407    /// # use hydro_lang::prelude::*;
1408    /// # use futures::StreamExt;
1409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410    /// let tick = process.tick();
1411    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1412    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1413    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1414    /// # }, |mut stream| async move {
1415    /// // [ vec![1, 2, 3, 4] ]
1416    /// # for w in vec![vec![1, 2, 3, 4]] {
1417    /// #     assert_eq!(stream.next().await.unwrap(), w);
1418    /// # }
1419    /// # }));
1420    /// # }
1421    /// ```
1422    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1423    where
1424        O: IsOrdered,
1425        R: IsExactlyOnce,
1426    {
1427        self.make_totally_ordered().make_exactly_once().fold(
1428            q!(|| vec![]),
1429            q!(|acc, v| {
1430                acc.push(v);
1431            }),
1432        )
1433    }
1434
1435    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1436    /// and emitting each intermediate result.
1437    ///
1438    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1439    /// containing all intermediate accumulated values. The scan operation can also terminate early
1440    /// by returning `None`.
1441    ///
1442    /// The function takes a mutable reference to the accumulator and the current element, and returns
1443    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1444    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1445    ///
1446    /// # Examples
1447    ///
1448    /// Basic usage - running sum:
1449    /// ```rust
1450    /// # #[cfg(feature = "deploy")] {
1451    /// # use hydro_lang::prelude::*;
1452    /// # use futures::StreamExt;
1453    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1455    ///     q!(|| 0),
1456    ///     q!(|acc, x| {
1457    ///         *acc += x;
1458    ///         Some(*acc)
1459    ///     }),
1460    /// )
1461    /// # }, |mut stream| async move {
1462    /// // Output: 1, 3, 6, 10
1463    /// # for w in vec![1, 3, 6, 10] {
1464    /// #     assert_eq!(stream.next().await.unwrap(), w);
1465    /// # }
1466    /// # }));
1467    /// # }
1468    /// ```
1469    ///
1470    /// Early termination example:
1471    /// ```rust
1472    /// # #[cfg(feature = "deploy")] {
1473    /// # use hydro_lang::prelude::*;
1474    /// # use futures::StreamExt;
1475    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1477    ///     q!(|| 1),
1478    ///     q!(|state, x| {
1479    ///         *state = *state * x;
1480    ///         if *state > 6 {
1481    ///             None // Terminate the stream
1482    ///         } else {
1483    ///             Some(-*state)
1484    ///         }
1485    ///     }),
1486    /// )
1487    /// # }, |mut stream| async move {
1488    /// // Output: -1, -2, -6
1489    /// # for w in vec![-1, -2, -6] {
1490    /// #     assert_eq!(stream.next().await.unwrap(), w);
1491    /// # }
1492    /// # }));
1493    /// # }
1494    /// ```
1495    pub fn scan<A, U, I, F>(
1496        self,
1497        init: impl IntoQuotedMut<'a, I, L>,
1498        f: impl IntoQuotedMut<'a, F, L>,
1499    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500    where
1501        O: IsOrdered,
1502        R: IsExactlyOnce,
1503        I: Fn() -> A + 'a,
1504        F: Fn(&mut A, T) -> Option<U> + 'a,
1505    {
1506        let init = init.splice_fn0_ctx(&self.location).into();
1507        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1508
1509        Stream::new(
1510            self.location.clone(),
1511            HydroNode::Scan {
1512                init,
1513                acc: f,
1514                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1515                metadata: self.location.new_node_metadata(
1516                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1517                ),
1518            },
1519        )
1520    }
1521
1522    /// Iteratively processes the elements of the stream using a state machine that can yield
1523    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1524    /// syntax in Rust, without requiring special syntax.
1525    ///
1526    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1527    /// state. The second argument defines the processing logic, taking in a mutable reference
1528    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1529    /// variants define what is emitted and whether further inputs should be processed.
1530    ///
1531    /// # Example
1532    /// ```rust
1533    /// # #[cfg(feature = "deploy")] {
1534    /// # use hydro_lang::prelude::*;
1535    /// # use futures::StreamExt;
1536    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1537    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1538    ///     q!(|| 0),
1539    ///     q!(|acc, x| {
1540    ///         *acc += x;
1541    ///         if *acc > 100 {
1542    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1543    ///         } else if *acc % 2 == 0 {
1544    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1545    ///         } else {
1546    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1547    ///         }
1548    ///     }),
1549    /// )
1550    /// # }, |mut stream| async move {
1551    /// // Output: "even", "done!"
1552    /// # let mut results = Vec::new();
1553    /// # for _ in 0..2 {
1554    /// #     results.push(stream.next().await.unwrap());
1555    /// # }
1556    /// # results.sort();
1557    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1558    /// # }));
1559    /// # }
1560    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures in `ManualExpr`s that splice them against the location context
        // they are used in, so they can be called as `init()` / `f(..)` inside `scan_f` below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // The generator is lowered to a `Scan` followed by a flattening `FlatMap`.
        //
        // State is Option<Option<A>>:
        //   None = not yet initialized
        //   Some(Some(a)) = active with state a
        //   Some(None) = terminated
        //
        // Each scan step returns Option<Option<U>>:
        //   None = stop the scan (no further inputs are processed)
        //   Some(None) = emit nothing for this input
        //   Some(Some(u)) = emit u
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            // Lazily create the user state on the first input.
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    // Return still has a value to emit, so it cannot stop the scan here.
                    // Instead it marks the state terminated, and the next input stops
                    // the scan through the `_` arm below.
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Break stops the scan immediately. Unlike KeyedStream, this is
                    // possible because there is only one state (no other keys that
                    // still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan. (`None` cannot
                // occur here because the state was initialized above.)
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the scan's Option<U> outputs with an identity FlatMap: `None` (from
        // `Continue`) is dropped and `Some(u)` yields `u`.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1634
1635    /// Given a time interval, returns a stream corresponding to samples taken from the
1636    /// stream roughly at that interval. The output will have elements in the same order
1637    /// as the input, but with arbitrary elements skipped between samples. There is also
1638    /// no guarantee on the exact timing of the samples.
1639    ///
1640    /// # Non-Determinism
1641    /// The output stream is non-deterministic in which elements are sampled, since this
1642    /// is controlled by a clock.
1643    pub fn sample_every(
1644        self,
1645        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1646        nondet: NonDet,
1647    ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1648    where
1649        L: NoTick + NoAtomic,
1650    {
1651        let samples = self.location.source_interval(interval, nondet);
1652
1653        let tick = self.location.tick();
1654        self.batch(&tick, nondet)
1655            .filter_if(samples.batch(&tick, nondet).first().is_some())
1656            .all_ticks()
1657            .weaken_retries()
1658    }
1659
1660    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1661    /// stream has not emitted a value since that duration.
1662    ///
1663    /// # Non-Determinism
1664    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1665    /// samples take place, timeouts may be non-deterministically generated or missed,
1666    /// and the notification of the timeout may be delayed as well. There is also no
1667    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1668    /// detected based on when the next sample is taken.
1669    pub fn timeout(
1670        self,
1671        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
1672        nondet: NonDet,
1673    ) -> Optional<(), L, Unbounded>
1674    where
1675        L: NoTick + NoAtomic,
1676    {
1677        let tick = self.location.tick();
1678
1679        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1680            q!(|| None),
1681            q!(
1682                |latest, _| {
1683                    *latest = Some(Instant::now());
1684                },
1685                commutative = manual_proof!(/** TODO */)
1686            ),
1687        );
1688
1689        latest_received
1690            .snapshot(&tick, nondet)
1691            .filter_map(q!(move |latest_received| {
1692                if let Some(latest_received) = latest_received {
1693                    if Instant::now().duration_since(latest_received) > duration {
1694                        Some(())
1695                    } else {
1696                        None
1697                    }
1698                } else {
1699                    Some(())
1700                }
1701            }))
1702            .latest()
1703    }
1704
1705    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1706    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1707    ///
1708    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1709    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1710    /// argument that declares where the stream will be atomically processed. Batching a stream into
1711    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1712    /// [`Tick`] will introduce asynchrony.
1713    pub fn atomic(self, tick: &Tick<L>) -> Stream<T, Atomic<L>, B, O, R> {
1714        let out_location = Atomic { tick: tick.clone() };
1715        Stream::new(
1716            out_location.clone(),
1717            HydroNode::BeginAtomic {
1718                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1719                metadata: out_location
1720                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1721            },
1722        )
1723    }
1724
1725    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1726    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1727    /// the order of the input. The output stream will execute in the [`Tick`] that was
1728    /// used to create the atomic section.
1729    ///
1730    /// # Non-Determinism
1731    /// The batch boundaries are non-deterministic and may change across executions.
1732    pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1733        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1734        Stream::new(
1735            tick.clone(),
1736            HydroNode::Batch {
1737                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1738                metadata: tick
1739                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1740            },
1741        )
1742    }
1743
1744    /// An operator which allows you to "name" a `HydroNode`.
1745    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1746    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1747        {
1748            let mut node = self.ir_node.borrow_mut();
1749            let metadata = node.metadata_mut();
1750            metadata.tag = Some(name.to_owned());
1751        }
1752        self
1753    }
1754
1755    /// Explicitly "casts" the stream to a type with a different ordering
1756    /// guarantee. Useful in unsafe code where the ordering cannot be proven
1757    /// by the type-system.
1758    ///
1759    /// # Non-Determinism
1760    /// This function is used as an escape hatch, and any mistakes in the
1761    /// provided ordering guarantee will propagate into the guarantees
1762    /// for the rest of the program.
1763    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1764        if O::ORDERING_KIND == O2::ORDERING_KIND {
1765            Stream::new(
1766                self.location.clone(),
1767                self.ir_node.replace(HydroNode::Placeholder),
1768            )
1769        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1770            // We can always weaken the ordering guarantee
1771            Stream::new(
1772                self.location.clone(),
1773                HydroNode::Cast {
1774                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1775                    metadata: self
1776                        .location
1777                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1778                },
1779            )
1780        } else {
1781            Stream::new(
1782                self.location.clone(),
1783                HydroNode::ObserveNonDet {
1784                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1785                    trusted: false,
1786                    metadata: self
1787                        .location
1788                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1789                },
1790            )
1791        }
1792    }
1793
1794    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1795    // intermediate states will not be revealed
1796    fn assume_ordering_trusted_bounded<O2: Ordering>(
1797        self,
1798        nondet: NonDet,
1799    ) -> Stream<T, L, B, O2, R> {
1800        if B::BOUNDED {
1801            self.assume_ordering_trusted(nondet)
1802        } else {
1803            self.assume_ordering(nondet)
1804        }
1805    }
1806
1807    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1808    // is not observable
1809    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1810        self,
1811        _nondet: NonDet,
1812    ) -> Stream<T, L, B, O2, R> {
1813        if O::ORDERING_KIND == O2::ORDERING_KIND {
1814            Stream::new(
1815                self.location.clone(),
1816                self.ir_node.replace(HydroNode::Placeholder),
1817            )
1818        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1819            // We can always weaken the ordering guarantee
1820            Stream::new(
1821                self.location.clone(),
1822                HydroNode::Cast {
1823                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1824                    metadata: self
1825                        .location
1826                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1827                },
1828            )
1829        } else {
1830            Stream::new(
1831                self.location.clone(),
1832                HydroNode::ObserveNonDet {
1833                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1834                    trusted: true,
1835                    metadata: self
1836                        .location
1837                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1838                },
1839            )
1840        }
1841    }
1842
1843    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1844    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1845    /// which is always safe because that is the weakest possible guarantee.
1846    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1847        self.weaken_ordering::<NoOrder>()
1848    }
1849
1850    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1851    /// enforcing that `O2` is weaker than the input ordering guarantee.
1852    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1853        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1854        self.assume_ordering::<O2>(nondet)
1855    }
1856
1857    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1858    /// implies that `O == TotalOrder`.
1859    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1860    where
1861        O: IsOrdered,
1862    {
1863        self.assume_ordering(nondet!(/** no-op */))
1864    }
1865
1866    /// Explicitly "casts" the stream to a type with a different retries
1867    /// guarantee. Useful in unsafe code where the lack of retries cannot
1868    /// be proven by the type-system.
1869    ///
1870    /// # Non-Determinism
1871    /// This function is used as an escape hatch, and any mistakes in the
1872    /// provided retries guarantee will propagate into the guarantees
1873    /// for the rest of the program.
1874    pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1875        if R::RETRIES_KIND == R2::RETRIES_KIND {
1876            Stream::new(
1877                self.location.clone(),
1878                self.ir_node.replace(HydroNode::Placeholder),
1879            )
1880        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1881            // We can always weaken the retries guarantee
1882            Stream::new(
1883                self.location.clone(),
1884                HydroNode::Cast {
1885                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1886                    metadata: self
1887                        .location
1888                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1889                },
1890            )
1891        } else {
1892            Stream::new(
1893                self.location.clone(),
1894                HydroNode::ObserveNonDet {
1895                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1896                    trusted: false,
1897                    metadata: self
1898                        .location
1899                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1900                },
1901            )
1902        }
1903    }
1904
1905    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1906    // is not observable
1907    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1908        if R::RETRIES_KIND == R2::RETRIES_KIND {
1909            Stream::new(
1910                self.location.clone(),
1911                self.ir_node.replace(HydroNode::Placeholder),
1912            )
1913        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1914            // We can always weaken the retries guarantee
1915            Stream::new(
1916                self.location.clone(),
1917                HydroNode::Cast {
1918                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1919                    metadata: self
1920                        .location
1921                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1922                },
1923            )
1924        } else {
1925            Stream::new(
1926                self.location.clone(),
1927                HydroNode::ObserveNonDet {
1928                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1929                    trusted: true,
1930                    metadata: self
1931                        .location
1932                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1933                },
1934            )
1935        }
1936    }
1937
1938    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1939    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1940    /// which is always safe because that is the weakest possible guarantee.
1941    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1942        self.weaken_retries::<AtLeastOnce>()
1943    }
1944
1945    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1946    /// enforcing that `R2` is weaker than the input retries guarantee.
1947    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1948        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1949        self.assume_retries::<R2>(nondet)
1950    }
1951
1952    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1953    /// implies that `R == ExactlyOnce`.
1954    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1955    where
1956        R: IsExactlyOnce,
1957    {
1958        self.assume_retries(nondet!(/** no-op */))
1959    }
1960
1961    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1962    /// implies that `B == Bounded`.
1963    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1964    where
1965        B: IsBounded,
1966    {
1967        Stream::new(
1968            self.location.clone(),
1969            self.ir_node.replace(HydroNode::Placeholder),
1970        )
1971    }
1972}
1973
1974impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
1975where
1976    L: Location<'a>,
1977{
1978    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
1979    ///
1980    /// # Example
1981    /// ```rust
1982    /// # #[cfg(feature = "deploy")] {
1983    /// # use hydro_lang::prelude::*;
1984    /// # use futures::StreamExt;
1985    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1986    /// process.source_iter(q!(&[1, 2, 3])).cloned()
1987    /// # }, |mut stream| async move {
1988    /// // 1, 2, 3
1989    /// # for w in vec![1, 2, 3] {
1990    /// #     assert_eq!(stream.next().await.unwrap(), w);
1991    /// # }
1992    /// # }));
1993    /// # }
1994    /// ```
1995    pub fn cloned(self) -> Stream<T, L, B, O, R>
1996    where
1997        T: Clone,
1998    {
1999        self.map(q!(|d| d.clone()))
2000    }
2001}
2002
2003impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2004where
2005    L: Location<'a>,
2006{
2007    /// Computes the number of elements in the stream as a [`Singleton`].
2008    ///
2009    /// # Example
2010    /// ```rust
2011    /// # #[cfg(feature = "deploy")] {
2012    /// # use hydro_lang::prelude::*;
2013    /// # use futures::StreamExt;
2014    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2015    /// let tick = process.tick();
2016    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2017    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2018    /// batch.count().all_ticks()
2019    /// # }, |mut stream| async move {
2020    /// // 4
2021    /// # assert_eq!(stream.next().await.unwrap(), 4);
2022    /// # }));
2023    /// # }
2024    /// ```
2025    pub fn count(self) -> Singleton<usize, L, B> {
2026        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2027            /// Order does not affect eventual count, and also does not affect intermediate states.
2028        ))
2029        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
2030    }
2031}
2032
2033impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2034    /// Produces a new stream that interleaves the elements of the two input streams.
2035    /// The result has [`NoOrder`] because the order of interleaving is not guaranteed.
2036    ///
2037    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2038    /// [`Bounded`], you can use [`Stream::chain`] instead.
2039    ///
2040    /// # Example
2041    /// ```rust
2042    /// # #[cfg(feature = "deploy")] {
2043    /// # use hydro_lang::prelude::*;
2044    /// # use futures::StreamExt;
2045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2046    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2047    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2048    /// numbers.clone().map(q!(|x| x + 1)).interleave(numbers)
2049    /// # }, |mut stream| async move {
2050    /// // 2, 3, 4, 5, and 1, 2, 3, 4 interleaved in unknown order
2051    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2052    /// #     assert_eq!(stream.next().await.unwrap(), w);
2053    /// # }
2054    /// # }));
2055    /// # }
2056    /// ```
2057    pub fn interleave<O2: Ordering, R2: Retries>(
2058        self,
2059        other: Stream<T, L, Unbounded, O2, R2>,
2060    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2061    where
2062        R: MinRetries<R2>,
2063    {
2064        Stream::new(
2065            self.location.clone(),
2066            HydroNode::Chain {
2067                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2068                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2069                metadata: self.location.new_node_metadata(Stream::<
2070                    T,
2071                    L,
2072                    Unbounded,
2073                    NoOrder,
2074                    <R as MinRetries<R2>>::Min,
2075                >::collection_kind()),
2076            },
2077        )
2078    }
2079}
2080
2081impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2082    /// Produces a new stream that combines the elements of the two input streams,
2083    /// preserving the relative order of elements within each input.
2084    ///
2085    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2086    /// [`Bounded`], you can use [`Stream::chain`] instead.
2087    ///
2088    /// # Non-Determinism
2089    /// The order in which elements *across* the two streams will be interleaved is
2090    /// non-deterministic, so the order of elements will vary across runs. If the output order
2091    /// is irrelevant, use [`Stream::interleave`] instead, which is deterministic but emits an
2092    /// unordered stream.
2093    ///
2094    /// # Example
2095    /// ```rust
2096    /// # #[cfg(feature = "deploy")] {
2097    /// # use hydro_lang::prelude::*;
2098    /// # use futures::StreamExt;
2099    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2100    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2101    /// # process.source_iter(q!(vec![1, 3])).into();
2102    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2103    /// # }, |mut stream| async move {
2104    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2105    /// # for w in vec![1, 3, 2, 4] {
2106    /// #     assert_eq!(stream.next().await.unwrap(), w);
2107    /// # }
2108    /// # }));
2109    /// # }
2110    /// ```
2111    pub fn merge_ordered<R2: Retries>(
2112        self,
2113        other: Stream<T, L, Unbounded, TotalOrder, R2>,
2114        _nondet: NonDet,
2115    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2116    where
2117        R: MinRetries<R2>,
2118    {
2119        Stream::new(
2120            self.location.clone(),
2121            HydroNode::Chain {
2122                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2123                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2124                metadata: self.location.new_node_metadata(Stream::<
2125                    T,
2126                    L,
2127                    Unbounded,
2128                    TotalOrder,
2129                    <R as MinRetries<R2>>::Min,
2130                >::collection_kind()),
2131            },
2132        )
2133    }
2134}
2135
2136impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2137where
2138    L: Location<'a>,
2139{
2140    /// Produces a new stream that emits the input elements in sorted order.
2141    ///
2142    /// The input stream can have any ordering guarantee, but the output stream
2143    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2144    /// elements in the input stream are available, so it requires the input stream
2145    /// to be [`Bounded`].
2146    ///
2147    /// # Example
2148    /// ```rust
2149    /// # #[cfg(feature = "deploy")] {
2150    /// # use hydro_lang::prelude::*;
2151    /// # use futures::StreamExt;
2152    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2153    /// let tick = process.tick();
2154    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2155    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2156    /// batch.sort().all_ticks()
2157    /// # }, |mut stream| async move {
2158    /// // 1, 2, 3, 4
2159    /// # for w in (1..5) {
2160    /// #     assert_eq!(stream.next().await.unwrap(), w);
2161    /// # }
2162    /// # }));
2163    /// # }
2164    /// ```
2165    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2166    where
2167        B: IsBounded,
2168        T: Ord,
2169    {
2170        let this = self.make_bounded();
2171        Stream::new(
2172            this.location.clone(),
2173            HydroNode::Sort {
2174                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2175                metadata: this
2176                    .location
2177                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2178            },
2179        )
2180    }
2181
2182    /// Produces a new stream that first emits the elements of the `self` stream,
2183    /// and then emits the elements of the `other` stream. The output stream has
2184    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2185    /// [`TotalOrder`] guarantee.
2186    ///
2187    /// Currently, both input streams must be [`Bounded`]. This operator will block
2188    /// on the first stream until all its elements are available. In a future version,
2189    /// we will relax the requirement on the `other` stream.
2190    ///
2191    /// # Example
2192    /// ```rust
2193    /// # #[cfg(feature = "deploy")] {
2194    /// # use hydro_lang::prelude::*;
2195    /// # use futures::StreamExt;
2196    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2197    /// let tick = process.tick();
2198    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2199    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2200    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2201    /// # }, |mut stream| async move {
2202    /// // 2, 3, 4, 5, 1, 2, 3, 4
2203    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2204    /// #     assert_eq!(stream.next().await.unwrap(), w);
2205    /// # }
2206    /// # }));
2207    /// # }
2208    /// ```
2209    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2210        self,
2211        other: Stream<T, L, B2, O2, R2>,
2212    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2213    where
2214        B: IsBounded,
2215        O: MinOrder<O2>,
2216        R: MinRetries<R2>,
2217    {
2218        check_matching_location(&self.location, &other.location);
2219
2220        Stream::new(
2221            self.location.clone(),
2222            HydroNode::Chain {
2223                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2224                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2225                metadata: self.location.new_node_metadata(Stream::<
2226                    T,
2227                    L,
2228                    B2,
2229                    <O as MinOrder<O2>>::Min,
2230                    <R as MinRetries<R2>>::Min,
2231                >::collection_kind()),
2232            },
2233        )
2234    }
2235
2236    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2237    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2238    /// because this is compiled into a nested loop.
2239    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2240        self,
2241        other: Stream<T2, L, Bounded, O2, R>,
2242    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2243    where
2244        B: IsBounded,
2245        T: Clone,
2246        T2: Clone,
2247    {
2248        let this = self.make_bounded();
2249        check_matching_location(&this.location, &other.location);
2250
2251        Stream::new(
2252            this.location.clone(),
2253            HydroNode::CrossProduct {
2254                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2255                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2256                metadata: this.location.new_node_metadata(Stream::<
2257                    (T, T2),
2258                    L,
2259                    Bounded,
2260                    <O2 as MinOrder<O>>::Min,
2261                    R,
2262                >::collection_kind()),
2263            },
2264        )
2265    }
2266
2267    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2268    /// `self` used as the values for *each* key.
2269    ///
2270    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2271    /// values. For example, it can be used to send the same set of elements to several cluster
2272    /// members, if the membership information is available as a [`KeyedSingleton`].
2273    ///
2274    /// # Example
2275    /// ```rust
2276    /// # #[cfg(feature = "deploy")] {
2277    /// # use hydro_lang::prelude::*;
2278    /// # use futures::StreamExt;
2279    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2280    /// # let tick = process.tick();
2281    /// let keyed_singleton = // { 1: (), 2: () }
2282    /// # process
2283    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2284    /// #     .into_keyed()
2285    /// #     .batch(&tick, nondet!(/** test */))
2286    /// #     .first();
2287    /// let stream = // [ "a", "b" ]
2288    /// # process
2289    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2290    /// #     .batch(&tick, nondet!(/** test */));
2291    /// stream.repeat_with_keys(keyed_singleton)
2292    /// # .entries().all_ticks()
2293    /// # }, |mut stream| async move {
2294    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2295    /// # let mut results = Vec::new();
2296    /// # for _ in 0..4 {
2297    /// #     results.push(stream.next().await.unwrap());
2298    /// # }
2299    /// # results.sort();
2300    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2301    /// # }));
2302    /// # }
2303    /// ```
2304    pub fn repeat_with_keys<K, V2>(
2305        self,
2306        keys: KeyedSingleton<K, V2, L, Bounded>,
2307    ) -> KeyedStream<K, T, L, Bounded, O, R>
2308    where
2309        B: IsBounded,
2310        K: Clone,
2311        T: Clone,
2312    {
2313        keys.keys()
2314            .weaken_retries()
2315            .assume_ordering_trusted::<TotalOrder>(
2316                nondet!(/** keyed stream does not depend on ordering of keys */),
2317            )
2318            .cross_product_nested_loop(self.make_bounded())
2319            .into_keyed()
2320    }
2321}
2322
2323impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2324where
2325    L: Location<'a>,
2326{
2327    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2328    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2329    /// by equi-joining the two streams on the key attribute `K`.
2330    ///
2331    /// # Example
2332    /// ```rust
2333    /// # #[cfg(feature = "deploy")] {
2334    /// # use hydro_lang::prelude::*;
2335    /// # use std::collections::HashSet;
2336    /// # use futures::StreamExt;
2337    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2338    /// let tick = process.tick();
2339    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2340    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2341    /// stream1.join(stream2)
2342    /// # }, |mut stream| async move {
2343    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2344    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2345    /// # stream.map(|i| assert!(expected.contains(&i)));
2346    /// # }));
2347    /// # }
2348    pub fn join<V2, O2: Ordering, R2: Retries>(
2349        self,
2350        n: Stream<(K, V2), L, B, O2, R2>,
2351    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
2352    where
2353        K: Eq + Hash,
2354        R: MinRetries<R2>,
2355    {
2356        check_matching_location(&self.location, &n.location);
2357
2358        Stream::new(
2359            self.location.clone(),
2360            HydroNode::Join {
2361                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2362                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2363                metadata: self.location.new_node_metadata(Stream::<
2364                    (K, (V1, V2)),
2365                    L,
2366                    B,
2367                    NoOrder,
2368                    <R as MinRetries<R2>>::Min,
2369                >::collection_kind()),
2370            },
2371        )
2372    }
2373
2374    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2375    /// computes the anti-join of the items in the input -- i.e. returns
2376    /// unique items in the first input that do not have a matching key
2377    /// in the second input.
2378    ///
2379    /// # Example
2380    /// ```rust
2381    /// # #[cfg(feature = "deploy")] {
2382    /// # use hydro_lang::prelude::*;
2383    /// # use futures::StreamExt;
2384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2385    /// let tick = process.tick();
2386    /// let stream = process
2387    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2388    ///   .batch(&tick, nondet!(/** test */));
2389    /// let batch = process
2390    ///   .source_iter(q!(vec![1, 2]))
2391    ///   .batch(&tick, nondet!(/** test */));
2392    /// stream.anti_join(batch).all_ticks()
2393    /// # }, |mut stream| async move {
2394    /// # for w in vec![(3, 'c'), (4, 'd')] {
2395    /// #     assert_eq!(stream.next().await.unwrap(), w);
2396    /// # }
2397    /// # }));
2398    /// # }
2399    pub fn anti_join<O2: Ordering, R2: Retries>(
2400        self,
2401        n: Stream<K, L, Bounded, O2, R2>,
2402    ) -> Stream<(K, V1), L, B, O, R>
2403    where
2404        K: Eq + Hash,
2405    {
2406        check_matching_location(&self.location, &n.location);
2407
2408        Stream::new(
2409            self.location.clone(),
2410            HydroNode::AntiJoin {
2411                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2412                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2413                metadata: self
2414                    .location
2415                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2416            },
2417        )
2418    }
2419}
2420
2421impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2422    Stream<(K, V), L, B, O, R>
2423{
2424    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2425    /// is used as the key and the second element is added to the entries associated with that key.
2426    ///
2427    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2428    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2429    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2430    /// total ordering _within_ each group but no ordering _across_ groups.
2431    ///
2432    /// # Example
2433    /// ```rust
2434    /// # #[cfg(feature = "deploy")] {
2435    /// # use hydro_lang::prelude::*;
2436    /// # use futures::StreamExt;
2437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2438    /// process
2439    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2440    ///     .into_keyed()
2441    /// #   .entries()
2442    /// # }, |mut stream| async move {
2443    /// // { 1: [2, 3], 2: [4] }
2444    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2445    /// #     assert_eq!(stream.next().await.unwrap(), w);
2446    /// # }
2447    /// # }));
2448    /// # }
2449    /// ```
2450    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2451        KeyedStream::new(
2452            self.location.clone(),
2453            HydroNode::Cast {
2454                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2455                metadata: self
2456                    .location
2457                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2458            },
2459        )
2460    }
2461}
2462
2463impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2464where
2465    K: Eq + Hash,
2466    L: Location<'a>,
2467{
2468    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2469    /// # Example
2470    /// ```rust
2471    /// # #[cfg(feature = "deploy")] {
2472    /// # use hydro_lang::prelude::*;
2473    /// # use futures::StreamExt;
2474    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2475    /// let tick = process.tick();
2476    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2477    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2478    /// batch.keys().all_ticks()
2479    /// # }, |mut stream| async move {
2480    /// // 1, 2
2481    /// # assert_eq!(stream.next().await.unwrap(), 1);
2482    /// # assert_eq!(stream.next().await.unwrap(), 2);
2483    /// # }));
2484    /// # }
2485    /// ```
2486    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2487        self.into_keyed()
2488            .fold(
2489                q!(|| ()),
2490                q!(
2491                    |_, _| {},
2492                    commutative = manual_proof!(/** values are ignored */),
2493                    idempotent = manual_proof!(/** values are ignored */)
2494                ),
2495            )
2496            .keys()
2497    }
2498}
2499
2500impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2501where
2502    L: Location<'a> + NoTick,
2503{
2504    /// Returns a stream corresponding to the latest batch of elements being atomically
2505    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2506    /// the order of the input.
2507    ///
2508    /// # Non-Determinism
2509    /// The batch boundaries are non-deterministic and may change across executions.
2510    pub fn batch_atomic(self, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
2511        Stream::new(
2512            self.location.clone().tick,
2513            HydroNode::Batch {
2514                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2515                metadata: self
2516                    .location
2517                    .tick
2518                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2519            },
2520        )
2521    }
2522
2523    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2524    /// See [`Stream::atomic`] for more details.
2525    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2526        Stream::new(
2527            self.location.tick.l.clone(),
2528            HydroNode::EndAtomic {
2529                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2530                metadata: self
2531                    .location
2532                    .tick
2533                    .l
2534                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2535            },
2536        )
2537    }
2538}
2539
2540impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2541where
2542    L: Location<'a> + NoTick + NoAtomic,
2543    F: Future<Output = T>,
2544{
2545    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2546    /// Future outputs are produced as available, regardless of input arrival order.
2547    ///
2548    /// # Example
2549    /// ```rust
2550    /// # #[cfg(feature = "deploy")] {
2551    /// # use std::collections::HashSet;
2552    /// # use futures::StreamExt;
2553    /// # use hydro_lang::prelude::*;
2554    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2555    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2556    ///     .map(q!(|x| async move {
2557    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2558    ///         x
2559    ///     }))
2560    ///     .resolve_futures()
2561    /// #   },
2562    /// #   |mut stream| async move {
2563    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2564    /// #       let mut output = HashSet::new();
2565    /// #       for _ in 1..10 {
2566    /// #           output.insert(stream.next().await.unwrap());
2567    /// #       }
2568    /// #       assert_eq!(
2569    /// #           output,
2570    /// #           HashSet::<i32>::from_iter(1..10)
2571    /// #       );
2572    /// #   },
2573    /// # ));
2574    /// # }
2575    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2576        Stream::new(
2577            self.location.clone(),
2578            HydroNode::ResolveFutures {
2579                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2580                metadata: self
2581                    .location
2582                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2583            },
2584        )
2585    }
2586
2587    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2588    /// Future outputs are produced in the same order as the input stream.
2589    ///
2590    /// # Example
2591    /// ```rust
2592    /// # #[cfg(feature = "deploy")] {
2593    /// # use std::collections::HashSet;
2594    /// # use futures::StreamExt;
2595    /// # use hydro_lang::prelude::*;
2596    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2597    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2598    ///     .map(q!(|x| async move {
2599    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2600    ///         x
2601    ///     }))
2602    ///     .resolve_futures_ordered()
2603    /// #   },
2604    /// #   |mut stream| async move {
2605    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2606    /// #       let mut output = Vec::new();
2607    /// #       for _ in 1..10 {
2608    /// #           output.push(stream.next().await.unwrap());
2609    /// #       }
2610    /// #       assert_eq!(
2611    /// #           output,
2612    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
2613    /// #       );
2614    /// #   },
2615    /// # ));
2616    /// # }
2617    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
2618        Stream::new(
2619            self.location.clone(),
2620            HydroNode::ResolveFuturesOrdered {
2621                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2622                metadata: self
2623                    .location
2624                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2625            },
2626        )
2627    }
2628}
2629
2630impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
2631where
2632    L: Location<'a>,
2633{
2634    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
2635    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2636    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
2637        Stream::new(
2638            self.location.outer().clone(),
2639            HydroNode::YieldConcat {
2640                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2641                metadata: self
2642                    .location
2643                    .outer()
2644                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
2645            },
2646        )
2647    }
2648
2649    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
2650    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
2651    ///
2652    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
2653    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2654    /// stream's [`Tick`] context.
2655    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
2656        let out_location = Atomic {
2657            tick: self.location.clone(),
2658        };
2659
2660        Stream::new(
2661            out_location.clone(),
2662            HydroNode::YieldConcat {
2663                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2664                metadata: out_location
2665                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
2666            },
2667        )
2668    }
2669
2670    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
2671    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
2672    /// input.
2673    ///
2674    /// This API is particularly useful for stateful computation on batches of data, such as
2675    /// maintaining an accumulated state that is up to date with the current batch.
2676    ///
2677    /// # Example
2678    /// ```rust
2679    /// # #[cfg(feature = "deploy")] {
2680    /// # use hydro_lang::prelude::*;
2681    /// # use futures::StreamExt;
2682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2683    /// let tick = process.tick();
2684    /// # // ticks are lazy by default, forces the second tick to run
2685    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2686    /// # let batch_first_tick = process
2687    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
2688    /// #  .batch(&tick, nondet!(/** test */));
2689    /// # let batch_second_tick = process
2690    /// #   .source_iter(q!(vec![5, 6, 7]))
2691    /// #   .batch(&tick, nondet!(/** test */))
2692    /// #   .defer_tick(); // appears on the second tick
2693    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
2694    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
2695    ///
2696    /// input.batch(&tick, nondet!(/** test */))
2697    ///     .across_ticks(|s| s.count()).all_ticks()
2698    /// # }, |mut stream| async move {
2699    /// // [4, 7]
2700    /// assert_eq!(stream.next().await.unwrap(), 4);
2701    /// assert_eq!(stream.next().await.unwrap(), 7);
2702    /// # }));
2703    /// # }
2704    /// ```
2705    pub fn across_ticks<Out: BatchAtomic>(
2706        self,
2707        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
2708    ) -> Out::Batched {
2709        thunk(self.all_ticks_atomic()).batched_atomic()
2710    }
2711
2712    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
2713    /// always has the elements of `self` at tick `T - 1`.
2714    ///
2715    /// At tick `0`, the output stream is empty, since there is no previous tick.
2716    ///
2717    /// This operator enables stateful iterative processing with ticks, by sending data from one
2718    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
2719    ///
2720    /// # Example
2721    /// ```rust
2722    /// # #[cfg(feature = "deploy")] {
2723    /// # use hydro_lang::prelude::*;
2724    /// # use futures::StreamExt;
2725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2726    /// let tick = process.tick();
2727    /// // ticks are lazy by default, forces the second tick to run
2728    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2729    ///
2730    /// let batch_first_tick = process
2731    ///   .source_iter(q!(vec![1, 2, 3, 4]))
2732    ///   .batch(&tick, nondet!(/** test */));
2733    /// let batch_second_tick = process
2734    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
2735    ///   .batch(&tick, nondet!(/** test */))
2736    ///   .defer_tick(); // appears on the second tick
2737    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
2738    ///
2739    /// changes_across_ticks.clone().filter_not_in(
2740    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
2741    /// ).all_ticks()
2742    /// # }, |mut stream| async move {
2743    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
2744    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
2745    /// #     assert_eq!(stream.next().await.unwrap(), w);
2746    /// # }
2747    /// # }));
2748    /// # }
2749    /// ```
2750    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
2751        Stream::new(
2752            self.location.clone(),
2753            HydroNode::DeferTick {
2754                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2755                metadata: self
2756                    .location
2757                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2758            },
2759        )
2760    }
2761}
2762
2763#[cfg(test)]
2764mod tests {
2765    #[cfg(feature = "deploy")]
2766    use futures::{SinkExt, StreamExt};
2767    #[cfg(feature = "deploy")]
2768    use hydro_deploy::Deployment;
2769    #[cfg(feature = "deploy")]
2770    use serde::{Deserialize, Serialize};
2771    #[cfg(any(feature = "deploy", feature = "sim"))]
2772    use stageleft::q;
2773
2774    #[cfg(any(feature = "deploy", feature = "sim"))]
2775    use crate::compile::builder::FlowBuilder;
2776    #[cfg(feature = "deploy")]
2777    use crate::live_collections::sliced::sliced;
2778    #[cfg(feature = "deploy")]
2779    use crate::live_collections::stream::ExactlyOnce;
2780    #[cfg(feature = "sim")]
2781    use crate::live_collections::stream::NoOrder;
2782    #[cfg(any(feature = "deploy", feature = "sim"))]
2783    use crate::live_collections::stream::TotalOrder;
2784    #[cfg(any(feature = "deploy", feature = "sim"))]
2785    use crate::location::Location;
2786    #[cfg(any(feature = "deploy", feature = "sim"))]
2787    use crate::nondet::nondet;
2788
2789    mod backtrace_chained_ops;
2790
2791    #[cfg(feature = "deploy")]
2792    struct P1 {}
2793    #[cfg(feature = "deploy")]
2794    struct P2 {}
2795
2796    #[cfg(feature = "deploy")]
2797    #[derive(Serialize, Deserialize, Debug)]
2798    struct SendOverNetwork {
2799        n: u32,
2800    }
2801
2802    #[cfg(feature = "deploy")]
2803    #[tokio::test]
2804    async fn first_ten_distributed() {
2805        use crate::networking::TCP;
2806
2807        let mut deployment = Deployment::new();
2808
2809        let mut flow = FlowBuilder::new();
2810        let first_node = flow.process::<P1>();
2811        let second_node = flow.process::<P2>();
2812        let external = flow.external::<P2>();
2813
2814        let numbers = first_node.source_iter(q!(0..10));
2815        let out_port = numbers
2816            .map(q!(|n| SendOverNetwork { n }))
2817            .send(&second_node, TCP.fail_stop().bincode())
2818            .send_bincode_external(&external);
2819
2820        let nodes = flow
2821            .with_process(&first_node, deployment.Localhost())
2822            .with_process(&second_node, deployment.Localhost())
2823            .with_external(&external, deployment.Localhost())
2824            .deploy(&mut deployment);
2825
2826        deployment.deploy().await.unwrap();
2827
2828        let mut external_out = nodes.connect(out_port).await;
2829
2830        deployment.start().await.unwrap();
2831
2832        for i in 0..10 {
2833            assert_eq!(external_out.next().await.unwrap().n, i);
2834        }
2835    }
2836
2837    #[cfg(feature = "deploy")]
2838    #[tokio::test]
2839    async fn first_cardinality() {
2840        let mut deployment = Deployment::new();
2841
2842        let mut flow = FlowBuilder::new();
2843        let node = flow.process::<()>();
2844        let external = flow.external::<()>();
2845
2846        let node_tick = node.tick();
2847        let count = node_tick
2848            .singleton(q!([1, 2, 3]))
2849            .into_stream()
2850            .flatten_ordered()
2851            .first()
2852            .into_stream()
2853            .count()
2854            .all_ticks()
2855            .send_bincode_external(&external);
2856
2857        let nodes = flow
2858            .with_process(&node, deployment.Localhost())
2859            .with_external(&external, deployment.Localhost())
2860            .deploy(&mut deployment);
2861
2862        deployment.deploy().await.unwrap();
2863
2864        let mut external_out = nodes.connect(count).await;
2865
2866        deployment.start().await.unwrap();
2867
2868        assert_eq!(external_out.next().await.unwrap(), 1);
2869    }
2870
2871    #[cfg(feature = "deploy")]
2872    #[tokio::test]
2873    async fn unbounded_reduce_remembers_state() {
2874        let mut deployment = Deployment::new();
2875
2876        let mut flow = FlowBuilder::new();
2877        let node = flow.process::<()>();
2878        let external = flow.external::<()>();
2879
2880        let (input_port, input) = node.source_external_bincode(&external);
2881        let out = input
2882            .reduce(q!(|acc, v| *acc += v))
2883            .sample_eager(nondet!(/** test */))
2884            .send_bincode_external(&external);
2885
2886        let nodes = flow
2887            .with_process(&node, deployment.Localhost())
2888            .with_external(&external, deployment.Localhost())
2889            .deploy(&mut deployment);
2890
2891        deployment.deploy().await.unwrap();
2892
2893        let mut external_in = nodes.connect(input_port).await;
2894        let mut external_out = nodes.connect(out).await;
2895
2896        deployment.start().await.unwrap();
2897
2898        external_in.send(1).await.unwrap();
2899        assert_eq!(external_out.next().await.unwrap(), 1);
2900
2901        external_in.send(2).await.unwrap();
2902        assert_eq!(external_out.next().await.unwrap(), 3);
2903    }
2904
2905    #[cfg(feature = "deploy")]
2906    #[tokio::test]
2907    async fn top_level_bounded_cross_singleton() {
2908        let mut deployment = Deployment::new();
2909
2910        let mut flow = FlowBuilder::new();
2911        let node = flow.process::<()>();
2912        let external = flow.external::<()>();
2913
2914        let (input_port, input) =
2915            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
2916
2917        let out = input
2918            .cross_singleton(
2919                node.source_iter(q!(vec![1, 2, 3]))
2920                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
2921            )
2922            .send_bincode_external(&external);
2923
2924        let nodes = flow
2925            .with_process(&node, deployment.Localhost())
2926            .with_external(&external, deployment.Localhost())
2927            .deploy(&mut deployment);
2928
2929        deployment.deploy().await.unwrap();
2930
2931        let mut external_in = nodes.connect(input_port).await;
2932        let mut external_out = nodes.connect(out).await;
2933
2934        deployment.start().await.unwrap();
2935
2936        external_in.send(1).await.unwrap();
2937        assert_eq!(external_out.next().await.unwrap(), (1, 6));
2938
2939        external_in.send(2).await.unwrap();
2940        assert_eq!(external_out.next().await.unwrap(), (2, 6));
2941    }
2942
    /// Inside a `sliced!` block, a `reduce` over the three-element source `[1, 2, 3]` is
    /// turned into a stream and counted. The test checks that every external input is paired
    /// with a count of 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        // Slice both the external input and the reduced value, then pair each input with
        // the number of elements produced by the reduce.
        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
2980
    /// Inside a `sliced!` block, a `reduce` over `[1, 2, 3]` is converted with
    /// `.into_singleton()`, turned into a stream, and counted. The test checks that every
    /// external input is paired with a count of 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        // Same shape as a plain reduce, but the reduce result goes through
        // `.into_singleton()` before being sliced.
        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3018
3019    #[cfg(feature = "deploy")]
3020    #[tokio::test]
3021    async fn atomic_fold_replays_each_tick() {
3022        let mut deployment = Deployment::new();
3023
3024        let mut flow = FlowBuilder::new();
3025        let node = flow.process::<()>();
3026        let external = flow.external::<()>();
3027
3028        let (input_port, input) =
3029            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3030        let tick = node.tick();
3031
3032        let out = input
3033            .batch(&tick, nondet!(/** test */))
3034            .cross_singleton(
3035                node.source_iter(q!(vec![1, 2, 3]))
3036                    .atomic(&tick)
3037                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3038                    .snapshot_atomic(nondet!(/** test */)),
3039            )
3040            .all_ticks()
3041            .send_bincode_external(&external);
3042
3043        let nodes = flow
3044            .with_process(&node, deployment.Localhost())
3045            .with_external(&external, deployment.Localhost())
3046            .deploy(&mut deployment);
3047
3048        deployment.deploy().await.unwrap();
3049
3050        let mut external_in = nodes.connect(input_port).await;
3051        let mut external_out = nodes.connect(out).await;
3052
3053        deployment.start().await.unwrap();
3054
3055        external_in.send(1).await.unwrap();
3056        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3057
3058        external_in.send(2).await.unwrap();
3059        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3060    }
3061
3062    #[cfg(feature = "deploy")]
3063    #[tokio::test]
3064    async fn unbounded_scan_remembers_state() {
3065        let mut deployment = Deployment::new();
3066
3067        let mut flow = FlowBuilder::new();
3068        let node = flow.process::<()>();
3069        let external = flow.external::<()>();
3070
3071        let (input_port, input) = node.source_external_bincode(&external);
3072        let out = input
3073            .scan(
3074                q!(|| 0),
3075                q!(|acc, v| {
3076                    *acc += v;
3077                    Some(*acc)
3078                }),
3079            )
3080            .send_bincode_external(&external);
3081
3082        let nodes = flow
3083            .with_process(&node, deployment.Localhost())
3084            .with_external(&external, deployment.Localhost())
3085            .deploy(&mut deployment);
3086
3087        deployment.deploy().await.unwrap();
3088
3089        let mut external_in = nodes.connect(input_port).await;
3090        let mut external_out = nodes.connect(out).await;
3091
3092        deployment.start().await.unwrap();
3093
3094        external_in.send(1).await.unwrap();
3095        assert_eq!(external_out.next().await.unwrap(), 1);
3096
3097        external_in.send(2).await.unwrap();
3098        assert_eq!(external_out.next().await.unwrap(), 3);
3099    }
3100
3101    #[cfg(feature = "deploy")]
3102    #[tokio::test]
3103    async fn unbounded_enumerate_remembers_state() {
3104        let mut deployment = Deployment::new();
3105
3106        let mut flow = FlowBuilder::new();
3107        let node = flow.process::<()>();
3108        let external = flow.external::<()>();
3109
3110        let (input_port, input) = node.source_external_bincode(&external);
3111        let out = input.enumerate().send_bincode_external(&external);
3112
3113        let nodes = flow
3114            .with_process(&node, deployment.Localhost())
3115            .with_external(&external, deployment.Localhost())
3116            .deploy(&mut deployment);
3117
3118        deployment.deploy().await.unwrap();
3119
3120        let mut external_in = nodes.connect(input_port).await;
3121        let mut external_out = nodes.connect(out).await;
3122
3123        deployment.start().await.unwrap();
3124
3125        external_in.send(1).await.unwrap();
3126        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3127
3128        external_in.send(2).await.unwrap();
3129        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3130    }
3131
3132    #[cfg(feature = "deploy")]
3133    #[tokio::test]
3134    async fn unbounded_unique_remembers_state() {
3135        let mut deployment = Deployment::new();
3136
3137        let mut flow = FlowBuilder::new();
3138        let node = flow.process::<()>();
3139        let external = flow.external::<()>();
3140
3141        let (input_port, input) =
3142            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3143        let out = input.unique().send_bincode_external(&external);
3144
3145        let nodes = flow
3146            .with_process(&node, deployment.Localhost())
3147            .with_external(&external, deployment.Localhost())
3148            .deploy(&mut deployment);
3149
3150        deployment.deploy().await.unwrap();
3151
3152        let mut external_in = nodes.connect(input_port).await;
3153        let mut external_out = nodes.connect(out).await;
3154
3155        deployment.start().await.unwrap();
3156
3157        external_in.send(1).await.unwrap();
3158        assert_eq!(external_out.next().await.unwrap(), 1);
3159
3160        external_in.send(2).await.unwrap();
3161        assert_eq!(external_out.next().await.unwrap(), 2);
3162
3163        external_in.send(1).await.unwrap();
3164        external_in.send(3).await.unwrap();
3165        assert_eq!(external_out.next().await.unwrap(), 3);
3166    }
3167
3168    #[cfg(feature = "sim")]
3169    #[test]
3170    #[should_panic]
3171    fn sim_batch_nondet_size() {
3172        let mut flow = FlowBuilder::new();
3173        let node = flow.process::<()>();
3174
3175        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3176
3177        let tick = node.tick();
3178        let out_recv = input
3179            .batch(&tick, nondet!(/** test */))
3180            .count()
3181            .all_ticks()
3182            .sim_output();
3183
3184        flow.sim().exhaustive(async || {
3185            in_send.send(());
3186            in_send.send(());
3187            in_send.send(());
3188
3189            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3190        });
3191    }
3192
3193    #[cfg(feature = "sim")]
3194    #[test]
3195    fn sim_batch_preserves_order() {
3196        let mut flow = FlowBuilder::new();
3197        let node = flow.process::<()>();
3198
3199        let (in_send, input) = node.sim_input();
3200
3201        let tick = node.tick();
3202        let out_recv = input
3203            .batch(&tick, nondet!(/** test */))
3204            .all_ticks()
3205            .sim_output();
3206
3207        flow.sim().exhaustive(async || {
3208            in_send.send(1);
3209            in_send.send(2);
3210            in_send.send(3);
3211
3212            out_recv.assert_yields_only([1, 2, 3]).await;
3213        });
3214    }
3215
3216    #[cfg(feature = "sim")]
3217    #[test]
3218    #[should_panic]
3219    fn sim_batch_unordered_shuffles() {
3220        let mut flow = FlowBuilder::new();
3221        let node = flow.process::<()>();
3222
3223        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3224
3225        let tick = node.tick();
3226        let batch = input.batch(&tick, nondet!(/** test */));
3227        let out_recv = batch
3228            .clone()
3229            .min()
3230            .zip(batch.max())
3231            .all_ticks()
3232            .sim_output();
3233
3234        flow.sim().exhaustive(async || {
3235            in_send.send_many_unordered([1, 2, 3]);
3236
3237            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3238                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3239            }
3240        });
3241    }
3242
3243    #[cfg(feature = "sim")]
3244    #[test]
3245    fn sim_batch_unordered_shuffles_count() {
3246        let mut flow = FlowBuilder::new();
3247        let node = flow.process::<()>();
3248
3249        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3250
3251        let tick = node.tick();
3252        let batch = input.batch(&tick, nondet!(/** test */));
3253        let out_recv = batch.all_ticks().sim_output();
3254
3255        let instance_count = flow.sim().exhaustive(async || {
3256            in_send.send_many_unordered([1, 2, 3, 4]);
3257            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3258        });
3259
3260        assert_eq!(
3261            instance_count,
3262            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3263        )
3264    }
3265
3266    #[cfg(feature = "sim")]
3267    #[test]
3268    #[should_panic]
3269    fn sim_observe_order_batched() {
3270        let mut flow = FlowBuilder::new();
3271        let node = flow.process::<()>();
3272
3273        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3274
3275        let tick = node.tick();
3276        let batch = input.batch(&tick, nondet!(/** test */));
3277        let out_recv = batch
3278            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3279            .all_ticks()
3280            .sim_output();
3281
3282        flow.sim().exhaustive(async || {
3283            in_send.send_many_unordered([1, 2, 3, 4]);
3284            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3285        });
3286    }
3287
3288    #[cfg(feature = "sim")]
3289    #[test]
3290    fn sim_observe_order_batched_count() {
3291        let mut flow = FlowBuilder::new();
3292        let node = flow.process::<()>();
3293
3294        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3295
3296        let tick = node.tick();
3297        let batch = input.batch(&tick, nondet!(/** test */));
3298        let out_recv = batch
3299            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3300            .all_ticks()
3301            .sim_output();
3302
3303        let instance_count = flow.sim().exhaustive(async || {
3304            in_send.send_many_unordered([1, 2, 3, 4]);
3305            let _ = out_recv.collect::<Vec<_>>().await;
3306        });
3307
3308        assert_eq!(
3309            instance_count,
3310            192 // 4! * 2^{4 - 1}
3311        )
3312    }
3313
3314    #[cfg(feature = "sim")]
3315    #[test]
3316    fn sim_unordered_count_instance_count() {
3317        let mut flow = FlowBuilder::new();
3318        let node = flow.process::<()>();
3319
3320        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3321
3322        let tick = node.tick();
3323        let out_recv = input
3324            .count()
3325            .snapshot(&tick, nondet!(/** test */))
3326            .all_ticks()
3327            .sim_output();
3328
3329        let instance_count = flow.sim().exhaustive(async || {
3330            in_send.send_many_unordered([1, 2, 3, 4]);
3331            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3332        });
3333
3334        assert_eq!(
3335            instance_count,
3336            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3337        )
3338    }
3339
3340    #[cfg(feature = "sim")]
3341    #[test]
3342    fn sim_top_level_assume_ordering() {
3343        let mut flow = FlowBuilder::new();
3344        let node = flow.process::<()>();
3345
3346        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3347
3348        let out_recv = input
3349            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3350            .sim_output();
3351
3352        let instance_count = flow.sim().exhaustive(async || {
3353            in_send.send_many_unordered([1, 2, 3]);
3354            let mut out = out_recv.collect::<Vec<_>>().await;
3355            out.sort();
3356            assert_eq!(out, vec![1, 2, 3]);
3357        });
3358
3359        assert_eq!(instance_count, 6)
3360    }
3361
3362    #[cfg(feature = "sim")]
3363    #[test]
3364    fn sim_top_level_assume_ordering_cycle_back() {
3365        let mut flow = FlowBuilder::new();
3366        let node = flow.process::<()>();
3367
3368        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3369
3370        let (complete_cycle_back, cycle_back) =
3371            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3372        let ordered = input
3373            .interleave(cycle_back)
3374            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3375        complete_cycle_back.complete(
3376            ordered
3377                .clone()
3378                .map(q!(|v| v + 1))
3379                .filter(q!(|v| v % 2 == 1)),
3380        );
3381
3382        let out_recv = ordered.sim_output();
3383
3384        let mut saw = false;
3385        let instance_count = flow.sim().exhaustive(async || {
3386            in_send.send_many_unordered([0, 2]);
3387            let out = out_recv.collect::<Vec<_>>().await;
3388
3389            if out.starts_with(&[0, 1, 2]) {
3390                saw = true;
3391            }
3392        });
3393
3394        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3395        assert_eq!(instance_count, 6)
3396    }
3397
3398    #[cfg(feature = "sim")]
3399    #[test]
3400    fn sim_top_level_assume_ordering_cycle_back_tick() {
3401        let mut flow = FlowBuilder::new();
3402        let node = flow.process::<()>();
3403
3404        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3405
3406        let (complete_cycle_back, cycle_back) =
3407            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3408        let ordered = input
3409            .interleave(cycle_back)
3410            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3411        complete_cycle_back.complete(
3412            ordered
3413                .clone()
3414                .batch(&node.tick(), nondet!(/** test */))
3415                .all_ticks()
3416                .map(q!(|v| v + 1))
3417                .filter(q!(|v| v % 2 == 1)),
3418        );
3419
3420        let out_recv = ordered.sim_output();
3421
3422        let mut saw = false;
3423        let instance_count = flow.sim().exhaustive(async || {
3424            in_send.send_many_unordered([0, 2]);
3425            let out = out_recv.collect::<Vec<_>>().await;
3426
3427            if out.starts_with(&[0, 1, 2]) {
3428                saw = true;
3429            }
3430        });
3431
3432        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3433        assert_eq!(instance_count, 58)
3434    }
3435
3436    #[cfg(feature = "sim")]
3437    #[test]
3438    fn sim_top_level_assume_ordering_multiple() {
3439        let mut flow = FlowBuilder::new();
3440        let node = flow.process::<()>();
3441
3442        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3443        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3444
3445        let (complete_cycle_back, cycle_back) =
3446            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3447        let input1_ordered = input
3448            .clone()
3449            .interleave(cycle_back)
3450            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3451        let foo = input1_ordered
3452            .clone()
3453            .map(q!(|v| v + 3))
3454            .weaken_ordering::<NoOrder>()
3455            .interleave(input2)
3456            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3457
3458        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));
3459
3460        let out_recv = input1_ordered.sim_output();
3461
3462        let mut saw = false;
3463        let instance_count = flow.sim().exhaustive(async || {
3464            in_send.send_many_unordered([0, 1]);
3465            let out = out_recv.collect::<Vec<_>>().await;
3466
3467            if out.starts_with(&[0, 3, 1]) {
3468                saw = true;
3469            }
3470        });
3471
3472        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3473        assert_eq!(instance_count, 24)
3474    }
3475
3476    #[cfg(feature = "sim")]
3477    #[test]
3478    fn sim_atomic_assume_ordering_cycle_back() {
3479        let mut flow = FlowBuilder::new();
3480        let node = flow.process::<()>();
3481
3482        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3483
3484        let (complete_cycle_back, cycle_back) =
3485            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3486        let ordered = input
3487            .interleave(cycle_back)
3488            .atomic(&node.tick())
3489            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3490            .end_atomic();
3491        complete_cycle_back.complete(
3492            ordered
3493                .clone()
3494                .map(q!(|v| v + 1))
3495                .filter(q!(|v| v % 2 == 1)),
3496        );
3497
3498        let out_recv = ordered.sim_output();
3499
3500        let instance_count = flow.sim().exhaustive(async || {
3501            in_send.send_many_unordered([0, 2]);
3502            let out = out_recv.collect::<Vec<_>>().await;
3503            assert_eq!(out.len(), 4);
3504        });
3505
3506        assert_eq!(instance_count, 22)
3507    }
3508
3509    #[cfg(feature = "deploy")]
3510    #[tokio::test]
3511    async fn partition_evens_odds() {
3512        let mut deployment = Deployment::new();
3513
3514        let mut flow = FlowBuilder::new();
3515        let node = flow.process::<()>();
3516        let external = flow.external::<()>();
3517
3518        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3519        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3520        let evens_port = evens.send_bincode_external(&external);
3521        let odds_port = odds.send_bincode_external(&external);
3522
3523        let nodes = flow
3524            .with_process(&node, deployment.Localhost())
3525            .with_external(&external, deployment.Localhost())
3526            .deploy(&mut deployment);
3527
3528        deployment.deploy().await.unwrap();
3529
3530        let mut evens_out = nodes.connect(evens_port).await;
3531        let mut odds_out = nodes.connect(odds_port).await;
3532
3533        deployment.start().await.unwrap();
3534
3535        let mut even_results = Vec::new();
3536        for _ in 0..3 {
3537            even_results.push(evens_out.next().await.unwrap());
3538        }
3539        even_results.sort();
3540        assert_eq!(even_results, vec![2, 4, 6]);
3541
3542        let mut odd_results = Vec::new();
3543        for _ in 0..3 {
3544            odd_results.push(odds_out.next().await.unwrap());
3545        }
3546        odd_results.sort();
3547        assert_eq!(odd_results, vec![1, 3, 5]);
3548    }
3549
3550    #[cfg(feature = "deploy")]
3551    #[tokio::test]
3552    async fn unconsumed_inspect_still_runs() {
3553        use crate::deploy::DeployCrateWrapper;
3554
3555        let mut deployment = Deployment::new();
3556
3557        let mut flow = FlowBuilder::new();
3558        let node = flow.process::<()>();
3559
3560        // The return value of .inspect() is intentionally dropped.
3561        // Before the Null-root fix, this would silently do nothing.
3562        node.source_iter(q!(0..5))
3563            .inspect(q!(|x| println!("inspect: {}", x)));
3564
3565        let nodes = flow
3566            .with_process(&node, deployment.Localhost())
3567            .deploy(&mut deployment);
3568
3569        deployment.deploy().await.unwrap();
3570
3571        let mut stdout = nodes.get_process(&node).stdout();
3572
3573        deployment.start().await.unwrap();
3574
3575        let mut lines = Vec::new();
3576        for _ in 0..5 {
3577            lines.push(stdout.recv().await.unwrap());
3578        }
3579        lines.sort();
3580        assert_eq!(
3581            lines,
3582            vec![
3583                "inspect: 0",
3584                "inspect: 1",
3585                "inspect: 2",
3586                "inspect: 3",
3587                "inspect: 4",
3588            ]
3589        );
3590    }
3591}