Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
/// Wrapper around a parsed expression whose [`Debug`] output is only the
/// expression's tokens.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42    fn from(expr: syn::Expr) -> Self {
43        Self(Box::new(expr))
44    }
45}
46
47impl Deref for DebugExpr {
48    type Target = syn::Expr;
49
50    fn deref(&self) -> &Self::Target {
51        &self.0
52    }
53}
54
55impl ToTokens for DebugExpr {
56    fn to_tokens(&self, tokens: &mut TokenStream) {
57        self.0.to_tokens(tokens);
58    }
59}
60
61impl Debug for DebugExpr {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        write!(f, "{}", self.0.to_token_stream())
64    }
65}
66
67impl Display for DebugExpr {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        let original = self.0.as_ref().clone();
70        let simplified = simplify_q_macro(original);
71
72        // For now, just use quote formatting without trying to parse as a statement
73        // This avoids the syn::parse_quote! issues entirely
74        write!(f, "q!({})", quote::quote!(#simplified))
75    }
76}
77
78/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
79fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80    // Try to parse the token string as a syn::Expr
81    // Use a visitor to simplify q! macro expansions
82    let mut simplifier = QMacroSimplifier::new();
83    simplifier.visit_expr_mut(&mut expr);
84
85    // If we found and simplified a q! macro, return the simplified version
86    if let Some(simplified) = simplifier.simplified_result {
87        simplified
88    } else {
89        expr
90    }
91}
92
/// AST visitor that simplifies q! macro expansions.
///
/// The visitor stops at the first matching call it encounters and stores the
/// extracted expression in [`Self::simplified_result`].
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching call, or `None` if no
    /// match has been found (yet).
    pub simplified_result: Option<syn::Expr>,
}
98
99impl QMacroSimplifier {
100    pub fn new() -> Self {
101        Self::default()
102    }
103}
104
105impl VisitMut for QMacroSimplifier {
106    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107        // Check if we already found a result to avoid further processing
108        if self.simplified_result.is_some() {
109            return;
110        }
111
112        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113            // Look for calls to stageleft::runtime_support::fn*
114            && self.is_stageleft_runtime_support_call(&path_expr.path)
115            // Try to extract the closure from the arguments
116            && let Some(closure) = self.extract_closure_from_args(&call.args)
117        {
118            self.simplified_result = Some(closure);
119            return;
120        }
121
122        // Continue visiting child expressions using the default implementation
123        // Use the default visitor to avoid infinite recursion
124        syn::visit_mut::visit_expr_mut(self, expr);
125    }
126}
127
128impl QMacroSimplifier {
129    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130        // Check if this is a call to stageleft::runtime_support::fn*
131        if let Some(last_segment) = path.segments.last() {
132            let fn_name = last_segment.ident.to_string();
133            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
134            fn_name.contains("_type_hint")
135                && path.segments.len() > 2
136                && path.segments[0].ident == "stageleft"
137                && path.segments[1].ident == "runtime_support"
138        } else {
139            false
140        }
141    }
142
143    fn extract_closure_from_args(
144        &self,
145        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146    ) -> Option<syn::Expr> {
147        // Look through the arguments for a closure expression
148        for arg in args {
149            if let syn::Expr::Closure(_) = arg {
150                return Some(arg.clone());
151            }
152            // Also check for closures nested in other expressions (like blocks)
153            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154                return Some(closure_expr);
155            }
156        }
157        None
158    }
159
160    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161        let mut visitor = ClosureFinder {
162            found_closure: None,
163            prefer_inner_blocks: true,
164        };
165        visitor.visit_expr(expr);
166        visitor.found_closure
167    }
168}
169
/// Visitor that finds closures in expressions with special block handling.
struct ClosureFinder {
    /// The first match recorded by the visitor: either a closure expression,
    /// or (when `prefer_inner_blocks` is set) an inner block that contains one.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block expression first checks its direct statements for
    /// nested blocks containing a closure and records that nested block
    /// instead of the bare closure.
    prefer_inner_blocks: bool,
}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178        // If we already found a closure, don't continue searching
179        if self.found_closure.is_some() {
180            return;
181        }
182
183        match expr {
184            syn::Expr::Closure(_) => {
185                self.found_closure = Some(expr.clone());
186            }
187            syn::Expr::Block(block) if self.prefer_inner_blocks => {
188                // Special handling for blocks - look for inner blocks that contain closures
189                for stmt in &block.block.stmts {
190                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
191                        && let syn::Expr::Block(_) = stmt_expr
192                    {
193                        // Check if this nested block contains a closure
194                        let mut inner_visitor = ClosureFinder {
195                            found_closure: None,
196                            prefer_inner_blocks: false, // Avoid infinite recursion
197                        };
198                        inner_visitor.visit_expr(stmt_expr);
199                        if inner_visitor.found_closure.is_some() {
200                            // Found a closure in an inner block, return that block
201                            self.found_closure = Some(stmt_expr.clone());
202                            return;
203                        }
204                    }
205                }
206
207                // If no inner block with closure found, continue with normal visitation
208                visit::visit_expr(self, expr);
209
210                // If we found a closure, just return the closure itself, not the whole block
211                // unless we're in the special case where we want the containing block
212                if self.found_closure.is_some() {
213                    // The closure was found during visitation, no need to wrap in block
214                }
215            }
216            _ => {
217                // Use default visitor behavior for all other expressions
218                visit::visit_expr(self, expr);
219            }
220        }
221    }
222}
223
/// Wrapper around a parsed type whose [`Debug`] output is the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231    fn from(t: syn::Type) -> Self {
232        Self(Box::new(t))
233    }
234}
235
236impl Deref for DebugType {
237    type Target = syn::Type;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0
241    }
242}
243
244impl ToTokens for DebugType {
245    fn to_tokens(&self, tokens: &mut TokenStream) {
246        self.0.to_tokens(tokens);
247    }
248}
249
250impl Debug for DebugType {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(f, "{}", self.0.to_token_stream())
253    }
254}
255
/// Instantiation state of a network-like node or root.
///
/// Values start as [`Self::Building`]; `compile_network` replaces them with
/// [`Self::Finalized`] and panics if it encounters one that is already
/// finalized.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; holds the generated sink/source expressions and the
    /// connect callback. Boxed to keep the enum small.
    Finalized(Box<DebugInstantiateFinalized>),
}
260
/// Payload of [`DebugInstantiate::Finalized`]: the expressions and callback
/// produced when a connection is instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side. `compile_network` stores a `DUMMY`
    /// placeholder here for external inputs.
    sink: syn::Expr,
    /// Expression for the receiving side. `compile_network` stores a `DUMMY`
    /// placeholder here for external sends.
    source: syn::Expr,
    /// One-shot callback supplied at instantiation time.
    // NOTE(review): presumably invoked once to connect the two endpoints and
    // then left as `None` — confirm against the code that consumes it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275    fn from(f: DebugInstantiateFinalized) -> Self {
276        Self::Finalized(Box::new(f))
277    }
278}
279
280impl Debug for DebugInstantiate {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "<network instantiate>")
283    }
284}
285
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so the instantiation state never
    /// influences the hash of a value that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Intentionally empty: every `DebugInstantiate` hashes identically.
    }
}
291
292impl Clone for DebugInstantiate {
293    fn clone(&self) -> Self {
294        match self {
295            DebugInstantiate::Building => DebugInstantiate::Building,
296            DebugInstantiate::Finalized(_) => {
297                panic!("DebugInstantiate::Finalized should not be cloned")
298            }
299        }
300    }
301}
302
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone)]
pub enum ClusterMembersState {
    /// Not yet instantiated; the state before `compile_network` assigns
    /// [`Self::Stream`] or [`Self::Tee`].
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
323
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// A source defined by an expression.
    // NOTE(review): presumably an expression evaluating to an async stream — confirm.
    Stream(DebugExpr),
    /// A source fed by an external network connection; carries no data here.
    // NOTE(review): semantics inferred from the name only — confirm.
    ExternalNetwork(),
    /// A source defined by an expression.
    // NOTE(review): presumably an expression evaluating to an iterator — confirm.
    Iter(DebugExpr),
    /// A source with no payload.
    // NOTE(review): semantics of "spin" are not visible in this part of the file.
    Spin(),
    /// A source of cluster membership information, together with its
    /// instantiation state (see [`ClusterMembersState`]).
    // NOTE(review): the `LocationId` is presumably the target cluster — confirm.
    ClusterMembers(LocationId, ClusterMembersState),
    /// A source identified by an identifier.
    // NOTE(review): presumably an input supplied by embedding code — confirm.
    Embedded(syn::Ident),
}
334
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
///
/// The per-method notes below describe what the `SecondaryMap<LocationKey, FlatGraphBuilder>`
/// implementation in this file does; other implementations may emit different code.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code binding `out_ident` to the batched form of `in_ident`.
    ///
    /// The `SecondaryMap` implementation forwards the input unchanged, except that bounded
    /// singleton/optional/keyed-singleton inputs get a `persist::<'static>()` appended.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code binding `out_ident` to `in_ident` as it leaves a tick.
    ///
    /// The `SecondaryMap` implementation forwards the input unchanged.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code binding `out_ident` to `in_ident` at the start of an atomic section.
    ///
    /// The `SecondaryMap` implementation forwards the input unchanged.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code binding `out_ident` to `in_ident` at the end of an atomic section.
    ///
    /// The `SecondaryMap` implementation forwards the input unchanged.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code binding `out_ident` to `in_ident` across a non-deterministic observation.
    ///
    /// The `SecondaryMap` implementation ignores `trusted` and forwards the input unchanged.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network channel from `from` to `to`.
    ///
    /// The `SecondaryMap` implementation emits, at `from`, `input_ident` (optionally mapped
    /// through `serialize`) into `dest_sink(sink)`, and at `to`, `out_ident` bound to
    /// `source_stream(source)` (optionally mapped through `deserialize`). `tag_id` is used to
    /// build the `send{tag_id}` / `recv{tag_id}` operator tags.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits the receiving side of an external input at location `on`.
    ///
    /// The `SecondaryMap` implementation binds `out_ident` to `source_stream(source_expr)`,
    /// optionally mapped through `deserialize`, tagged `recv{tag_id}`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits the sending side of an external output at location `on`.
    ///
    /// The `SecondaryMap` implementation pipes `input_ident` (optionally mapped through
    /// `serialize`) into `dest_sink(sink_expr)`, tagged `send{tag_id}`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
428
/// Adds the statement `#out_ident = #in_ident;` to `builder`, passing `None`
/// for both trailing `add_dfir` arguments (i.e. without an operator tag).
#[cfg(feature = "build")]
fn add_identity_dfir(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Production code-gen: one `FlatGraphBuilder` per root location key.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or default-creates) the builder stored under the root key of `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Forwards the input; bounded singleton-like inputs additionally get
    /// `persist::<'static>()` and must be at a top-level location.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let singleton_like = matches!(
            in_kind,
            CollectionKind::Singleton { .. }
                | CollectionKind::Optional { .. }
                | CollectionKind::KeyedSingleton { .. }
        );
        if !(in_kind.is_bounded() && singleton_like) {
            add_identity_dfir(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Forwards the input unchanged.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Forwards the input unchanged.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Forwards the input unchanged.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_identity_dfir(self.get_dfir_mut(in_location.root()), &in_ident, out_ident);
    }

    /// Forwards the input unchanged.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_dfir(self.get_dfir_mut(location), &in_ident, out_ident);
    }

    /// Emits the send half at `from` and the receive half at `to`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits the receive half of an external input at `on`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits the send half of an external output at `on`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }
}
662
/// Either a [`DfirBuilder`] to emit code into, or a pair of callbacks to run
/// on roots and nodes instead.
// NOTE(review): the meaning of the `&mut usize` passed to both callbacks is
// not visible in this part of the file — presumably a running statement/ID
// counter; confirm against the callers.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the first callback for each [`HydroRoot`] and the second for
    /// each [`HydroNode`].
    Callback(L, N),
}
672
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Consumes `input` with the expression `f`.
    // NOTE(review): presumably `f` is applied to each element — confirm.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to an external location.
    ///
    /// `instantiate_fn` starts as `Building` and is finalized by
    /// `compile_network`, which looks up `to_external_key` among the
    /// instantiated externals and uses `to_many` / `unpaired` to decide which
    /// deployment hooks to call.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` into the sink given by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Exposes `input` as an embedded output named `ident`.
    ///
    /// `compile_network` requires `input` to have a `Stream` collection kind
    /// and to live on a process or cluster; it panics otherwise.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
709
710impl HydroRoot {
711    #[cfg(feature = "build")]
712    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713    pub fn compile_network<'a, D>(
714        &mut self,
715        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716        seen_tees: &mut SeenSharedNodes,
717        seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718        processes: &SparseSecondaryMap<LocationKey, D::Process>,
719        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720        externals: &SparseSecondaryMap<LocationKey, D::External>,
721        env: &mut D::InstantiateEnv,
722    ) where
723        D: Deploy<'a>,
724    {
725        let refcell_extra_stmts = RefCell::new(extra_stmts);
726        let refcell_env = RefCell::new(env);
727        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728        self.transform_bottom_up(
729            &mut |l| {
730                if let HydroRoot::SendExternal {
731                    input,
732                    to_external_key,
733                    to_port_id,
734                    to_many,
735                    unpaired,
736                    instantiate_fn,
737                    ..
738                } = l
739                {
740                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741                        DebugInstantiate::Building => {
742                            let to_node = externals
743                                .get(*to_external_key)
744                                .unwrap_or_else(|| {
745                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
746                                })
747                                .clone();
748
749                            match input.metadata().location_id.root() {
750                                &LocationId::Process(process_key) => {
751                                    if *to_many {
752                                        (
753                                            (
754                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755                                                parse_quote!(DUMMY),
756                                            ),
757                                            Box::new(|| {}) as Box<dyn FnOnce()>,
758                                        )
759                                    } else {
760                                        let from_node = processes
761                                            .get(process_key)
762                                            .unwrap_or_else(|| {
763                                                panic!("A process used in the graph was not instantiated: {}", process_key)
764                                            })
765                                            .clone();
766
767                                        let sink_port = from_node.next_port();
768                                        let source_port = to_node.next_port();
769
770                                        if *unpaired {
771                                            use stageleft::quote_type;
772                                            use tokio_util::codec::LengthDelimitedCodec;
773
774                                            to_node.register(*to_port_id, source_port.clone());
775
776                                            let _ = D::e2o_source(
777                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778                                                &to_node, &source_port,
779                                                &from_node, &sink_port,
780                                                &quote_type::<LengthDelimitedCodec>(),
781                                                format!("{}_{}", *to_external_key, *to_port_id)
782                                            );
783                                        }
784
785                                        (
786                                            (
787                                                D::o2e_sink(
788                                                    &from_node,
789                                                    &sink_port,
790                                                    &to_node,
791                                                    &source_port,
792                                                    format!("{}_{}", *to_external_key, *to_port_id)
793                                                ),
794                                                parse_quote!(DUMMY),
795                                            ),
796                                            if *unpaired {
797                                                D::e2o_connect(
798                                                    &to_node,
799                                                    &source_port,
800                                                    &from_node,
801                                                    &sink_port,
802                                                    *to_many,
803                                                    NetworkHint::Auto,
804                                                )
805                                            } else {
806                                                Box::new(|| {}) as Box<dyn FnOnce()>
807                                            },
808                                        )
809                                    }
810                                }
811                                LocationId::Cluster(_) => todo!(),
812                                _ => panic!()
813                            }
814                        },
815
816                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817                    };
818
819                    *instantiate_fn = DebugInstantiateFinalized {
820                        sink: sink_expr,
821                        source: source_expr,
822                        connect_fn: Some(connect_fn),
823                    }
824                    .into();
825                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826                    let element_type = match &input.metadata().collection_kind {
827                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828                        _ => panic!("Embedded output must have Stream collection kind"),
829                    };
830                    let location_key = match input.metadata().location_id.root() {
831                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
832                        _ => panic!("Embedded output must be on a process or cluster"),
833                    };
834                    D::register_embedded_output(
835                        &mut refcell_env.borrow_mut(),
836                        location_key,
837                        ident,
838                        &element_type,
839                    );
840                }
841            },
842            &mut |n| {
843                if let HydroNode::Network {
844                    name,
845                    networking_info,
846                    input,
847                    instantiate_fn,
848                    metadata,
849                    ..
850                } = n
851                {
852                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853                        DebugInstantiate::Building => instantiate_network::<D>(
854                            &mut refcell_env.borrow_mut(),
855                            input.metadata().location_id.root(),
856                            metadata.location_id.root(),
857                            processes,
858                            clusters,
859                            name.as_deref(),
860                            networking_info,
861                        ),
862
863                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864                    };
865
866                    *instantiate_fn = DebugInstantiateFinalized {
867                        sink: sink_expr,
868                        source: source_expr,
869                        connect_fn: Some(connect_fn),
870                    }
871                    .into();
872                } else if let HydroNode::ExternalInput {
873                    from_external_key,
874                    from_port_id,
875                    from_many,
876                    codec_type,
877                    port_hint,
878                    instantiate_fn,
879                    metadata,
880                    ..
881                } = n
882                {
883                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884                        DebugInstantiate::Building => {
885                            let from_node = externals
886                                .get(*from_external_key)
887                                .unwrap_or_else(|| {
888                                    panic!(
889                                        "A external used in the graph was not instantiated: {}",
890                                        from_external_key,
891                                    )
892                                })
893                                .clone();
894
895                            match metadata.location_id.root() {
896                                &LocationId::Process(process_key) => {
897                                    let to_node = processes
898                                        .get(process_key)
899                                        .unwrap_or_else(|| {
900                                            panic!("A process used in the graph was not instantiated: {}", process_key)
901                                        })
902                                        .clone();
903
904                                    let sink_port = from_node.next_port();
905                                    let source_port = to_node.next_port();
906
907                                    from_node.register(*from_port_id, sink_port.clone());
908
909                                    (
910                                        (
911                                            parse_quote!(DUMMY),
912                                            if *from_many {
913                                                D::e2o_many_source(
914                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915                                                    &to_node, &source_port,
916                                                    codec_type.0.as_ref(),
917                                                    format!("{}_{}", *from_external_key, *from_port_id)
918                                                )
919                                            } else {
920                                                D::e2o_source(
921                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922                                                    &from_node, &sink_port,
923                                                    &to_node, &source_port,
924                                                    codec_type.0.as_ref(),
925                                                    format!("{}_{}", *from_external_key, *from_port_id)
926                                                )
927                                            },
928                                        ),
929                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930                                    )
931                                }
932                                LocationId::Cluster(_) => todo!(),
933                                _ => panic!()
934                            }
935                        },
936
937                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938                    };
939
940                    *instantiate_fn = DebugInstantiateFinalized {
941                        sink: sink_expr,
942                        source: source_expr,
943                        connect_fn: Some(connect_fn),
944                    }
945                    .into();
946                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947                    let element_type = match &metadata.collection_kind {
948                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949                        _ => panic!("Embedded source must have Stream collection kind"),
950                    };
951                    let location_key = match metadata.location_id.root() {
952                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
953                        _ => panic!("Embedded source must be on a process or cluster"),
954                    };
955                    D::register_embedded_input(
956                        &mut refcell_env.borrow_mut(),
957                        location_key,
958                        ident,
959                        &element_type,
960                    );
961                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962                    match state {
963                        ClusterMembersState::Uninit => {
964                            let at_location = metadata.location_id.root().clone();
965                            let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
967                                // First occurrence: call cluster_membership_stream and mark as Stream.
968                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970                                    &(),
971                                );
972                                *state = ClusterMembersState::Stream(expr.into());
973                            } else {
974                                // Already instantiated for this (at, target) pair: just tee.
975                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
976                            }
977                        }
978                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979                            panic!("cluster members already finalized");
980                        }
981                    }
982                }
983            },
984            seen_tees,
985            false,
986        );
987    }
988
989    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
990        self.transform_bottom_up(
991            &mut |l| {
992                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993                    match instantiate_fn {
994                        DebugInstantiate::Building => panic!("network not built"),
995
996                        DebugInstantiate::Finalized(finalized) => {
997                            (finalized.connect_fn.take().unwrap())();
998                        }
999                    }
1000                }
1001            },
1002            &mut |n| {
1003                if let HydroNode::Network { instantiate_fn, .. }
1004                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005                {
1006                    match instantiate_fn {
1007                        DebugInstantiate::Building => panic!("network not built"),
1008
1009                        DebugInstantiate::Finalized(finalized) => {
1010                            (finalized.connect_fn.take().unwrap())();
1011                        }
1012                    }
1013                }
1014            },
1015            seen_tees,
1016            false,
1017        );
1018    }
1019
1020    pub fn transform_bottom_up(
1021        &mut self,
1022        transform_root: &mut impl FnMut(&mut HydroRoot),
1023        transform_node: &mut impl FnMut(&mut HydroNode),
1024        seen_tees: &mut SeenSharedNodes,
1025        check_well_formed: bool,
1026    ) {
1027        self.transform_children(
1028            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029            seen_tees,
1030        );
1031
1032        transform_root(self);
1033    }
1034
1035    pub fn transform_children(
1036        &mut self,
1037        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1038        seen_tees: &mut SeenSharedNodes,
1039    ) {
1040        match self {
1041            HydroRoot::ForEach { input, .. }
1042            | HydroRoot::SendExternal { input, .. }
1043            | HydroRoot::DestSink { input, .. }
1044            | HydroRoot::CycleSink { input, .. }
1045            | HydroRoot::EmbeddedOutput { input, .. } => {
1046                transform(input, seen_tees);
1047            }
1048        }
1049    }
1050
1051    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1052        match self {
1053            HydroRoot::ForEach {
1054                f,
1055                input,
1056                op_metadata,
1057            } => HydroRoot::ForEach {
1058                f: f.clone(),
1059                input: Box::new(input.deep_clone(seen_tees)),
1060                op_metadata: op_metadata.clone(),
1061            },
1062            HydroRoot::SendExternal {
1063                to_external_key,
1064                to_port_id,
1065                to_many,
1066                unpaired,
1067                serialize_fn,
1068                instantiate_fn,
1069                input,
1070                op_metadata,
1071            } => HydroRoot::SendExternal {
1072                to_external_key: *to_external_key,
1073                to_port_id: *to_port_id,
1074                to_many: *to_many,
1075                unpaired: *unpaired,
1076                serialize_fn: serialize_fn.clone(),
1077                instantiate_fn: instantiate_fn.clone(),
1078                input: Box::new(input.deep_clone(seen_tees)),
1079                op_metadata: op_metadata.clone(),
1080            },
1081            HydroRoot::DestSink {
1082                sink,
1083                input,
1084                op_metadata,
1085            } => HydroRoot::DestSink {
1086                sink: sink.clone(),
1087                input: Box::new(input.deep_clone(seen_tees)),
1088                op_metadata: op_metadata.clone(),
1089            },
1090            HydroRoot::CycleSink {
1091                cycle_id,
1092                input,
1093                op_metadata,
1094            } => HydroRoot::CycleSink {
1095                cycle_id: *cycle_id,
1096                input: Box::new(input.deep_clone(seen_tees)),
1097                op_metadata: op_metadata.clone(),
1098            },
1099            HydroRoot::EmbeddedOutput {
1100                ident,
1101                input,
1102                op_metadata,
1103            } => HydroRoot::EmbeddedOutput {
1104                ident: ident.clone(),
1105                input: Box::new(input.deep_clone(seen_tees)),
1106                op_metadata: op_metadata.clone(),
1107            },
1108        }
1109    }
1110
1111    #[cfg(feature = "build")]
1112    pub fn emit(
1113        &mut self,
1114        graph_builders: &mut dyn DfirBuilder,
1115        seen_tees: &mut SeenSharedNodes,
1116        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1117        next_stmt_id: &mut usize,
1118    ) {
1119        self.emit_core(
1120            &mut BuildersOrCallback::<
1121                fn(&mut HydroRoot, &mut usize),
1122                fn(&mut HydroNode, &mut usize),
1123            >::Builders(graph_builders),
1124            seen_tees,
1125            built_tees,
1126            next_stmt_id,
1127        );
1128    }
1129
1130    #[cfg(feature = "build")]
1131    pub fn emit_core(
1132        &mut self,
1133        builders_or_callback: &mut BuildersOrCallback<
1134            impl FnMut(&mut HydroRoot, &mut usize),
1135            impl FnMut(&mut HydroNode, &mut usize),
1136        >,
1137        seen_tees: &mut SeenSharedNodes,
1138        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1139        next_stmt_id: &mut usize,
1140    ) {
1141        match self {
1142            HydroRoot::ForEach { f, input, .. } => {
1143                let input_ident =
1144                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1145
1146                match builders_or_callback {
1147                    BuildersOrCallback::Builders(graph_builders) => {
1148                        graph_builders
1149                            .get_dfir_mut(&input.metadata().location_id)
1150                            .add_dfir(
1151                                parse_quote! {
1152                                    #input_ident -> for_each(#f);
1153                                },
1154                                None,
1155                                Some(&next_stmt_id.to_string()),
1156                            );
1157                    }
1158                    BuildersOrCallback::Callback(leaf_callback, _) => {
1159                        leaf_callback(self, next_stmt_id);
1160                    }
1161                }
1162
1163                *next_stmt_id += 1;
1164            }
1165
1166            HydroRoot::SendExternal {
1167                serialize_fn,
1168                instantiate_fn,
1169                input,
1170                ..
1171            } => {
1172                let input_ident =
1173                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1174
1175                match builders_or_callback {
1176                    BuildersOrCallback::Builders(graph_builders) => {
1177                        let (sink_expr, _) = match instantiate_fn {
1178                            DebugInstantiate::Building => (
1179                                syn::parse_quote!(DUMMY_SINK),
1180                                syn::parse_quote!(DUMMY_SOURCE),
1181                            ),
1182
1183                            DebugInstantiate::Finalized(finalized) => {
1184                                (finalized.sink.clone(), finalized.source.clone())
1185                            }
1186                        };
1187
1188                        graph_builders.create_external_output(
1189                            &input.metadata().location_id,
1190                            sink_expr,
1191                            &input_ident,
1192                            serialize_fn.as_ref(),
1193                            *next_stmt_id,
1194                        );
1195                    }
1196                    BuildersOrCallback::Callback(leaf_callback, _) => {
1197                        leaf_callback(self, next_stmt_id);
1198                    }
1199                }
1200
1201                *next_stmt_id += 1;
1202            }
1203
1204            HydroRoot::DestSink { sink, input, .. } => {
1205                let input_ident =
1206                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1207
1208                match builders_or_callback {
1209                    BuildersOrCallback::Builders(graph_builders) => {
1210                        graph_builders
1211                            .get_dfir_mut(&input.metadata().location_id)
1212                            .add_dfir(
1213                                parse_quote! {
1214                                    #input_ident -> dest_sink(#sink);
1215                                },
1216                                None,
1217                                Some(&next_stmt_id.to_string()),
1218                            );
1219                    }
1220                    BuildersOrCallback::Callback(leaf_callback, _) => {
1221                        leaf_callback(self, next_stmt_id);
1222                    }
1223                }
1224
1225                *next_stmt_id += 1;
1226            }
1227
1228            HydroRoot::CycleSink {
1229                cycle_id, input, ..
1230            } => {
1231                let input_ident =
1232                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1233
1234                match builders_or_callback {
1235                    BuildersOrCallback::Builders(graph_builders) => {
1236                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1237                            CollectionKind::KeyedSingleton {
1238                                key_type,
1239                                value_type,
1240                                ..
1241                            }
1242                            | CollectionKind::KeyedStream {
1243                                key_type,
1244                                value_type,
1245                                ..
1246                            } => {
1247                                parse_quote!((#key_type, #value_type))
1248                            }
1249                            CollectionKind::Stream { element_type, .. }
1250                            | CollectionKind::Singleton { element_type, .. }
1251                            | CollectionKind::Optional { element_type, .. } => {
1252                                parse_quote!(#element_type)
1253                            }
1254                        };
1255
1256                        let cycle_id_ident = cycle_id.as_ident();
1257                        graph_builders
1258                            .get_dfir_mut(&input.metadata().location_id)
1259                            .add_dfir(
1260                                parse_quote! {
1261                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1262                                },
1263                                None,
1264                                None,
1265                            );
1266                    }
1267                    // No ID, no callback
1268                    BuildersOrCallback::Callback(_, _) => {}
1269                }
1270            }
1271
1272            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1273                let input_ident =
1274                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1275
1276                match builders_or_callback {
1277                    BuildersOrCallback::Builders(graph_builders) => {
1278                        graph_builders
1279                            .get_dfir_mut(&input.metadata().location_id)
1280                            .add_dfir(
1281                                parse_quote! {
1282                                    #input_ident -> for_each(&mut #ident);
1283                                },
1284                                None,
1285                                Some(&next_stmt_id.to_string()),
1286                            );
1287                    }
1288                    BuildersOrCallback::Callback(leaf_callback, _) => {
1289                        leaf_callback(self, next_stmt_id);
1290                    }
1291                }
1292
1293                *next_stmt_id += 1;
1294            }
1295        }
1296    }
1297
1298    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299        match self {
1300            HydroRoot::ForEach { op_metadata, .. }
1301            | HydroRoot::SendExternal { op_metadata, .. }
1302            | HydroRoot::DestSink { op_metadata, .. }
1303            | HydroRoot::CycleSink { op_metadata, .. }
1304            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305        }
1306    }
1307
1308    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309        match self {
1310            HydroRoot::ForEach { op_metadata, .. }
1311            | HydroRoot::SendExternal { op_metadata, .. }
1312            | HydroRoot::DestSink { op_metadata, .. }
1313            | HydroRoot::CycleSink { op_metadata, .. }
1314            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315        }
1316    }
1317
1318    pub fn input(&self) -> &HydroNode {
1319        match self {
1320            HydroRoot::ForEach { input, .. }
1321            | HydroRoot::SendExternal { input, .. }
1322            | HydroRoot::DestSink { input, .. }
1323            | HydroRoot::CycleSink { input, .. }
1324            | HydroRoot::EmbeddedOutput { input, .. } => input,
1325        }
1326    }
1327
1328    pub fn input_metadata(&self) -> &HydroIrMetadata {
1329        self.input().metadata()
1330    }
1331
1332    pub fn print_root(&self) -> String {
1333        match self {
1334            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338            HydroRoot::EmbeddedOutput { ident, .. } => {
1339                format!("EmbeddedOutput({})", ident)
1340            }
1341        }
1342    }
1343
1344    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345        match self {
1346            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347                transform(f);
1348            }
1349            HydroRoot::SendExternal { .. }
1350            | HydroRoot::CycleSink { .. }
1351            | HydroRoot::EmbeddedOutput { .. } => {}
1352        }
1353    }
1354}
1355
/// Emits every root in `ir` and returns the resulting graph builders keyed by location.
///
/// All roots share one `seen_tees` map, one `built_tees` map and one statement-id
/// counter that starts at zero.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut graphs: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut shared_seen = HashMap::new();
    let mut shared_built = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut graphs,
            &mut shared_seen,
            &mut shared_built,
            &mut stmt_counter,
        );
    });

    graphs
}
1372
/// Walks `ir` in emission order, calling the callbacks instead of building DFIR.
///
/// Each root is driven through [`HydroRoot::emit_core`] in callback mode, with
/// `transform_root` as the root callback and `transform_node` as the node callback. All
/// roots share the same tee-tracking maps and a statement-id counter that starts at zero.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visitor = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut shared_seen = HashMap::new();
    let mut shared_built = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|root| {
        root.emit_core(
            &mut visitor,
            &mut shared_seen,
            &mut shared_built,
            &mut stmt_counter,
        );
    });
}
1392
1393pub fn transform_bottom_up(
1394    ir: &mut [HydroRoot],
1395    transform_root: &mut impl FnMut(&mut HydroRoot),
1396    transform_node: &mut impl FnMut(&mut HydroNode),
1397    check_well_formed: bool,
1398) {
1399    let mut seen_tees = HashMap::new();
1400    for leaf in ir.iter_mut() {
1401        leaf.transform_bottom_up(
1402            transform_root,
1403            transform_node,
1404            &mut seen_tees,
1405            check_well_formed,
1406        );
1407    }
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411    let mut seen_tees = HashMap::new();
1412    ir.iter()
1413        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414        .collect()
1415}
1416
/// Per-thread table used by `SharedNode`'s `Debug` impl to print each shared node once.
///
/// When present, the tuple holds the next id to hand out and a map from a shared node's
/// address to the id it was given.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts out as `None`; `dbg_dedup_tee` installs a fresh table around its closure.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423    PRINTED_TEES.with(|printed_tees| {
1424        let mut printed_tees_mut = printed_tees.borrow_mut();
1425        *printed_tees_mut = Some((0, HashMap::new()));
1426        drop(printed_tees_mut);
1427
1428        let ret = f();
1429
1430        let mut printed_tees_mut = printed_tees.borrow_mut();
1431        *printed_tees_mut = None;
1432
1433        ret
1434    })
1435}
1436
/// A reference-counted, interior-mutable handle to a [`HydroNode`].
///
/// Used as the `inner` of `HydroNode::Tee` and `HydroNode::Partition`.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl SharedNode {
1440    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441        Rc::as_ptr(&self.0)
1442    }
1443}
1444
1445impl Debug for SharedNode {
1446    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447        PRINTED_TEES.with(|printed_tees| {
1448            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451            if let Some(printed_tees_mut) = printed_tees_mut {
1452                if let Some(existing) = printed_tees_mut
1453                    .1
1454                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455                {
1456                    write!(f, "<shared {}>", existing)
1457                } else {
1458                    let next_id = printed_tees_mut.0;
1459                    printed_tees_mut.0 += 1;
1460                    printed_tees_mut
1461                        .1
1462                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463                    drop(printed_tees_mut_borrow);
1464                    write!(f, "<shared {}>: ", next_id)?;
1465                    Debug::fmt(&self.0.borrow(), f)
1466                }
1467            } else {
1468                drop(printed_tees_mut_borrow);
1469                write!(f, "<shared>: ")?;
1470                Debug::fmt(&self.0.borrow(), f)
1471            }
1472        })
1473    }
1474}
1475
1476impl Hash for SharedNode {
1477    fn hash<H: Hasher>(&self, state: &mut H) {
1478        self.0.borrow_mut().hash(state);
1479    }
1480}
1481
/// Boundedness marker stored in the `bound` field of the `Stream`, `Singleton`,
/// `Optional` and `KeyedStream` variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The value for which [`CollectionKind::is_bounded`] returns `true`.
    Bounded,
}
1487
/// Ordering marker stored as `order` on [`CollectionKind::Stream`] and as `value_order`
/// on [`CollectionKind::KeyedStream`].
// NOTE(review): presumably describes whether element order is guaranteed - confirm.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1493
/// Retry marker stored as `retry` on [`CollectionKind::Stream`] and as `value_retry` on
/// [`CollectionKind::KeyedStream`].
// NOTE(review): presumably describes whether elements may be delivered more than once -
// confirm.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1499
/// Boundedness marker stored in the `bound` field of [`CollectionKind::KeyedSingleton`].
///
/// Only `Bounded` makes [`CollectionKind::is_bounded`] return `true`; `BoundedValue`
/// does not.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1506
/// The kind of collection a node produces, together with its boundedness markers and
/// element (or key/value) types.
///
/// Stored as `collection_kind` in [`HydroIrMetadata`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` items with ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers are recorded for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton; uses its own three-state boundedness marker.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1536
1537impl CollectionKind {
1538    pub fn is_bounded(&self) -> bool {
1539        matches!(
1540            self,
1541            CollectionKind::Stream {
1542                bound: BoundKind::Bounded,
1543                ..
1544            } | CollectionKind::Singleton {
1545                bound: BoundKind::Bounded,
1546                ..
1547            } | CollectionKind::Optional {
1548                bound: BoundKind::Bounded,
1549                ..
1550            } | CollectionKind::KeyedStream {
1551                bound: BoundKind::Bounded,
1552                ..
1553            } | CollectionKind::KeyedSingleton {
1554                bound: KeyedSingletonBoundKind::Bounded,
1555                ..
1556            }
1557        )
1558    }
1559}
1560
/// Metadata attached to a node of the IR.
///
/// The hand-written trait impls treat this as transparent: hashing contributes nothing,
/// equality is always `true`, and `Debug` prints only `location_id` and
/// `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node; its `root()` is used to select processes or clusters.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated element count - confirm.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably a free-form user or tooling label - confirm.
    pub tag: Option<String>,
    /// Operator-level metadata.
    pub op: HydroIrOpMetadata,
}
1569
1570// HydroIrMetadata shouldn't be used to hash or compare
1571impl Hash for HydroIrMetadata {
1572    fn hash<H: Hasher>(&self, _: &mut H) {}
1573}
1574
1575impl PartialEq for HydroIrMetadata {
1576    fn eq(&self, _: &Self) -> bool {
1577        true
1578    }
1579}
1580
1581impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585        f.debug_struct("HydroIrMetadata")
1586            .field("location_id", &self.location_id)
1587            .field("collection_kind", &self.collection_kind)
1588            .finish()
1589    }
1590}
1591
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// `new` captures a backtrace and leaves every other field as `None`.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created.
    pub backtrace: Backtrace,
    // NOTE(review): presumably filled in by a profiling pass - confirm units and source.
    pub cpu_usage: Option<f64>,
    // NOTE(review): presumably filled in by a profiling pass - confirm units and source.
    pub network_recv_cpu_usage: Option<f64>,
    // NOTE(review): presumably an operator id assigned after construction - confirm.
    pub id: Option<usize>,
}
1601
1602impl HydroIrOpMetadata {
1603    #[expect(
1604        clippy::new_without_default,
1605        reason = "explicit calls to new ensure correct backtrace bounds"
1606    )]
1607    pub fn new() -> HydroIrOpMetadata {
1608        Self::new_with_skip(1)
1609    }
1610
1611    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1612        HydroIrOpMetadata {
1613            backtrace: Backtrace::get_backtrace(2 + skip_count),
1614            cpu_usage: None,
1615            network_recv_cpu_usage: None,
1616            id: None,
1617        }
1618    }
1619}
1620
1621impl Debug for HydroIrOpMetadata {
1622    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623        f.debug_struct("HydroIrOpMetadata").finish()
1624    }
1625}
1626
1627impl Hash for HydroIrOpMetadata {
1628    fn hash<H: Hasher>(&self, _: &mut H) {}
1629}
1630
1631/// An intermediate node in a Hydro graph, which consumes data
1632/// from upstream nodes and emits data to downstream nodes.
///
/// Most variants own their upstream nodes through `Box<HydroNode>`; `Tee` and `Partition`
/// instead hold their upstream node in a `SharedNode`. Every variant except `Placeholder`
/// carries a `metadata: HydroIrMetadata` field.
1633#[derive(Debug, Hash)]
1634pub enum HydroNode {
    /// Stand-in value with no payload. In this file it is swapped into shared cells while
    /// their contents are being transformed or cloned, and `transform_children` panics when
    /// asked to traverse it.
1635    Placeholder,
1636
1637    /// Manually "casts" between two different collection kinds.
1638    ///
1639    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1640    /// correctness checks. In particular, the user must ensure that every possible
1641    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1642    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1643    /// collection. This ensures that the simulator does not miss any possible outputs.
1644    Cast {
1645        inner: Box<HydroNode>,
1646        metadata: HydroIrMetadata,
1647    },
1648
1649    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1650    /// interpretation of the input stream.
1651    ///
1652    /// In production, this simply passes through the input, but in simulation, this operator
1653    /// explicitly selects a randomized interpretation.
1654    ObserveNonDet {
1655        inner: Box<HydroNode>,
1656        trusted: bool, // if true, we do not need to simulate non-determinism
1657        metadata: HydroIrMetadata,
1658    },
1659
    /// Leaf variant described by a `HydroSource`; `transform_children` visits no upstream
    /// node for it.
1660    Source {
1661        source: HydroSource,
1662        metadata: HydroIrMetadata,
1663    },
1664
    /// Leaf variant carrying a single expression `value`; `transform_children` visits no
    /// upstream node for it.
    /// NOTE(review): `first_tick_only` presumably limits emission to the first tick — confirm.
1665    SingletonSource {
1666        value: DebugExpr,
1667        first_tick_only: bool,
1668        metadata: HydroIrMetadata,
1669    },
1670
    /// Leaf variant identified by a `CycleId`; `transform_children` visits no upstream node
    /// for it.
1671    CycleSource {
1672        cycle_id: CycleId,
1673        metadata: HydroIrMetadata,
1674    },
1675
    /// Reads from an upstream node held in a `SharedNode`. The traversals in this file use a
    /// `SeenSharedNodes` map keyed by the cell's address so that a cell reached through
    /// several consumers is processed once.
1676    Tee {
1677        inner: SharedNode,
1678        metadata: HydroIrMetadata,
1679    },
1680
    /// Like `Tee`, reads from an upstream node held in a `SharedNode`, and additionally
    /// carries an expression `f` and a flag `is_true`.
    /// NOTE(review): presumably selects the elements for which `f` evaluates to `is_true` —
    /// confirm against the code that emits this node.
1681    Partition {
1682        inner: SharedNode,
1683        f: DebugExpr,
1684        is_true: bool,
1685        metadata: HydroIrMetadata,
1686    },
1687
1688    BeginAtomic {
1689        inner: Box<HydroNode>,
1690        metadata: HydroIrMetadata,
1691    },
1692
1693    EndAtomic {
1694        inner: Box<HydroNode>,
1695        metadata: HydroIrMetadata,
1696    },
1697
1698    Batch {
1699        inner: Box<HydroNode>,
1700        metadata: HydroIrMetadata,
1701    },
1702
1703    YieldConcat {
1704        inner: Box<HydroNode>,
1705        metadata: HydroIrMetadata,
1706    },
1707
    // The variants from `Chain` through `AntiJoin` each own exactly two upstream nodes.
    // `transform_children` visits them in field order (`first`/`left`/`pos` before
    // `second`/`right`/`neg`).
1708    Chain {
1709        first: Box<HydroNode>,
1710        second: Box<HydroNode>,
1711        metadata: HydroIrMetadata,
1712    },
1713
1714    ChainFirst {
1715        first: Box<HydroNode>,
1716        second: Box<HydroNode>,
1717        metadata: HydroIrMetadata,
1718    },
1719
1720    CrossProduct {
1721        left: Box<HydroNode>,
1722        right: Box<HydroNode>,
1723        metadata: HydroIrMetadata,
1724    },
1725
1726    CrossSingleton {
1727        left: Box<HydroNode>,
1728        right: Box<HydroNode>,
1729        metadata: HydroIrMetadata,
1730    },
1731
1732    Join {
1733        left: Box<HydroNode>,
1734        right: Box<HydroNode>,
1735        metadata: HydroIrMetadata,
1736    },
1737
1738    Difference {
1739        pos: Box<HydroNode>,
1740        neg: Box<HydroNode>,
1741        metadata: HydroIrMetadata,
1742    },
1743
1744    AntiJoin {
1745        pos: Box<HydroNode>,
1746        neg: Box<HydroNode>,
1747        metadata: HydroIrMetadata,
1748    },
1749
    // The variants from `ResolveFutures` through `ReduceKeyed` each own a single upstream
    // node in `input`; those with `f`, `init` or `acc` additionally carry user expressions.
1750    ResolveFutures {
1751        input: Box<HydroNode>,
1752        metadata: HydroIrMetadata,
1753    },
1754    ResolveFuturesOrdered {
1755        input: Box<HydroNode>,
1756        metadata: HydroIrMetadata,
1757    },
1758
1759    Map {
1760        f: DebugExpr,
1761        input: Box<HydroNode>,
1762        metadata: HydroIrMetadata,
1763    },
1764    FlatMap {
1765        f: DebugExpr,
1766        input: Box<HydroNode>,
1767        metadata: HydroIrMetadata,
1768    },
1769    Filter {
1770        f: DebugExpr,
1771        input: Box<HydroNode>,
1772        metadata: HydroIrMetadata,
1773    },
1774    FilterMap {
1775        f: DebugExpr,
1776        input: Box<HydroNode>,
1777        metadata: HydroIrMetadata,
1778    },
1779
1780    DeferTick {
1781        input: Box<HydroNode>,
1782        metadata: HydroIrMetadata,
1783    },
1784    Enumerate {
1785        input: Box<HydroNode>,
1786        metadata: HydroIrMetadata,
1787    },
1788    Inspect {
1789        f: DebugExpr,
1790        input: Box<HydroNode>,
1791        metadata: HydroIrMetadata,
1792    },
1793
1794    Unique {
1795        input: Box<HydroNode>,
1796        metadata: HydroIrMetadata,
1797    },
1798
1799    Sort {
1800        input: Box<HydroNode>,
1801        metadata: HydroIrMetadata,
1802    },
1803    Fold {
1804        init: DebugExpr,
1805        acc: DebugExpr,
1806        input: Box<HydroNode>,
1807        metadata: HydroIrMetadata,
1808    },
1809
1810    Scan {
1811        init: DebugExpr,
1812        acc: DebugExpr,
1813        input: Box<HydroNode>,
1814        metadata: HydroIrMetadata,
1815    },
1816    FoldKeyed {
1817        init: DebugExpr,
1818        acc: DebugExpr,
1819        input: Box<HydroNode>,
1820        metadata: HydroIrMetadata,
1821    },
1822
1823    Reduce {
1824        f: DebugExpr,
1825        input: Box<HydroNode>,
1826        metadata: HydroIrMetadata,
1827    },
1828    ReduceKeyed {
1829        f: DebugExpr,
1830        input: Box<HydroNode>,
1831        metadata: HydroIrMetadata,
1832    },
    /// Owns two upstream nodes, `input` and `watermark`; `transform_children` visits `input`
    /// first.
1833    ReduceKeyedWatermark {
1834        f: DebugExpr,
1835        input: Box<HydroNode>,
1836        watermark: Box<HydroNode>,
1837        metadata: HydroIrMetadata,
1838    },
1839
    /// Owns a single upstream node in `input`. This is the only variant for which
    /// `transform_bottom_up` skips the check that every input shares the node's root
    /// location.
    /// NOTE(review): presumably because a network hop moves data between locations — confirm.
1840    Network {
1841        name: Option<String>,
1842        networking_info: crate::networking::NetworkingInfo,
1843        serialize_fn: Option<DebugExpr>,
1844        instantiate_fn: DebugInstantiate,
1845        deserialize_fn: Option<DebugExpr>,
1846        input: Box<HydroNode>,
1847        metadata: HydroIrMetadata,
1848    },
1849
    /// Leaf variant: `transform_children` visits no upstream node for it. Its fields
    /// identify an external location (`from_external_key`) and port (`from_port_id`).
1850    ExternalInput {
1851        from_external_key: LocationKey,
1852        from_port_id: ExternalPortId,
1853        from_many: bool,
1854        codec_type: DebugType,
1855        port_hint: NetworkHint,
1856        instantiate_fn: DebugInstantiate,
1857        deserialize_fn: Option<DebugExpr>,
1858        metadata: HydroIrMetadata,
1859    },
1860
    /// Owns a single upstream node in `input`, plus a `tag`, a `prefix` and a `duration`
    /// expression.
1861    Counter {
1862        tag: String,
1863        duration: DebugExpr,
1864        prefix: String,
1865        input: Box<HydroNode>,
1866        metadata: HydroIrMetadata,
1867    },
1868}
1869
/// Map from the address of a shared cell to the `Rc` cell that replaces it. The traversals
/// in this file (`transform_children`, `deep_clone`) consult it so that a shared node
/// reached through several consumers is processed once and the consumers keep sharing the
/// same replacement.
1870pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from the address of a shared cell to a `LocationId`.
1871pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1872
1873impl HydroNode {
1874    pub fn transform_bottom_up(
1875        &mut self,
1876        transform: &mut impl FnMut(&mut HydroNode),
1877        seen_tees: &mut SeenSharedNodes,
1878        check_well_formed: bool,
1879    ) {
1880        self.transform_children(
1881            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1882            seen_tees,
1883        );
1884
1885        transform(self);
1886
1887        let self_location = self.metadata().location_id.root();
1888
1889        if check_well_formed {
1890            match &*self {
1891                HydroNode::Network { .. } => {}
1892                _ => {
1893                    self.input_metadata().iter().for_each(|i| {
1894                        if i.location_id.root() != self_location {
1895                            panic!(
1896                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1897                                i,
1898                                i.location_id.root(),
1899                                self,
1900                                self_location
1901                            )
1902                        }
1903                    });
1904                }
1905            }
1906        }
1907    }
1908
1909    #[inline(always)]
1910    pub fn transform_children(
1911        &mut self,
1912        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1913        seen_tees: &mut SeenSharedNodes,
1914    ) {
1915        match self {
1916            HydroNode::Placeholder => {
1917                panic!();
1918            }
1919
1920            HydroNode::Source { .. }
1921            | HydroNode::SingletonSource { .. }
1922            | HydroNode::CycleSource { .. }
1923            | HydroNode::ExternalInput { .. } => {}
1924
1925            HydroNode::Tee { inner, .. } => {
1926                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1927                    *inner = SharedNode(transformed.clone());
1928                } else {
1929                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1930                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1931                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1932                    transform(&mut orig, seen_tees);
1933                    *transformed_cell.borrow_mut() = orig;
1934                    *inner = SharedNode(transformed_cell);
1935                }
1936            }
1937
1938            HydroNode::Partition { inner, .. } => {
1939                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1940                    *inner = SharedNode(transformed.clone());
1941                } else {
1942                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1943                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1944                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1945                    transform(&mut orig, seen_tees);
1946                    *transformed_cell.borrow_mut() = orig;
1947                    *inner = SharedNode(transformed_cell);
1948                }
1949            }
1950
1951            HydroNode::Cast { inner, .. }
1952            | HydroNode::ObserveNonDet { inner, .. }
1953            | HydroNode::BeginAtomic { inner, .. }
1954            | HydroNode::EndAtomic { inner, .. }
1955            | HydroNode::Batch { inner, .. }
1956            | HydroNode::YieldConcat { inner, .. } => {
1957                transform(inner.as_mut(), seen_tees);
1958            }
1959
1960            HydroNode::Chain { first, second, .. } => {
1961                transform(first.as_mut(), seen_tees);
1962                transform(second.as_mut(), seen_tees);
1963            }
1964
1965            HydroNode::ChainFirst { first, second, .. } => {
1966                transform(first.as_mut(), seen_tees);
1967                transform(second.as_mut(), seen_tees);
1968            }
1969
1970            HydroNode::CrossSingleton { left, right, .. }
1971            | HydroNode::CrossProduct { left, right, .. }
1972            | HydroNode::Join { left, right, .. } => {
1973                transform(left.as_mut(), seen_tees);
1974                transform(right.as_mut(), seen_tees);
1975            }
1976
1977            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1978                transform(pos.as_mut(), seen_tees);
1979                transform(neg.as_mut(), seen_tees);
1980            }
1981
1982            HydroNode::ReduceKeyedWatermark {
1983                input, watermark, ..
1984            } => {
1985                transform(input.as_mut(), seen_tees);
1986                transform(watermark.as_mut(), seen_tees);
1987            }
1988
1989            HydroNode::Map { input, .. }
1990            | HydroNode::ResolveFutures { input, .. }
1991            | HydroNode::ResolveFuturesOrdered { input, .. }
1992            | HydroNode::FlatMap { input, .. }
1993            | HydroNode::Filter { input, .. }
1994            | HydroNode::FilterMap { input, .. }
1995            | HydroNode::Sort { input, .. }
1996            | HydroNode::DeferTick { input, .. }
1997            | HydroNode::Enumerate { input, .. }
1998            | HydroNode::Inspect { input, .. }
1999            | HydroNode::Unique { input, .. }
2000            | HydroNode::Network { input, .. }
2001            | HydroNode::Fold { input, .. }
2002            | HydroNode::Scan { input, .. }
2003            | HydroNode::FoldKeyed { input, .. }
2004            | HydroNode::Reduce { input, .. }
2005            | HydroNode::ReduceKeyed { input, .. }
2006            | HydroNode::Counter { input, .. } => {
2007                transform(input.as_mut(), seen_tees);
2008            }
2009        }
2010    }
2011
2012    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2013        match self {
2014            HydroNode::Placeholder => HydroNode::Placeholder,
2015            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2016                inner: Box::new(inner.deep_clone(seen_tees)),
2017                metadata: metadata.clone(),
2018            },
2019            HydroNode::ObserveNonDet {
2020                inner,
2021                trusted,
2022                metadata,
2023            } => HydroNode::ObserveNonDet {
2024                inner: Box::new(inner.deep_clone(seen_tees)),
2025                trusted: *trusted,
2026                metadata: metadata.clone(),
2027            },
2028            HydroNode::Source { source, metadata } => HydroNode::Source {
2029                source: source.clone(),
2030                metadata: metadata.clone(),
2031            },
2032            HydroNode::SingletonSource {
2033                value,
2034                first_tick_only,
2035                metadata,
2036            } => HydroNode::SingletonSource {
2037                value: value.clone(),
2038                first_tick_only: *first_tick_only,
2039                metadata: metadata.clone(),
2040            },
2041            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2042                cycle_id: *cycle_id,
2043                metadata: metadata.clone(),
2044            },
2045            HydroNode::Tee { inner, metadata } => {
2046                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2047                    HydroNode::Tee {
2048                        inner: SharedNode(transformed.clone()),
2049                        metadata: metadata.clone(),
2050                    }
2051                } else {
2052                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2053                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2054                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2055                    *new_rc.borrow_mut() = cloned;
2056                    HydroNode::Tee {
2057                        inner: SharedNode(new_rc),
2058                        metadata: metadata.clone(),
2059                    }
2060                }
2061            }
2062            HydroNode::Partition {
2063                inner,
2064                f,
2065                is_true,
2066                metadata,
2067            } => {
2068                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2069                    HydroNode::Partition {
2070                        inner: SharedNode(transformed.clone()),
2071                        f: f.clone(),
2072                        is_true: *is_true,
2073                        metadata: metadata.clone(),
2074                    }
2075                } else {
2076                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2077                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2078                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2079                    *new_rc.borrow_mut() = cloned;
2080                    HydroNode::Partition {
2081                        inner: SharedNode(new_rc),
2082                        f: f.clone(),
2083                        is_true: *is_true,
2084                        metadata: metadata.clone(),
2085                    }
2086                }
2087            }
2088            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2089                inner: Box::new(inner.deep_clone(seen_tees)),
2090                metadata: metadata.clone(),
2091            },
2092            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2093                inner: Box::new(inner.deep_clone(seen_tees)),
2094                metadata: metadata.clone(),
2095            },
2096            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2097                inner: Box::new(inner.deep_clone(seen_tees)),
2098                metadata: metadata.clone(),
2099            },
2100            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2101                inner: Box::new(inner.deep_clone(seen_tees)),
2102                metadata: metadata.clone(),
2103            },
2104            HydroNode::Chain {
2105                first,
2106                second,
2107                metadata,
2108            } => HydroNode::Chain {
2109                first: Box::new(first.deep_clone(seen_tees)),
2110                second: Box::new(second.deep_clone(seen_tees)),
2111                metadata: metadata.clone(),
2112            },
2113            HydroNode::ChainFirst {
2114                first,
2115                second,
2116                metadata,
2117            } => HydroNode::ChainFirst {
2118                first: Box::new(first.deep_clone(seen_tees)),
2119                second: Box::new(second.deep_clone(seen_tees)),
2120                metadata: metadata.clone(),
2121            },
2122            HydroNode::CrossProduct {
2123                left,
2124                right,
2125                metadata,
2126            } => HydroNode::CrossProduct {
2127                left: Box::new(left.deep_clone(seen_tees)),
2128                right: Box::new(right.deep_clone(seen_tees)),
2129                metadata: metadata.clone(),
2130            },
2131            HydroNode::CrossSingleton {
2132                left,
2133                right,
2134                metadata,
2135            } => HydroNode::CrossSingleton {
2136                left: Box::new(left.deep_clone(seen_tees)),
2137                right: Box::new(right.deep_clone(seen_tees)),
2138                metadata: metadata.clone(),
2139            },
2140            HydroNode::Join {
2141                left,
2142                right,
2143                metadata,
2144            } => HydroNode::Join {
2145                left: Box::new(left.deep_clone(seen_tees)),
2146                right: Box::new(right.deep_clone(seen_tees)),
2147                metadata: metadata.clone(),
2148            },
2149            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2150                pos: Box::new(pos.deep_clone(seen_tees)),
2151                neg: Box::new(neg.deep_clone(seen_tees)),
2152                metadata: metadata.clone(),
2153            },
2154            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2155                pos: Box::new(pos.deep_clone(seen_tees)),
2156                neg: Box::new(neg.deep_clone(seen_tees)),
2157                metadata: metadata.clone(),
2158            },
2159            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2160                input: Box::new(input.deep_clone(seen_tees)),
2161                metadata: metadata.clone(),
2162            },
2163            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2164                HydroNode::ResolveFuturesOrdered {
2165                    input: Box::new(input.deep_clone(seen_tees)),
2166                    metadata: metadata.clone(),
2167                }
2168            }
2169            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2170                f: f.clone(),
2171                input: Box::new(input.deep_clone(seen_tees)),
2172                metadata: metadata.clone(),
2173            },
2174            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2175                f: f.clone(),
2176                input: Box::new(input.deep_clone(seen_tees)),
2177                metadata: metadata.clone(),
2178            },
2179            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2180                f: f.clone(),
2181                input: Box::new(input.deep_clone(seen_tees)),
2182                metadata: metadata.clone(),
2183            },
2184            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2185                f: f.clone(),
2186                input: Box::new(input.deep_clone(seen_tees)),
2187                metadata: metadata.clone(),
2188            },
2189            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2190                input: Box::new(input.deep_clone(seen_tees)),
2191                metadata: metadata.clone(),
2192            },
2193            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2194                input: Box::new(input.deep_clone(seen_tees)),
2195                metadata: metadata.clone(),
2196            },
2197            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2198                f: f.clone(),
2199                input: Box::new(input.deep_clone(seen_tees)),
2200                metadata: metadata.clone(),
2201            },
2202            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2203                input: Box::new(input.deep_clone(seen_tees)),
2204                metadata: metadata.clone(),
2205            },
2206            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2207                input: Box::new(input.deep_clone(seen_tees)),
2208                metadata: metadata.clone(),
2209            },
2210            HydroNode::Fold {
2211                init,
2212                acc,
2213                input,
2214                metadata,
2215            } => HydroNode::Fold {
2216                init: init.clone(),
2217                acc: acc.clone(),
2218                input: Box::new(input.deep_clone(seen_tees)),
2219                metadata: metadata.clone(),
2220            },
2221            HydroNode::Scan {
2222                init,
2223                acc,
2224                input,
2225                metadata,
2226            } => HydroNode::Scan {
2227                init: init.clone(),
2228                acc: acc.clone(),
2229                input: Box::new(input.deep_clone(seen_tees)),
2230                metadata: metadata.clone(),
2231            },
2232            HydroNode::FoldKeyed {
2233                init,
2234                acc,
2235                input,
2236                metadata,
2237            } => HydroNode::FoldKeyed {
2238                init: init.clone(),
2239                acc: acc.clone(),
2240                input: Box::new(input.deep_clone(seen_tees)),
2241                metadata: metadata.clone(),
2242            },
2243            HydroNode::ReduceKeyedWatermark {
2244                f,
2245                input,
2246                watermark,
2247                metadata,
2248            } => HydroNode::ReduceKeyedWatermark {
2249                f: f.clone(),
2250                input: Box::new(input.deep_clone(seen_tees)),
2251                watermark: Box::new(watermark.deep_clone(seen_tees)),
2252                metadata: metadata.clone(),
2253            },
2254            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2255                f: f.clone(),
2256                input: Box::new(input.deep_clone(seen_tees)),
2257                metadata: metadata.clone(),
2258            },
2259            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2260                f: f.clone(),
2261                input: Box::new(input.deep_clone(seen_tees)),
2262                metadata: metadata.clone(),
2263            },
2264            HydroNode::Network {
2265                name,
2266                networking_info,
2267                serialize_fn,
2268                instantiate_fn,
2269                deserialize_fn,
2270                input,
2271                metadata,
2272            } => HydroNode::Network {
2273                name: name.clone(),
2274                networking_info: networking_info.clone(),
2275                serialize_fn: serialize_fn.clone(),
2276                instantiate_fn: instantiate_fn.clone(),
2277                deserialize_fn: deserialize_fn.clone(),
2278                input: Box::new(input.deep_clone(seen_tees)),
2279                metadata: metadata.clone(),
2280            },
2281            HydroNode::ExternalInput {
2282                from_external_key,
2283                from_port_id,
2284                from_many,
2285                codec_type,
2286                port_hint,
2287                instantiate_fn,
2288                deserialize_fn,
2289                metadata,
2290            } => HydroNode::ExternalInput {
2291                from_external_key: *from_external_key,
2292                from_port_id: *from_port_id,
2293                from_many: *from_many,
2294                codec_type: codec_type.clone(),
2295                port_hint: *port_hint,
2296                instantiate_fn: instantiate_fn.clone(),
2297                deserialize_fn: deserialize_fn.clone(),
2298                metadata: metadata.clone(),
2299            },
2300            HydroNode::Counter {
2301                tag,
2302                duration,
2303                prefix,
2304                input,
2305                metadata,
2306            } => HydroNode::Counter {
2307                tag: tag.clone(),
2308                duration: duration.clone(),
2309                prefix: prefix.clone(),
2310                input: Box::new(input.deep_clone(seen_tees)),
2311                metadata: metadata.clone(),
2312            },
2313        }
2314    }
2315
2316    #[cfg(feature = "build")]
2317    pub fn emit_core(
2318        &mut self,
2319        builders_or_callback: &mut BuildersOrCallback<
2320            impl FnMut(&mut HydroRoot, &mut usize),
2321            impl FnMut(&mut HydroNode, &mut usize),
2322        >,
2323        seen_tees: &mut SeenSharedNodes,
2324        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2325        next_stmt_id: &mut usize,
2326    ) -> syn::Ident {
2327        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2328
2329        self.transform_bottom_up(
2330            &mut |node: &mut HydroNode| {
2331                let out_location = node.metadata().location_id.clone();
2332                match node {
2333                    HydroNode::Placeholder => {
2334                        panic!()
2335                    }
2336
2337                    HydroNode::Cast { .. } => {
2338                        // Cast passes through the input ident unchanged
2339                        // The input ident is already on the stack from processing the child
2340                        match builders_or_callback {
2341                            BuildersOrCallback::Builders(_) => {}
2342                            BuildersOrCallback::Callback(_, node_callback) => {
2343                                node_callback(node, next_stmt_id);
2344                            }
2345                        }
2346
2347                        *next_stmt_id += 1;
2348                        // input_ident stays on stack as output
2349                    }
2350
2351                    HydroNode::ObserveNonDet {
2352                        inner,
2353                        trusted,
2354                        metadata,
2355                        ..
2356                    } => {
2357                        let inner_ident = ident_stack.pop().unwrap();
2358
2359                        let observe_ident =
2360                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2361
2362                        match builders_or_callback {
2363                            BuildersOrCallback::Builders(graph_builders) => {
2364                                graph_builders.observe_nondet(
2365                                    *trusted,
2366                                    &inner.metadata().location_id,
2367                                    inner_ident,
2368                                    &inner.metadata().collection_kind,
2369                                    &observe_ident,
2370                                    &metadata.collection_kind,
2371                                    &metadata.op,
2372                                );
2373                            }
2374                            BuildersOrCallback::Callback(_, node_callback) => {
2375                                node_callback(node, next_stmt_id);
2376                            }
2377                        }
2378
2379                        *next_stmt_id += 1;
2380
2381                        ident_stack.push(observe_ident);
2382                    }
2383
2384                    HydroNode::Batch {
2385                        inner, metadata, ..
2386                    } => {
2387                        let inner_ident = ident_stack.pop().unwrap();
2388
2389                        let batch_ident =
2390                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2391
2392                        match builders_or_callback {
2393                            BuildersOrCallback::Builders(graph_builders) => {
2394                                graph_builders.batch(
2395                                    inner_ident,
2396                                    &inner.metadata().location_id,
2397                                    &inner.metadata().collection_kind,
2398                                    &batch_ident,
2399                                    &out_location,
2400                                    &metadata.op,
2401                                );
2402                            }
2403                            BuildersOrCallback::Callback(_, node_callback) => {
2404                                node_callback(node, next_stmt_id);
2405                            }
2406                        }
2407
2408                        *next_stmt_id += 1;
2409
2410                        ident_stack.push(batch_ident);
2411                    }
2412
2413                    HydroNode::YieldConcat { inner, .. } => {
2414                        let inner_ident = ident_stack.pop().unwrap();
2415
2416                        let yield_ident =
2417                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2418
2419                        match builders_or_callback {
2420                            BuildersOrCallback::Builders(graph_builders) => {
2421                                graph_builders.yield_from_tick(
2422                                    inner_ident,
2423                                    &inner.metadata().location_id,
2424                                    &inner.metadata().collection_kind,
2425                                    &yield_ident,
2426                                    &out_location,
2427                                );
2428                            }
2429                            BuildersOrCallback::Callback(_, node_callback) => {
2430                                node_callback(node, next_stmt_id);
2431                            }
2432                        }
2433
2434                        *next_stmt_id += 1;
2435
2436                        ident_stack.push(yield_ident);
2437                    }
2438
2439                    HydroNode::BeginAtomic { inner, metadata } => {
2440                        let inner_ident = ident_stack.pop().unwrap();
2441
2442                        let begin_ident =
2443                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2444
2445                        match builders_or_callback {
2446                            BuildersOrCallback::Builders(graph_builders) => {
2447                                graph_builders.begin_atomic(
2448                                    inner_ident,
2449                                    &inner.metadata().location_id,
2450                                    &inner.metadata().collection_kind,
2451                                    &begin_ident,
2452                                    &out_location,
2453                                    &metadata.op,
2454                                );
2455                            }
2456                            BuildersOrCallback::Callback(_, node_callback) => {
2457                                node_callback(node, next_stmt_id);
2458                            }
2459                        }
2460
2461                        *next_stmt_id += 1;
2462
2463                        ident_stack.push(begin_ident);
2464                    }
2465
2466                    HydroNode::EndAtomic { inner, .. } => {
2467                        let inner_ident = ident_stack.pop().unwrap();
2468
2469                        let end_ident =
2470                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2471
2472                        match builders_or_callback {
2473                            BuildersOrCallback::Builders(graph_builders) => {
2474                                graph_builders.end_atomic(
2475                                    inner_ident,
2476                                    &inner.metadata().location_id,
2477                                    &inner.metadata().collection_kind,
2478                                    &end_ident,
2479                                );
2480                            }
2481                            BuildersOrCallback::Callback(_, node_callback) => {
2482                                node_callback(node, next_stmt_id);
2483                            }
2484                        }
2485
2486                        *next_stmt_id += 1;
2487
2488                        ident_stack.push(end_ident);
2489                    }
2490
2491                    HydroNode::Source {
2492                        source, metadata, ..
2493                    } => {
2494                        if let HydroSource::ExternalNetwork() = source {
2495                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2496                        } else {
2497                            let source_ident =
2498                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2499
2500                            let source_stmt = match source {
2501                                HydroSource::Stream(expr) => {
2502                                    debug_assert!(metadata.location_id.is_top_level());
2503                                    parse_quote! {
2504                                        #source_ident = source_stream(#expr);
2505                                    }
2506                                }
2507
2508                                HydroSource::ExternalNetwork() => {
2509                                    unreachable!()
2510                                }
2511
2512                                HydroSource::Iter(expr) => {
2513                                    if metadata.location_id.is_top_level() {
2514                                        parse_quote! {
2515                                            #source_ident = source_iter(#expr);
2516                                        }
2517                                    } else {
2518                                        // TODO(shadaj): more natural semantics would be to re-evaluate the expression on each tick
2519                                        parse_quote! {
2520                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2521                                        }
2522                                    }
2523                                }
2524
2525                                HydroSource::Spin() => {
2526                                    debug_assert!(metadata.location_id.is_top_level());
2527                                    parse_quote! {
2528                                        #source_ident = spin();
2529                                    }
2530                                }
2531
2532                                HydroSource::ClusterMembers(target_loc, state) => {
2533                                    debug_assert!(metadata.location_id.is_top_level());
2534
2535                                    let members_tee_ident = syn::Ident::new(
2536                                        &format!(
2537                                            "__cluster_members_tee_{}_{}",
2538                                            metadata.location_id.root().key(),
2539                                            target_loc.key(),
2540                                        ),
2541                                        Span::call_site(),
2542                                    );
2543
2544                                    match state {
2545                                        ClusterMembersState::Stream(d) => {
2546                                            parse_quote! {
2547                                                #members_tee_ident = source_stream(#d) -> tee();
2548                                                #source_ident = #members_tee_ident;
2549                                            }
2550                                        },
2551                                        ClusterMembersState::Uninit => syn::parse_quote! {
2552                                            #source_ident = source_stream(DUMMY);
2553                                        },
2554                                        ClusterMembersState::Tee(..) => parse_quote! {
2555                                            #source_ident = #members_tee_ident;
2556                                        },
2557                                    }
2558                                }
2559
2560                                HydroSource::Embedded(ident) => {
2561                                    parse_quote! {
2562                                        #source_ident = source_stream(#ident);
2563                                    }
2564                                }
2565                            };
2566
2567                            match builders_or_callback {
2568                                BuildersOrCallback::Builders(graph_builders) => {
2569                                    let builder = graph_builders.get_dfir_mut(&out_location);
2570                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2571                                }
2572                                BuildersOrCallback::Callback(_, node_callback) => {
2573                                    node_callback(node, next_stmt_id);
2574                                }
2575                            }
2576
2577                            *next_stmt_id += 1;
2578
2579                            ident_stack.push(source_ident);
2580                        }
2581                    }
2582
2583                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2584                        let source_ident =
2585                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2586
2587                        match builders_or_callback {
2588                            BuildersOrCallback::Builders(graph_builders) => {
2589                                let builder = graph_builders.get_dfir_mut(&out_location);
2590
2591                                if *first_tick_only {
2592                                    assert!(
2593                                        !metadata.location_id.is_top_level(),
2594                                        "first_tick_only SingletonSource must be inside a tick"
2595                                    );
2596                                }
2597
2598                                if *first_tick_only
2599                                    || (metadata.location_id.is_top_level()
2600                                        && metadata.collection_kind.is_bounded())
2601                                {
2602                                    builder.add_dfir(
2603                                        parse_quote! {
2604                                            #source_ident = source_iter([#value]);
2605                                        },
2606                                        None,
2607                                        Some(&next_stmt_id.to_string()),
2608                                    );
2609                                } else {
2610                                    builder.add_dfir(
2611                                        parse_quote! {
2612                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2613                                        },
2614                                        None,
2615                                        Some(&next_stmt_id.to_string()),
2616                                    );
2617                                }
2618                            }
2619                            BuildersOrCallback::Callback(_, node_callback) => {
2620                                node_callback(node, next_stmt_id);
2621                            }
2622                        }
2623
2624                        *next_stmt_id += 1;
2625
2626                        ident_stack.push(source_ident);
2627                    }
2628
2629                    HydroNode::CycleSource { cycle_id, .. } => {
2630                        let ident = cycle_id.as_ident();
2631
2632                        match builders_or_callback {
2633                            BuildersOrCallback::Builders(_) => {}
2634                            BuildersOrCallback::Callback(_, node_callback) => {
2635                                node_callback(node, next_stmt_id);
2636                            }
2637                        }
2638
2639                        // consume a stmt id even though we did not emit anything so that we can instrument this
2640                        *next_stmt_id += 1;
2641
2642                        ident_stack.push(ident);
2643                    }
2644
2645                    HydroNode::Tee { inner, .. } => {
2646                        let ret_ident = if let Some(built_idents) =
2647                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2648                        {
2649                            match builders_or_callback {
2650                                BuildersOrCallback::Builders(_) => {}
2651                                BuildersOrCallback::Callback(_, node_callback) => {
2652                                    node_callback(node, next_stmt_id);
2653                                }
2654                            }
2655
2656                            built_idents[0].clone()
2657                        } else {
2658                            // The inner node was already processed by transform_bottom_up,
2659                            // so its ident is on the stack
2660                            let inner_ident = ident_stack.pop().unwrap();
2661
2662                            let tee_ident =
2663                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2664
2665                            built_tees.insert(
2666                                inner.0.as_ref() as *const RefCell<HydroNode>,
2667                                vec![tee_ident.clone()],
2668                            );
2669
2670                            match builders_or_callback {
2671                                BuildersOrCallback::Builders(graph_builders) => {
2672                                    let builder = graph_builders.get_dfir_mut(&out_location);
2673                                    builder.add_dfir(
2674                                        parse_quote! {
2675                                            #tee_ident = #inner_ident -> tee();
2676                                        },
2677                                        None,
2678                                        Some(&next_stmt_id.to_string()),
2679                                    );
2680                                }
2681                                BuildersOrCallback::Callback(_, node_callback) => {
2682                                    node_callback(node, next_stmt_id);
2683                                }
2684                            }
2685
2686                            tee_ident
2687                        };
2688
2689                        // we consume a stmt id regardless of if we emit the tee() operator,
2690                        // so that during rewrites we touch all recipients of the tee()
2691
2692                        *next_stmt_id += 1;
2693                        ident_stack.push(ret_ident);
2694                    }
2695
2696                    HydroNode::Partition {
2697                        inner, f, is_true, ..
2698                    } => {
2699                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
2700                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2701                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2702                            match builders_or_callback {
2703                                BuildersOrCallback::Builders(_) => {}
2704                                BuildersOrCallback::Callback(_, node_callback) => {
2705                                    node_callback(node, next_stmt_id);
2706                                }
2707                            }
2708
2709                            let idx = if is_true { 0 } else { 1 };
2710                            built_idents[idx].clone()
2711                        } else {
2712                            // The inner node was already processed by transform_bottom_up,
2713                            // so its ident is on the stack
2714                            let inner_ident = ident_stack.pop().unwrap();
2715
2716                            let partition_ident = syn::Ident::new(
2717                                &format!("stream_{}_partition", *next_stmt_id),
2718                                Span::call_site(),
2719                            );
2720                            let true_ident = syn::Ident::new(
2721                                &format!("stream_{}_true", *next_stmt_id),
2722                                Span::call_site(),
2723                            );
2724                            let false_ident = syn::Ident::new(
2725                                &format!("stream_{}_false", *next_stmt_id),
2726                                Span::call_site(),
2727                            );
2728
2729                            built_tees.insert(
2730                                ptr,
2731                                vec![true_ident.clone(), false_ident.clone()],
2732                            );
2733
2734                            match builders_or_callback {
2735                                BuildersOrCallback::Builders(graph_builders) => {
2736                                    let builder = graph_builders.get_dfir_mut(&out_location);
2737                                    builder.add_dfir(
2738                                        parse_quote! {
2739                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2740                                            #true_ident = #partition_ident[0];
2741                                            #false_ident = #partition_ident[1];
2742                                        },
2743                                        None,
2744                                        Some(&next_stmt_id.to_string()),
2745                                    );
2746                                }
2747                                BuildersOrCallback::Callback(_, node_callback) => {
2748                                    node_callback(node, next_stmt_id);
2749                                }
2750                            }
2751
2752                            if is_true { true_ident } else { false_ident }
2753                        };
2754
2755                        *next_stmt_id += 1;
2756                        ident_stack.push(ret_ident);
2757                    }
2758
2759                    HydroNode::Chain { .. } => {
2760                        // Children are processed left-to-right, so second is on top
2761                        let second_ident = ident_stack.pop().unwrap();
2762                        let first_ident = ident_stack.pop().unwrap();
2763
2764                        let chain_ident =
2765                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2766
2767                        match builders_or_callback {
2768                            BuildersOrCallback::Builders(graph_builders) => {
2769                                let builder = graph_builders.get_dfir_mut(&out_location);
2770                                builder.add_dfir(
2771                                    parse_quote! {
2772                                        #chain_ident = chain();
2773                                        #first_ident -> [0]#chain_ident;
2774                                        #second_ident -> [1]#chain_ident;
2775                                    },
2776                                    None,
2777                                    Some(&next_stmt_id.to_string()),
2778                                );
2779                            }
2780                            BuildersOrCallback::Callback(_, node_callback) => {
2781                                node_callback(node, next_stmt_id);
2782                            }
2783                        }
2784
2785                        *next_stmt_id += 1;
2786
2787                        ident_stack.push(chain_ident);
2788                    }
2789
2790                    HydroNode::ChainFirst { .. } => {
2791                        let second_ident = ident_stack.pop().unwrap();
2792                        let first_ident = ident_stack.pop().unwrap();
2793
2794                        let chain_ident =
2795                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2796
2797                        match builders_or_callback {
2798                            BuildersOrCallback::Builders(graph_builders) => {
2799                                let builder = graph_builders.get_dfir_mut(&out_location);
2800                                builder.add_dfir(
2801                                    parse_quote! {
2802                                        #chain_ident = chain_first_n(1);
2803                                        #first_ident -> [0]#chain_ident;
2804                                        #second_ident -> [1]#chain_ident;
2805                                    },
2806                                    None,
2807                                    Some(&next_stmt_id.to_string()),
2808                                );
2809                            }
2810                            BuildersOrCallback::Callback(_, node_callback) => {
2811                                node_callback(node, next_stmt_id);
2812                            }
2813                        }
2814
2815                        *next_stmt_id += 1;
2816
2817                        ident_stack.push(chain_ident);
2818                    }
2819
2820                    HydroNode::CrossSingleton { right, .. } => {
2821                        let right_ident = ident_stack.pop().unwrap();
2822                        let left_ident = ident_stack.pop().unwrap();
2823
2824                        let cross_ident =
2825                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2826
2827                        match builders_or_callback {
2828                            BuildersOrCallback::Builders(graph_builders) => {
2829                                let builder = graph_builders.get_dfir_mut(&out_location);
2830
2831                                if right.metadata().location_id.is_top_level()
2832                                    && right.metadata().collection_kind.is_bounded()
2833                                {
2834                                    builder.add_dfir(
2835                                        parse_quote! {
2836                                            #cross_ident = cross_singleton();
2837                                            #left_ident -> [input]#cross_ident;
2838                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2839                                        },
2840                                        None,
2841                                        Some(&next_stmt_id.to_string()),
2842                                    );
2843                                } else {
2844                                    builder.add_dfir(
2845                                        parse_quote! {
2846                                            #cross_ident = cross_singleton();
2847                                            #left_ident -> [input]#cross_ident;
2848                                            #right_ident -> [single]#cross_ident;
2849                                        },
2850                                        None,
2851                                        Some(&next_stmt_id.to_string()),
2852                                    );
2853                                }
2854                            }
2855                            BuildersOrCallback::Callback(_, node_callback) => {
2856                                node_callback(node, next_stmt_id);
2857                            }
2858                        }
2859
2860                        *next_stmt_id += 1;
2861
2862                        ident_stack.push(cross_ident);
2863                    }
2864
2865                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2866                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2867                            parse_quote!(cross_join_multiset)
2868                        } else {
2869                            parse_quote!(join_multiset)
2870                        };
2871
2872                        let (HydroNode::CrossProduct { left, right, .. }
2873                        | HydroNode::Join { left, right, .. }) = node
2874                        else {
2875                            unreachable!()
2876                        };
2877
2878                        let is_top_level = left.metadata().location_id.is_top_level()
2879                            && right.metadata().location_id.is_top_level();
2880                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2881                            quote!('static)
2882                        } else {
2883                            quote!('tick)
2884                        };
2885
2886                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2887                            quote!('static)
2888                        } else {
2889                            quote!('tick)
2890                        };
2891
2892                        let right_ident = ident_stack.pop().unwrap();
2893                        let left_ident = ident_stack.pop().unwrap();
2894
2895                        let stream_ident =
2896                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2897
2898                        match builders_or_callback {
2899                            BuildersOrCallback::Builders(graph_builders) => {
2900                                let builder = graph_builders.get_dfir_mut(&out_location);
2901                                builder.add_dfir(
2902                                    if is_top_level {
2903                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2904                                        // a multiset_delta() to negate the replay behavior
2905                                        parse_quote! {
2906                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2907                                            #left_ident -> [0]#stream_ident;
2908                                            #right_ident -> [1]#stream_ident;
2909                                        }
2910                                    } else {
2911                                        parse_quote! {
2912                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2913                                            #left_ident -> [0]#stream_ident;
2914                                            #right_ident -> [1]#stream_ident;
2915                                        }
2916                                    }
2917                                    ,
2918                                    None,
2919                                    Some(&next_stmt_id.to_string()),
2920                                );
2921                            }
2922                            BuildersOrCallback::Callback(_, node_callback) => {
2923                                node_callback(node, next_stmt_id);
2924                            }
2925                        }
2926
2927                        *next_stmt_id += 1;
2928
2929                        ident_stack.push(stream_ident);
2930                    }
2931
2932                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2933                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2934                            parse_quote!(difference)
2935                        } else {
2936                            parse_quote!(anti_join)
2937                        };
2938
2939                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2940                            node
2941                        else {
2942                            unreachable!()
2943                        };
2944
2945                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2946                            quote!('static)
2947                        } else {
2948                            quote!('tick)
2949                        };
2950
2951                        let neg_ident = ident_stack.pop().unwrap();
2952                        let pos_ident = ident_stack.pop().unwrap();
2953
2954                        let stream_ident =
2955                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2956
2957                        match builders_or_callback {
2958                            BuildersOrCallback::Builders(graph_builders) => {
2959                                let builder = graph_builders.get_dfir_mut(&out_location);
2960                                builder.add_dfir(
2961                                    parse_quote! {
2962                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2963                                        #pos_ident -> [pos]#stream_ident;
2964                                        #neg_ident -> [neg]#stream_ident;
2965                                    },
2966                                    None,
2967                                    Some(&next_stmt_id.to_string()),
2968                                );
2969                            }
2970                            BuildersOrCallback::Callback(_, node_callback) => {
2971                                node_callback(node, next_stmt_id);
2972                            }
2973                        }
2974
2975                        *next_stmt_id += 1;
2976
2977                        ident_stack.push(stream_ident);
2978                    }
2979
2980                    HydroNode::ResolveFutures { .. } => {
2981                        let input_ident = ident_stack.pop().unwrap();
2982
2983                        let futures_ident =
2984                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2985
2986                        match builders_or_callback {
2987                            BuildersOrCallback::Builders(graph_builders) => {
2988                                let builder = graph_builders.get_dfir_mut(&out_location);
2989                                builder.add_dfir(
2990                                    parse_quote! {
2991                                        #futures_ident = #input_ident -> resolve_futures();
2992                                    },
2993                                    None,
2994                                    Some(&next_stmt_id.to_string()),
2995                                );
2996                            }
2997                            BuildersOrCallback::Callback(_, node_callback) => {
2998                                node_callback(node, next_stmt_id);
2999                            }
3000                        }
3001
3002                        *next_stmt_id += 1;
3003
3004                        ident_stack.push(futures_ident);
3005                    }
3006
3007                    HydroNode::ResolveFuturesOrdered { .. } => {
3008                        let input_ident = ident_stack.pop().unwrap();
3009
3010                        let futures_ident =
3011                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3012
3013                        match builders_or_callback {
3014                            BuildersOrCallback::Builders(graph_builders) => {
3015                                let builder = graph_builders.get_dfir_mut(&out_location);
3016                                builder.add_dfir(
3017                                    parse_quote! {
3018                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3019                                    },
3020                                    None,
3021                                    Some(&next_stmt_id.to_string()),
3022                                );
3023                            }
3024                            BuildersOrCallback::Callback(_, node_callback) => {
3025                                node_callback(node, next_stmt_id);
3026                            }
3027                        }
3028
3029                        *next_stmt_id += 1;
3030
3031                        ident_stack.push(futures_ident);
3032                    }
3033
3034                    HydroNode::Map { f, .. } => {
3035                        let input_ident = ident_stack.pop().unwrap();
3036
3037                        let map_ident =
3038                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3039
3040                        match builders_or_callback {
3041                            BuildersOrCallback::Builders(graph_builders) => {
3042                                let builder = graph_builders.get_dfir_mut(&out_location);
3043                                builder.add_dfir(
3044                                    parse_quote! {
3045                                        #map_ident = #input_ident -> map(#f);
3046                                    },
3047                                    None,
3048                                    Some(&next_stmt_id.to_string()),
3049                                );
3050                            }
3051                            BuildersOrCallback::Callback(_, node_callback) => {
3052                                node_callback(node, next_stmt_id);
3053                            }
3054                        }
3055
3056                        *next_stmt_id += 1;
3057
3058                        ident_stack.push(map_ident);
3059                    }
3060
3061                    HydroNode::FlatMap { f, .. } => {
3062                        let input_ident = ident_stack.pop().unwrap();
3063
3064                        let flat_map_ident =
3065                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3066
3067                        match builders_or_callback {
3068                            BuildersOrCallback::Builders(graph_builders) => {
3069                                let builder = graph_builders.get_dfir_mut(&out_location);
3070                                builder.add_dfir(
3071                                    parse_quote! {
3072                                        #flat_map_ident = #input_ident -> flat_map(#f);
3073                                    },
3074                                    None,
3075                                    Some(&next_stmt_id.to_string()),
3076                                );
3077                            }
3078                            BuildersOrCallback::Callback(_, node_callback) => {
3079                                node_callback(node, next_stmt_id);
3080                            }
3081                        }
3082
3083                        *next_stmt_id += 1;
3084
3085                        ident_stack.push(flat_map_ident);
3086                    }
3087
3088                    HydroNode::Filter { f, .. } => {
3089                        let input_ident = ident_stack.pop().unwrap();
3090
3091                        let filter_ident =
3092                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3093
3094                        match builders_or_callback {
3095                            BuildersOrCallback::Builders(graph_builders) => {
3096                                let builder = graph_builders.get_dfir_mut(&out_location);
3097                                builder.add_dfir(
3098                                    parse_quote! {
3099                                        #filter_ident = #input_ident -> filter(#f);
3100                                    },
3101                                    None,
3102                                    Some(&next_stmt_id.to_string()),
3103                                );
3104                            }
3105                            BuildersOrCallback::Callback(_, node_callback) => {
3106                                node_callback(node, next_stmt_id);
3107                            }
3108                        }
3109
3110                        *next_stmt_id += 1;
3111
3112                        ident_stack.push(filter_ident);
3113                    }
3114
3115                    HydroNode::FilterMap { f, .. } => {
3116                        let input_ident = ident_stack.pop().unwrap();
3117
3118                        let filter_map_ident =
3119                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3120
3121                        match builders_or_callback {
3122                            BuildersOrCallback::Builders(graph_builders) => {
3123                                let builder = graph_builders.get_dfir_mut(&out_location);
3124                                builder.add_dfir(
3125                                    parse_quote! {
3126                                        #filter_map_ident = #input_ident -> filter_map(#f);
3127                                    },
3128                                    None,
3129                                    Some(&next_stmt_id.to_string()),
3130                                );
3131                            }
3132                            BuildersOrCallback::Callback(_, node_callback) => {
3133                                node_callback(node, next_stmt_id);
3134                            }
3135                        }
3136
3137                        *next_stmt_id += 1;
3138
3139                        ident_stack.push(filter_map_ident);
3140                    }
3141
3142                    HydroNode::Sort { .. } => {
3143                        let input_ident = ident_stack.pop().unwrap();
3144
3145                        let sort_ident =
3146                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3147
3148                        match builders_or_callback {
3149                            BuildersOrCallback::Builders(graph_builders) => {
3150                                let builder = graph_builders.get_dfir_mut(&out_location);
3151                                builder.add_dfir(
3152                                    parse_quote! {
3153                                        #sort_ident = #input_ident -> sort();
3154                                    },
3155                                    None,
3156                                    Some(&next_stmt_id.to_string()),
3157                                );
3158                            }
3159                            BuildersOrCallback::Callback(_, node_callback) => {
3160                                node_callback(node, next_stmt_id);
3161                            }
3162                        }
3163
3164                        *next_stmt_id += 1;
3165
3166                        ident_stack.push(sort_ident);
3167                    }
3168
3169                    HydroNode::DeferTick { .. } => {
3170                        let input_ident = ident_stack.pop().unwrap();
3171
3172                        let defer_tick_ident =
3173                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3174
3175                        match builders_or_callback {
3176                            BuildersOrCallback::Builders(graph_builders) => {
3177                                let builder = graph_builders.get_dfir_mut(&out_location);
3178                                builder.add_dfir(
3179                                    parse_quote! {
3180                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3181                                    },
3182                                    None,
3183                                    Some(&next_stmt_id.to_string()),
3184                                );
3185                            }
3186                            BuildersOrCallback::Callback(_, node_callback) => {
3187                                node_callback(node, next_stmt_id);
3188                            }
3189                        }
3190
3191                        *next_stmt_id += 1;
3192
3193                        ident_stack.push(defer_tick_ident);
3194                    }
3195
3196                    HydroNode::Enumerate { input, .. } => {
3197                        let input_ident = ident_stack.pop().unwrap();
3198
3199                        let enumerate_ident =
3200                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3201
3202                        match builders_or_callback {
3203                            BuildersOrCallback::Builders(graph_builders) => {
3204                                let builder = graph_builders.get_dfir_mut(&out_location);
3205                                let lifetime = if input.metadata().location_id.is_top_level() {
3206                                    quote!('static)
3207                                } else {
3208                                    quote!('tick)
3209                                };
3210                                builder.add_dfir(
3211                                    parse_quote! {
3212                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3213                                    },
3214                                    None,
3215                                    Some(&next_stmt_id.to_string()),
3216                                );
3217                            }
3218                            BuildersOrCallback::Callback(_, node_callback) => {
3219                                node_callback(node, next_stmt_id);
3220                            }
3221                        }
3222
3223                        *next_stmt_id += 1;
3224
3225                        ident_stack.push(enumerate_ident);
3226                    }
3227
3228                    HydroNode::Inspect { f, .. } => {
3229                        let input_ident = ident_stack.pop().unwrap();
3230
3231                        let inspect_ident =
3232                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3233
3234                        match builders_or_callback {
3235                            BuildersOrCallback::Builders(graph_builders) => {
3236                                let builder = graph_builders.get_dfir_mut(&out_location);
3237                                builder.add_dfir(
3238                                    parse_quote! {
3239                                        #inspect_ident = #input_ident -> inspect(#f);
3240                                    },
3241                                    None,
3242                                    Some(&next_stmt_id.to_string()),
3243                                );
3244                            }
3245                            BuildersOrCallback::Callback(_, node_callback) => {
3246                                node_callback(node, next_stmt_id);
3247                            }
3248                        }
3249
3250                        *next_stmt_id += 1;
3251
3252                        ident_stack.push(inspect_ident);
3253                    }
3254
3255                    HydroNode::Unique { input, .. } => {
3256                        let input_ident = ident_stack.pop().unwrap();
3257
3258                        let unique_ident =
3259                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3260
3261                        match builders_or_callback {
3262                            BuildersOrCallback::Builders(graph_builders) => {
3263                                let builder = graph_builders.get_dfir_mut(&out_location);
3264                                let lifetime = if input.metadata().location_id.is_top_level() {
3265                                    quote!('static)
3266                                } else {
3267                                    quote!('tick)
3268                                };
3269
3270                                builder.add_dfir(
3271                                    parse_quote! {
3272                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3273                                    },
3274                                    None,
3275                                    Some(&next_stmt_id.to_string()),
3276                                );
3277                            }
3278                            BuildersOrCallback::Callback(_, node_callback) => {
3279                                node_callback(node, next_stmt_id);
3280                            }
3281                        }
3282
3283                        *next_stmt_id += 1;
3284
3285                        ident_stack.push(unique_ident);
3286                    }
3287
3288                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3289                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3290                            if input.metadata().location_id.is_top_level()
3291                                && input.metadata().collection_kind.is_bounded()
3292                            {
3293                                parse_quote!(fold_no_replay)
3294                            } else {
3295                                parse_quote!(fold)
3296                            }
3297                        } else if matches!(node, HydroNode::Scan { .. }) {
3298                            parse_quote!(scan)
3299                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3300                            if input.metadata().location_id.is_top_level()
3301                                && input.metadata().collection_kind.is_bounded()
3302                            {
3303                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3304                            } else {
3305                                parse_quote!(fold_keyed)
3306                            }
3307                        } else {
3308                            unreachable!()
3309                        };
3310
3311                        let (HydroNode::Fold { input, .. }
3312                        | HydroNode::FoldKeyed { input, .. }
3313                        | HydroNode::Scan { input, .. }) = node
3314                        else {
3315                            unreachable!()
3316                        };
3317
3318                        let lifetime = if input.metadata().location_id.is_top_level() {
3319                            quote!('static)
3320                        } else {
3321                            quote!('tick)
3322                        };
3323
3324                        let input_ident = ident_stack.pop().unwrap();
3325
3326                        let (HydroNode::Fold { init, acc, .. }
3327                        | HydroNode::FoldKeyed { init, acc, .. }
3328                        | HydroNode::Scan { init, acc, .. }) = &*node
3329                        else {
3330                            unreachable!()
3331                        };
3332
3333                        let fold_ident =
3334                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3335
3336                        match builders_or_callback {
3337                            BuildersOrCallback::Builders(graph_builders) => {
3338                                if matches!(node, HydroNode::Fold { .. })
3339                                    && node.metadata().location_id.is_top_level()
3340                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3341                                    && graph_builders.singleton_intermediates()
3342                                    && !node.metadata().collection_kind.is_bounded()
3343                                {
3344                                    let builder = graph_builders.get_dfir_mut(&out_location);
3345
3346                                    let acc: syn::Expr = parse_quote!({
3347                                        let mut __inner = #acc;
3348                                        move |__state, __value| {
3349                                            __inner(__state, __value);
3350                                            Some(__state.clone())
3351                                        }
3352                                    });
3353
3354                                    builder.add_dfir(
3355                                        parse_quote! {
3356                                            source_iter([(#init)()]) -> [0]#fold_ident;
3357                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3358                                            #fold_ident = chain();
3359                                        },
3360                                        None,
3361                                        Some(&next_stmt_id.to_string()),
3362                                    );
3363                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3364                                    && node.metadata().location_id.is_top_level()
3365                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3366                                    && graph_builders.singleton_intermediates()
3367                                    && !node.metadata().collection_kind.is_bounded()
3368                                {
3369                                    let builder = graph_builders.get_dfir_mut(&out_location);
3370
3371                                    let acc: syn::Expr = parse_quote!({
3372                                        let mut __init = #init;
3373                                        let mut __inner = #acc;
3374                                        move |__state, __kv: (_, _)| {
3375                                            // TODO(shadaj): we can avoid the clone when the entry exists
3376                                            let __state = __state
3377                                                .entry(::std::clone::Clone::clone(&__kv.0))
3378                                                .or_insert_with(|| (__init)());
3379                                            __inner(__state, __kv.1);
3380                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3381                                        }
3382                                    });
3383
3384                                    builder.add_dfir(
3385                                        parse_quote! {
3386                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3387                                        },
3388                                        None,
3389                                        Some(&next_stmt_id.to_string()),
3390                                    );
3391                                } else {
3392                                    let builder = graph_builders.get_dfir_mut(&out_location);
3393                                    builder.add_dfir(
3394                                        parse_quote! {
3395                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3396                                        },
3397                                        None,
3398                                        Some(&next_stmt_id.to_string()),
3399                                    );
3400                                }
3401                            }
3402                            BuildersOrCallback::Callback(_, node_callback) => {
3403                                node_callback(node, next_stmt_id);
3404                            }
3405                        }
3406
3407                        *next_stmt_id += 1;
3408
3409                        ident_stack.push(fold_ident);
3410                    }
3411
3412                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3413                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3414                            if input.metadata().location_id.is_top_level()
3415                                && input.metadata().collection_kind.is_bounded()
3416                            {
3417                                parse_quote!(reduce_no_replay)
3418                            } else {
3419                                parse_quote!(reduce)
3420                            }
3421                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3422                            if input.metadata().location_id.is_top_level()
3423                                && input.metadata().collection_kind.is_bounded()
3424                            {
3425                                todo!(
3426                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3427                                )
3428                            } else {
3429                                parse_quote!(reduce_keyed)
3430                            }
3431                        } else {
3432                            unreachable!()
3433                        };
3434
3435                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3436                        else {
3437                            unreachable!()
3438                        };
3439
3440                        let lifetime = if input.metadata().location_id.is_top_level() {
3441                            quote!('static)
3442                        } else {
3443                            quote!('tick)
3444                        };
3445
3446                        let input_ident = ident_stack.pop().unwrap();
3447
3448                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3449                        else {
3450                            unreachable!()
3451                        };
3452
3453                        let reduce_ident =
3454                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3455
3456                        match builders_or_callback {
3457                            BuildersOrCallback::Builders(graph_builders) => {
3458                                if matches!(node, HydroNode::Reduce { .. })
3459                                    && node.metadata().location_id.is_top_level()
3460                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3461                                    && graph_builders.singleton_intermediates()
3462                                    && !node.metadata().collection_kind.is_bounded()
3463                                {
3464                                    todo!(
3465                                        "Reduce with optional intermediates is not yet supported in simulator"
3466                                    );
3467                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3468                                    && node.metadata().location_id.is_top_level()
3469                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3470                                    && graph_builders.singleton_intermediates()
3471                                    && !node.metadata().collection_kind.is_bounded()
3472                                {
3473                                    todo!(
3474                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3475                                    );
3476                                } else {
3477                                    let builder = graph_builders.get_dfir_mut(&out_location);
3478                                    builder.add_dfir(
3479                                        parse_quote! {
3480                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3481                                        },
3482                                        None,
3483                                        Some(&next_stmt_id.to_string()),
3484                                    );
3485                                }
3486                            }
3487                            BuildersOrCallback::Callback(_, node_callback) => {
3488                                node_callback(node, next_stmt_id);
3489                            }
3490                        }
3491
3492                        *next_stmt_id += 1;
3493
3494                        ident_stack.push(reduce_ident);
3495                    }
3496
3497                    HydroNode::ReduceKeyedWatermark {
3498                        f,
3499                        input,
3500                        metadata,
3501                        ..
3502                    } => {
3503                        let lifetime = if input.metadata().location_id.is_top_level() {
3504                            quote!('static)
3505                        } else {
3506                            quote!('tick)
3507                        };
3508
3509                        // watermark is processed second, so it's on top
3510                        let watermark_ident = ident_stack.pop().unwrap();
3511                        let input_ident = ident_stack.pop().unwrap();
3512
3513                        let chain_ident = syn::Ident::new(
3514                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3515                            Span::call_site(),
3516                        );
3517
3518                        let fold_ident =
3519                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3520
3521                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3522                            && input.metadata().collection_kind.is_bounded()
3523                        {
3524                            parse_quote!(fold_no_replay)
3525                        } else {
3526                            parse_quote!(fold)
3527                        };
3528
3529                        match builders_or_callback {
3530                            BuildersOrCallback::Builders(graph_builders) => {
3531                                if metadata.location_id.is_top_level()
3532                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3533                                    && graph_builders.singleton_intermediates()
3534                                    && !metadata.collection_kind.is_bounded()
3535                                {
3536                                    todo!(
3537                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3538                                    )
3539                                } else {
3540                                    let builder = graph_builders.get_dfir_mut(&out_location);
3541                                    builder.add_dfir(
3542                                        parse_quote! {
3543                                            #chain_ident = chain();
3544                                            #input_ident
3545                                                -> map(|x| (Some(x), None))
3546                                                -> [0]#chain_ident;
3547                                            #watermark_ident
3548                                                -> map(|watermark| (None, Some(watermark)))
3549                                                -> [1]#chain_ident;
3550
3551                                            #fold_ident = #chain_ident
3552                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3553                                                    let __reduce_keyed_fn = #f;
3554                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3555                                                        if let Some((k, v)) = opt_payload {
3556                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3557                                                                if k < curr_watermark {
3558                                                                    return;
3559                                                                }
3560                                                            }
3561                                                            match map.entry(k) {
3562                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3563                                                                    e.insert(v);
3564                                                                }
3565                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3566                                                                    __reduce_keyed_fn(e.get_mut(), v);
3567                                                                }
3568                                                            }
3569                                                        } else {
3570                                                            let watermark = opt_watermark.unwrap();
3571                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3572                                                                if watermark <= curr_watermark {
3573                                                                    return;
3574                                                                }
3575                                                            }
3576                                                            *opt_curr_watermark = opt_watermark;
3577                                                            map.retain(|k, _| *k >= watermark);
3578                                                        }
3579                                                    }
3580                                                })
3581                                                -> flat_map(|(map, _curr_watermark)| map);
3582                                        },
3583                                        None,
3584                                        Some(&next_stmt_id.to_string()),
3585                                    );
3586                                }
3587                            }
3588                            BuildersOrCallback::Callback(_, node_callback) => {
3589                                node_callback(node, next_stmt_id);
3590                            }
3591                        }
3592
3593                        *next_stmt_id += 1;
3594
3595                        ident_stack.push(fold_ident);
3596                    }
3597
3598                    HydroNode::Network {
3599                        networking_info,
3600                        serialize_fn: serialize_pipeline,
3601                        instantiate_fn,
3602                        deserialize_fn: deserialize_pipeline,
3603                        input,
3604                        ..
3605                    } => {
3606                        let input_ident = ident_stack.pop().unwrap();
3607
3608                        let receiver_stream_ident =
3609                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3610
3611                        match builders_or_callback {
3612                            BuildersOrCallback::Builders(graph_builders) => {
3613                                let (sink_expr, source_expr) = match instantiate_fn {
3614                                    DebugInstantiate::Building => (
3615                                        syn::parse_quote!(DUMMY_SINK),
3616                                        syn::parse_quote!(DUMMY_SOURCE),
3617                                    ),
3618
3619                                    DebugInstantiate::Finalized(finalized) => {
3620                                        (finalized.sink.clone(), finalized.source.clone())
3621                                    }
3622                                };
3623
3624                                graph_builders.create_network(
3625                                    &input.metadata().location_id,
3626                                    &out_location,
3627                                    input_ident,
3628                                    &receiver_stream_ident,
3629                                    serialize_pipeline.as_ref(),
3630                                    sink_expr,
3631                                    source_expr,
3632                                    deserialize_pipeline.as_ref(),
3633                                    *next_stmt_id,
3634                                    networking_info,
3635                                );
3636                            }
3637                            BuildersOrCallback::Callback(_, node_callback) => {
3638                                node_callback(node, next_stmt_id);
3639                            }
3640                        }
3641
3642                        *next_stmt_id += 1;
3643
3644                        ident_stack.push(receiver_stream_ident);
3645                    }
3646
3647                    HydroNode::ExternalInput {
3648                        instantiate_fn,
3649                        deserialize_fn: deserialize_pipeline,
3650                        ..
3651                    } => {
3652                        let receiver_stream_ident =
3653                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3654
3655                        match builders_or_callback {
3656                            BuildersOrCallback::Builders(graph_builders) => {
3657                                let (_, source_expr) = match instantiate_fn {
3658                                    DebugInstantiate::Building => (
3659                                        syn::parse_quote!(DUMMY_SINK),
3660                                        syn::parse_quote!(DUMMY_SOURCE),
3661                                    ),
3662
3663                                    DebugInstantiate::Finalized(finalized) => {
3664                                        (finalized.sink.clone(), finalized.source.clone())
3665                                    }
3666                                };
3667
3668                                graph_builders.create_external_source(
3669                                    &out_location,
3670                                    source_expr,
3671                                    &receiver_stream_ident,
3672                                    deserialize_pipeline.as_ref(),
3673                                    *next_stmt_id,
3674                                );
3675                            }
3676                            BuildersOrCallback::Callback(_, node_callback) => {
3677                                node_callback(node, next_stmt_id);
3678                            }
3679                        }
3680
3681                        *next_stmt_id += 1;
3682
3683                        ident_stack.push(receiver_stream_ident);
3684                    }
3685
3686                    HydroNode::Counter {
3687                        tag,
3688                        duration,
3689                        prefix,
3690                        ..
3691                    } => {
3692                        let input_ident = ident_stack.pop().unwrap();
3693
3694                        let counter_ident =
3695                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3696
3697                        match builders_or_callback {
3698                            BuildersOrCallback::Builders(graph_builders) => {
3699                                let arg = format!("{}({})", prefix, tag);
3700                                let builder = graph_builders.get_dfir_mut(&out_location);
3701                                builder.add_dfir(
3702                                    parse_quote! {
3703                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3704                                    },
3705                                    None,
3706                                    Some(&next_stmt_id.to_string()),
3707                                );
3708                            }
3709                            BuildersOrCallback::Callback(_, node_callback) => {
3710                                node_callback(node, next_stmt_id);
3711                            }
3712                        }
3713
3714                        *next_stmt_id += 1;
3715
3716                        ident_stack.push(counter_ident);
3717                    }
3718                }
3719            },
3720            seen_tees,
3721            false,
3722        );
3723
3724        ident_stack
3725            .pop()
3726            .expect("ident_stack should have exactly one element after traversal")
3727    }
3728
3729    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3730        match self {
3731            HydroNode::Placeholder => {
3732                panic!()
3733            }
3734            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3735            HydroNode::Source { source, .. } => match source {
3736                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3737                HydroSource::ExternalNetwork()
3738                | HydroSource::Spin()
3739                | HydroSource::ClusterMembers(_, _)
3740                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3741            },
3742            HydroNode::SingletonSource { value, .. } => {
3743                transform(value);
3744            }
3745            HydroNode::CycleSource { .. }
3746            | HydroNode::Tee { .. }
3747            | HydroNode::YieldConcat { .. }
3748            | HydroNode::BeginAtomic { .. }
3749            | HydroNode::EndAtomic { .. }
3750            | HydroNode::Batch { .. }
3751            | HydroNode::Chain { .. }
3752            | HydroNode::ChainFirst { .. }
3753            | HydroNode::CrossProduct { .. }
3754            | HydroNode::CrossSingleton { .. }
3755            | HydroNode::ResolveFutures { .. }
3756            | HydroNode::ResolveFuturesOrdered { .. }
3757            | HydroNode::Join { .. }
3758            | HydroNode::Difference { .. }
3759            | HydroNode::AntiJoin { .. }
3760            | HydroNode::DeferTick { .. }
3761            | HydroNode::Enumerate { .. }
3762            | HydroNode::Unique { .. }
3763            | HydroNode::Sort { .. } => {}
3764            HydroNode::Map { f, .. }
3765            | HydroNode::FlatMap { f, .. }
3766            | HydroNode::Filter { f, .. }
3767            | HydroNode::FilterMap { f, .. }
3768            | HydroNode::Inspect { f, .. }
3769            | HydroNode::Partition { f, .. }
3770            | HydroNode::Reduce { f, .. }
3771            | HydroNode::ReduceKeyed { f, .. }
3772            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3773                transform(f);
3774            }
3775            HydroNode::Fold { init, acc, .. }
3776            | HydroNode::Scan { init, acc, .. }
3777            | HydroNode::FoldKeyed { init, acc, .. } => {
3778                transform(init);
3779                transform(acc);
3780            }
3781            HydroNode::Network {
3782                serialize_fn,
3783                deserialize_fn,
3784                ..
3785            } => {
3786                if let Some(serialize_fn) = serialize_fn {
3787                    transform(serialize_fn);
3788                }
3789                if let Some(deserialize_fn) = deserialize_fn {
3790                    transform(deserialize_fn);
3791                }
3792            }
3793            HydroNode::ExternalInput { deserialize_fn, .. } => {
3794                if let Some(deserialize_fn) = deserialize_fn {
3795                    transform(deserialize_fn);
3796                }
3797            }
3798            HydroNode::Counter { duration, .. } => {
3799                transform(duration);
3800            }
3801        }
3802    }
3803
3804    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3805        &self.metadata().op
3806    }
3807
3808    pub fn metadata(&self) -> &HydroIrMetadata {
3809        match self {
3810            HydroNode::Placeholder => {
3811                panic!()
3812            }
3813            HydroNode::Cast { metadata, .. } => metadata,
3814            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3815            HydroNode::Source { metadata, .. } => metadata,
3816            HydroNode::SingletonSource { metadata, .. } => metadata,
3817            HydroNode::CycleSource { metadata, .. } => metadata,
3818            HydroNode::Tee { metadata, .. } => metadata,
3819            HydroNode::Partition { metadata, .. } => metadata,
3820            HydroNode::YieldConcat { metadata, .. } => metadata,
3821            HydroNode::BeginAtomic { metadata, .. } => metadata,
3822            HydroNode::EndAtomic { metadata, .. } => metadata,
3823            HydroNode::Batch { metadata, .. } => metadata,
3824            HydroNode::Chain { metadata, .. } => metadata,
3825            HydroNode::ChainFirst { metadata, .. } => metadata,
3826            HydroNode::CrossProduct { metadata, .. } => metadata,
3827            HydroNode::CrossSingleton { metadata, .. } => metadata,
3828            HydroNode::Join { metadata, .. } => metadata,
3829            HydroNode::Difference { metadata, .. } => metadata,
3830            HydroNode::AntiJoin { metadata, .. } => metadata,
3831            HydroNode::ResolveFutures { metadata, .. } => metadata,
3832            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3833            HydroNode::Map { metadata, .. } => metadata,
3834            HydroNode::FlatMap { metadata, .. } => metadata,
3835            HydroNode::Filter { metadata, .. } => metadata,
3836            HydroNode::FilterMap { metadata, .. } => metadata,
3837            HydroNode::DeferTick { metadata, .. } => metadata,
3838            HydroNode::Enumerate { metadata, .. } => metadata,
3839            HydroNode::Inspect { metadata, .. } => metadata,
3840            HydroNode::Unique { metadata, .. } => metadata,
3841            HydroNode::Sort { metadata, .. } => metadata,
3842            HydroNode::Scan { metadata, .. } => metadata,
3843            HydroNode::Fold { metadata, .. } => metadata,
3844            HydroNode::FoldKeyed { metadata, .. } => metadata,
3845            HydroNode::Reduce { metadata, .. } => metadata,
3846            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3847            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3848            HydroNode::ExternalInput { metadata, .. } => metadata,
3849            HydroNode::Network { metadata, .. } => metadata,
3850            HydroNode::Counter { metadata, .. } => metadata,
3851        }
3852    }
3853
3854    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3855        &mut self.metadata_mut().op
3856    }
3857
3858    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3859        match self {
3860            HydroNode::Placeholder => {
3861                panic!()
3862            }
3863            HydroNode::Cast { metadata, .. } => metadata,
3864            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3865            HydroNode::Source { metadata, .. } => metadata,
3866            HydroNode::SingletonSource { metadata, .. } => metadata,
3867            HydroNode::CycleSource { metadata, .. } => metadata,
3868            HydroNode::Tee { metadata, .. } => metadata,
3869            HydroNode::Partition { metadata, .. } => metadata,
3870            HydroNode::YieldConcat { metadata, .. } => metadata,
3871            HydroNode::BeginAtomic { metadata, .. } => metadata,
3872            HydroNode::EndAtomic { metadata, .. } => metadata,
3873            HydroNode::Batch { metadata, .. } => metadata,
3874            HydroNode::Chain { metadata, .. } => metadata,
3875            HydroNode::ChainFirst { metadata, .. } => metadata,
3876            HydroNode::CrossProduct { metadata, .. } => metadata,
3877            HydroNode::CrossSingleton { metadata, .. } => metadata,
3878            HydroNode::Join { metadata, .. } => metadata,
3879            HydroNode::Difference { metadata, .. } => metadata,
3880            HydroNode::AntiJoin { metadata, .. } => metadata,
3881            HydroNode::ResolveFutures { metadata, .. } => metadata,
3882            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3883            HydroNode::Map { metadata, .. } => metadata,
3884            HydroNode::FlatMap { metadata, .. } => metadata,
3885            HydroNode::Filter { metadata, .. } => metadata,
3886            HydroNode::FilterMap { metadata, .. } => metadata,
3887            HydroNode::DeferTick { metadata, .. } => metadata,
3888            HydroNode::Enumerate { metadata, .. } => metadata,
3889            HydroNode::Inspect { metadata, .. } => metadata,
3890            HydroNode::Unique { metadata, .. } => metadata,
3891            HydroNode::Sort { metadata, .. } => metadata,
3892            HydroNode::Scan { metadata, .. } => metadata,
3893            HydroNode::Fold { metadata, .. } => metadata,
3894            HydroNode::FoldKeyed { metadata, .. } => metadata,
3895            HydroNode::Reduce { metadata, .. } => metadata,
3896            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3897            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3898            HydroNode::ExternalInput { metadata, .. } => metadata,
3899            HydroNode::Network { metadata, .. } => metadata,
3900            HydroNode::Counter { metadata, .. } => metadata,
3901        }
3902    }
3903
3904    pub fn input(&self) -> Vec<&HydroNode> {
3905        match self {
3906            HydroNode::Placeholder => {
3907                panic!()
3908            }
3909            HydroNode::Source { .. }
3910            | HydroNode::SingletonSource { .. }
3911            | HydroNode::ExternalInput { .. }
3912            | HydroNode::CycleSource { .. }
3913            | HydroNode::Tee { .. }
3914            | HydroNode::Partition { .. } => {
3915                // Tee/Partition should find their input in separate special ways
3916                vec![]
3917            }
3918            HydroNode::Cast { inner, .. }
3919            | HydroNode::ObserveNonDet { inner, .. }
3920            | HydroNode::YieldConcat { inner, .. }
3921            | HydroNode::BeginAtomic { inner, .. }
3922            | HydroNode::EndAtomic { inner, .. }
3923            | HydroNode::Batch { inner, .. } => {
3924                vec![inner]
3925            }
3926            HydroNode::Chain { first, second, .. } => {
3927                vec![first, second]
3928            }
3929            HydroNode::ChainFirst { first, second, .. } => {
3930                vec![first, second]
3931            }
3932            HydroNode::CrossProduct { left, right, .. }
3933            | HydroNode::CrossSingleton { left, right, .. }
3934            | HydroNode::Join { left, right, .. } => {
3935                vec![left, right]
3936            }
3937            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3938                vec![pos, neg]
3939            }
3940            HydroNode::Map { input, .. }
3941            | HydroNode::FlatMap { input, .. }
3942            | HydroNode::Filter { input, .. }
3943            | HydroNode::FilterMap { input, .. }
3944            | HydroNode::Sort { input, .. }
3945            | HydroNode::DeferTick { input, .. }
3946            | HydroNode::Enumerate { input, .. }
3947            | HydroNode::Inspect { input, .. }
3948            | HydroNode::Unique { input, .. }
3949            | HydroNode::Network { input, .. }
3950            | HydroNode::Counter { input, .. }
3951            | HydroNode::ResolveFutures { input, .. }
3952            | HydroNode::ResolveFuturesOrdered { input, .. }
3953            | HydroNode::Fold { input, .. }
3954            | HydroNode::FoldKeyed { input, .. }
3955            | HydroNode::Reduce { input, .. }
3956            | HydroNode::ReduceKeyed { input, .. }
3957            | HydroNode::Scan { input, .. } => {
3958                vec![input]
3959            }
3960            HydroNode::ReduceKeyedWatermark {
3961                input, watermark, ..
3962            } => {
3963                vec![input, watermark]
3964            }
3965        }
3966    }
3967
3968    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3969        self.input()
3970            .iter()
3971            .map(|input_node| input_node.metadata())
3972            .collect()
3973    }
3974
3975    pub fn print_root(&self) -> String {
3976        match self {
3977            HydroNode::Placeholder => {
3978                panic!()
3979            }
3980            HydroNode::Cast { .. } => "Cast()".to_owned(),
3981            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3982            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3983            HydroNode::SingletonSource {
3984                value,
3985                first_tick_only,
3986                ..
3987            } => format!(
3988                "SingletonSource({:?}, first_tick_only={})",
3989                value, first_tick_only
3990            ),
3991            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3992            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3993            HydroNode::Partition { f, is_true, .. } => {
3994                format!("Partition({:?}, is_true={})", f, is_true)
3995            }
3996            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3997            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3998            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3999            HydroNode::Batch { .. } => "Batch()".to_owned(),
4000            HydroNode::Chain { first, second, .. } => {
4001                format!("Chain({}, {})", first.print_root(), second.print_root())
4002            }
4003            HydroNode::ChainFirst { first, second, .. } => {
4004                format!(
4005                    "ChainFirst({}, {})",
4006                    first.print_root(),
4007                    second.print_root()
4008                )
4009            }
4010            HydroNode::CrossProduct { left, right, .. } => {
4011                format!(
4012                    "CrossProduct({}, {})",
4013                    left.print_root(),
4014                    right.print_root()
4015                )
4016            }
4017            HydroNode::CrossSingleton { left, right, .. } => {
4018                format!(
4019                    "CrossSingleton({}, {})",
4020                    left.print_root(),
4021                    right.print_root()
4022                )
4023            }
4024            HydroNode::Join { left, right, .. } => {
4025                format!("Join({}, {})", left.print_root(), right.print_root())
4026            }
4027            HydroNode::Difference { pos, neg, .. } => {
4028                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4029            }
4030            HydroNode::AntiJoin { pos, neg, .. } => {
4031                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4032            }
4033            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4034            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4035            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4036            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4037            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4038            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4039            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4040            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4041            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4042            HydroNode::Unique { .. } => "Unique()".to_owned(),
4043            HydroNode::Sort { .. } => "Sort()".to_owned(),
4044            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4045            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4046            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4047            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4048            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4049            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4050            HydroNode::Network { .. } => "Network()".to_owned(),
4051            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4052            HydroNode::Counter { tag, duration, .. } => {
4053                format!("Counter({:?}, {:?})", tag, duration)
4054            }
4055        }
4056    }
4057}
4058
/// Asks the deployment backend `D` for the sink/source expressions and the
/// connect callback of a network link from `from_location` to `to_location`.
///
/// The two endpoints are looked up in `processes` / `clusters`, a fresh port
/// is requested from each (sender first, then receiver), and the matching
/// backend hooks are invoked:
///
/// * process → process: `o2o_sink_source` / `o2o_connect`
/// * process → cluster: `o2m_sink_source` / `o2m_connect`
/// * cluster → process: `m2o_sink_source` / `m2o_connect`
/// * cluster → cluster: `m2m_sink_source` / `m2m_connect`
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if an endpoint's key is missing from the corresponding map, or if
/// either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch an owned handle to an instantiated process, or abort with a
    // message naming the missing key.
    let lookup_process = |key: LocationKey| match processes.get(key) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", key),
    };
    // Same as above, for clusters.
    let lookup_cluster = |key: LocationKey| match clusters.get(key) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", key),
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = lookup_process(from);
            let receiver = lookup_process(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = lookup_process(from);
            let receiver = lookup_cluster(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = lookup_cluster(from);
            let receiver = lookup_process(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = lookup_cluster(from);
            let receiver = lookup_cluster(to);

            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        // Tick and atomic locations are not handled as network endpoints here.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
4200
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` so growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the in-memory size of `HydroRoot` so growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression that contains no `q!` expansion must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let result = simplify_q_macro(plain.clone());
        assert_eq!(result, plain);
    }

    /// Snapshot of simplifying the expression produced by splicing a real
    /// `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // The spliced expression is expected to match the
        // stageleft::runtime_support::fn_* pattern that the visitor detects
        // and simplifies to q!(...).
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot for a quoted block whose closure is not the first token
    /// (the quoted code does not start with a pipe).
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}