1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::member_id::TaglessMemberId;
25use crate::staging_util::{Invariant, get_this_crate};
26
27pub struct Cluster<'a, ClusterTag> {
37 pub(crate) key: LocationKey,
38 pub(crate) flow_state: FlowState,
39 pub(crate) _phantom: Invariant<'a, ClusterTag>,
40}
41
42impl<C> Debug for Cluster<'_, C> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "Cluster({})", self.key)
45 }
46}
47
48impl<C> Eq for Cluster<'_, C> {}
49impl<C> PartialEq for Cluster<'_, C> {
50 fn eq(&self, other: &Self) -> bool {
51 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
52 }
53}
54
55impl<C> Clone for Cluster<'_, C> {
56 fn clone(&self) -> Self {
57 Cluster {
58 key: self.key,
59 flow_state: self.flow_state.clone(),
60 _phantom: PhantomData,
61 }
62 }
63}
64
65impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
66 fn id(&self) -> LocationId {
67 LocationId::Cluster(self.key)
68 }
69
70 fn flow_state(&self) -> &FlowState {
71 &self.flow_state
72 }
73
74 fn is_top_level() -> bool {
75 true
76 }
77
78 fn multiversioned(&self) -> bool {
79 false }
81}
82
83impl<'a, C> Location<'a> for Cluster<'a, C> {
84 type Root = Cluster<'a, C>;
85
86 fn root(&self) -> Self::Root {
87 self.clone()
88 }
89}
90
91pub struct ClusterIds<'a> {
96 pub key: LocationKey,
98 pub _phantom: PhantomData<&'a ()>,
100}
101
102impl<'a> Clone for ClusterIds<'a> {
103 fn clone(&self) -> Self {
104 Self {
105 key: self.key,
106 _phantom: Default::default(),
107 }
108 }
109}
110
111impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
112 type O = &'a [TaglessMemberId];
113
114 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
115 where
116 Self: Sized,
117 {
118 let ident = syn::Ident::new(
119 &format!("__hydro_lang_cluster_ids_{}", self.key),
120 Span::call_site(),
121 );
122
123 (
124 QuoteTokens {
125 prelude: None,
126 expr: Some(quote! { #ident }),
127 },
128 (),
129 )
130 }
131}
132
133impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
134
135pub trait IsCluster {
137 type Tag;
139}
140
141impl<C> IsCluster for Cluster<'_, C> {
142 type Tag = C;
143}
144
/// Free variable that, when quoted in the context of a location whose root is
/// a cluster, expands to the [`MemberId`] of the current cluster member.
pub static CLUSTER_SELF_ID: ClusterSelfId<'static> = ClusterSelfId { _private: &() };

/// Type of [`CLUSTER_SELF_ID`].
///
/// The only field is private, so values cannot be constructed outside this
/// module; use the [`CLUSTER_SELF_ID`] static instead.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    _private: &'a (),
}
157
158impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
159where
160 L: Location<'a>,
161 <L as Location<'a>>::Root: IsCluster,
162{
163 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
164
165 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
166 where
167 Self: Sized,
168 {
169 let LocationId::Cluster(cluster_id) = ctx.root().id() else {
170 unreachable!()
171 };
172
173 let ident = syn::Ident::new(
174 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
175 Span::call_site(),
176 );
177 let root = get_this_crate();
178 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
179
180 (
181 QuoteTokens {
182 prelude: None,
183 expr: Some(
184 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
185 ),
186 },
187 (),
188 )
189 }
190}
191
192impl<'a, L>
193 QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
194 for ClusterSelfId<'a>
195where
196 L: Location<'a>,
197 <L as Location<'a>>::Root: IsCluster,
198{
199}
200
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    // Each member of two differently sized clusters sends its own
    // `CLUSTER_SELF_ID` to a single process; the process must observe exactly
    // one ID per member (0..3 for the first cluster, 0..4 for the second).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .interleave(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    // Each of two cluster members batches three elements per tick and sends
    // the per-tick counts to a process; summed per member, the counts must
    // add up to three regardless of how the batches were split.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                // Sum the per-tick counts received from each member.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports both values when the check fails.
                assert_eq!(grouped.len(), 2);
                for (_id, v) in grouped {
                    assert_eq!(v, 3);
                }
            });

        // NOTE(review): `exhaustive` returns a count that is pinned here —
        // presumably the number of distinct executions explored; confirm and
        // document where 106 comes from.
        assert_eq!(count, 106);
    }

    // A process observing the membership of a two-member cluster must see
    // exactly one `Joined` event per member.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}