Skip to content

Commit d0bae15

Browse files
Merge main into 0.1 release branch
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 5bd485a commit d0bae15

20 files changed

+1794
-112
lines changed

.cargo/config.toml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[build]
2+
rustdocflags = ["-D", "warnings"]

.github/workflows/ci_linux.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@ env:
1616

1717
jobs:
1818
build:
19+
strategy:
20+
matrix:
21+
rust-version: [stable, 1.75]
1922

2023
runs-on: ubuntu-latest
2124

2225
steps:
2326
- uses: actions/checkout@v3
2427

28+
- name: Setup rust
29+
run: rustup default ${{ matrix.rust-version }}
30+
2531
- name: Build default features
2632
run: cargo build
2733
- name: Test default features
@@ -31,3 +37,7 @@ jobs:
3137
run: cargo build --features single_threaded_async
3238
- name: Test single_threaded_async
3339
run: cargo test --features single_threaded_async
40+
41+
- name: Build docs
42+
run: cargo doc
43+

Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ keywords = ["reactive", "workflow", "behavior", "agent", "bevy"]
1111
categories = ["science::robotics", "asynchronous", "concurrency", "game-development"]
1212

1313
[dependencies]
14-
bevy_impulse_derive = { path = "macros", version = "0.0.1" }
14+
bevy_impulse_derive = { path = "macros", version = "0.0.2" }
1515
bevy_ecs = "0.13"
1616
bevy_utils = "0.13"
1717
bevy_hierarchy = "0.13"
@@ -25,7 +25,6 @@ async-task = { version = "4.7.1", optional = true }
2525
# bevy_tasks::Task, so we're leaving it as a mandatory dependency for now.
2626
bevy_tasks = { version = "0.13", features = ["multi-threaded"] }
2727

28-
arrayvec = "0.7"
2928
itertools = "0.13"
3029
smallvec = "1.13"
3130
tokio = { version = "1.39", features = ["sync"]}

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
[![style](https://github.com/open-rmf/bevy_impulse/actions/workflows/style.yaml/badge.svg)](https://github.com/open-rmf/bevy_impulse/actions/workflows/style.yaml)
2+
[![ci_linux](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_linux.yaml/badge.svg)](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_linux.yaml)
3+
[![ci_windows](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_windows.yaml/badge.svg)](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_windows.yaml)
4+
[![ci_web](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_web.yaml/badge.svg)](https://github.com/open-rmf/bevy_impulse/actions/workflows/ci_web.yaml)
5+
16
# Reactive Programming for Bevy
27

38
This library provides sophisticated [reactive programming](https://en.wikipedia.org/wiki/Reactive_programming) for the [bevy](https://bevyengine.org/) ECS. In addition to supporting one-shot chains of async operations, it can support reusable workflows with parallel branches, synchronization, races, and cycles. These workflows can be hierarchical, so a workflow can be used as a building block by other workflows.

macros/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "bevy_impulse_derive"
3-
version = "0.0.1"
3+
version = "0.0.2"
44
edition = "2021"
55
authors = ["Grey <[email protected]>"]
66
license = "Apache-2.0"

macros/src/lib.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,37 @@ pub fn simple_stream_macro(item: TokenStream) -> TokenStream {
2424
let ast: DeriveInput = syn::parse(item).unwrap();
2525
let struct_name = &ast.ident;
2626
let (impl_generics, type_generics, where_clause) = &ast.generics.split_for_impl();
27-
// let bevy_impulse_path: Path =
2827

2928
quote! {
30-
impl #impl_generics Stream for #struct_name #type_generics #where_clause {
29+
impl #impl_generics ::bevy_impulse::Stream for #struct_name #type_generics #where_clause {
3130
type Container = ::bevy_impulse::DefaultStreamContainer<Self>;
3231
}
3332
}
3433
.into()
3534
}
35+
36+
#[proc_macro_derive(DeliveryLabel)]
37+
pub fn delivery_label_macro(item: TokenStream) -> TokenStream {
38+
let ast: DeriveInput = syn::parse(item).unwrap();
39+
let struct_name = &ast.ident;
40+
let (impl_generics, type_generics, where_clause) = &ast.generics.split_for_impl();
41+
42+
quote! {
43+
impl #impl_generics ::bevy_impulse::DeliveryLabel for #struct_name #type_generics #where_clause {
44+
fn dyn_clone(&self) -> Box<dyn DeliveryLabel> {
45+
::std::boxed::Box::new(::std::clone::Clone::clone(self))
46+
}
47+
48+
fn as_dyn_eq(&self) -> &dyn ::bevy_impulse::utils::DynEq {
49+
self
50+
}
51+
52+
fn dyn_hash(&self, mut state: &mut dyn ::std::hash::Hasher) {
53+
let ty_id = ::std::any::TypeId::of::<Self>();
54+
::std::hash::Hash::hash(&ty_id, &mut state);
55+
::std::hash::Hash::hash(self, &mut state);
56+
}
57+
}
58+
}
59+
.into()
60+
}

src/buffer/bufferable.rs

+25-8
Original file line numberDiff line numberDiff line change
@@ -207,28 +207,28 @@ impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
207207
}
208208

209209
pub trait IterBufferable {
210-
type BufferType: Buffered;
210+
type BufferElement: Buffered;
211211

212212
/// Convert an iterable collection of bufferable workflow elements into
213213
/// buffers if they are not buffers already.
214214
fn into_buffer_vec<const N: usize>(
215215
self,
216216
builder: &mut Builder,
217-
) -> SmallVec<[Self::BufferType; N]>;
217+
) -> SmallVec<[Self::BufferElement; N]>;
218218

219219
/// Join an iterable collection of bufferable workflow elements.
220220
///
221221
/// Performance is best if you can choose an `N` which is equal to the
222222
/// number of buffers inside the iterable, but this will work even if `N`
223223
/// does not match the number.
224-
fn join_vec<const N: usize>(
224+
fn join_vec<'w, 's, 'a, 'b, const N: usize>(
225225
self,
226-
builder: &mut Builder,
227-
) -> Output<SmallVec<[<Self::BufferType as Buffered>::Item; N]>>
226+
builder: &'b mut Builder<'w, 's, 'a>,
227+
) -> Chain<'w, 's, 'a, 'b, SmallVec<[<Self::BufferElement as Buffered>::Item; N]>>
228228
where
229229
Self: Sized,
230-
Self::BufferType: 'static + Send + Sync,
231-
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
230+
Self::BufferElement: 'static + Send + Sync,
231+
<Self::BufferElement as Buffered>::Item: 'static + Send + Sync,
232232
{
233233
let buffers = self.into_buffer_vec::<N>(builder);
234234
let join = builder.commands.spawn(()).id();
@@ -239,6 +239,23 @@ pub trait IterBufferable {
239239
Join::new(buffers, target),
240240
));
241241

242-
Output::new(builder.scope, target)
242+
Output::new(builder.scope, target).chain(builder)
243+
}
244+
}
245+
246+
impl<T> IterBufferable for T
247+
where
248+
T: IntoIterator,
249+
T::Item: Bufferable,
250+
{
251+
type BufferElement = <T::Item as Bufferable>::BufferType;
252+
253+
fn into_buffer_vec<const N: usize>(
254+
self,
255+
builder: &mut Builder,
256+
) -> SmallVec<[Self::BufferElement; N]> {
257+
SmallVec::<[Self::BufferElement; N]>::from_iter(
258+
self.into_iter().map(|e| e.into_buffer(builder)),
259+
)
243260
}
244261
}

src/builder.rs

+29-4
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ use crate::{
2626
AddOperation, AsMap, BeginCleanupWorkflow, Buffer, BufferItem, BufferKeys, BufferSettings,
2727
Bufferable, Buffered, Chain, Collect, ForkClone, ForkCloneOutput, ForkTargetStorage, Gate,
2828
GateRequest, Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Node, OperateBuffer,
29-
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateStaticGate, Output, Provider,
30-
RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings, ScopeSettingsStorage,
31-
Sendish, Service, StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget,
29+
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateSplit, OperateStaticGate, Output,
30+
Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings,
31+
ScopeSettingsStorage, Sendish, Service, SplitOutputs, Splittable, StreamPack, StreamTargetMap,
32+
StreamsOfMap, Trim, TrimBranch, UnusedTarget,
3233
};
3334

3435
pub(crate) mod connect;
@@ -339,6 +340,26 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
339340
self.create_collect(n, Some(n))
340341
}
341342

343+
/// Create a new split operation in the workflow. The [`InputSlot`] can take
344+
/// in values that you want to split, and [`SplitOutputs::build`] will let
345+
/// you build connections to the split value.
346+
pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
347+
where
348+
T: 'static + Send + Sync + Splittable,
349+
{
350+
let source = self.commands.spawn(()).id();
351+
self.commands.add(AddOperation::new(
352+
Some(self.scope),
353+
source,
354+
OperateSplit::<T>::default(),
355+
));
356+
357+
(
358+
InputSlot::new(self.scope, source),
359+
SplitOutputs::new(self.scope, source),
360+
)
361+
}
362+
342363
/// This method allows you to define a cleanup workflow that branches off of
343364
/// this scope that will activate during the scope's cleanup phase. The
344365
/// input to the cleanup workflow will be a key to access to one or more
@@ -938,11 +959,15 @@ mod tests {
938959
let mut promise = context.command(|commands| commands.request(5, workflow).take_response());
939960

940961
context.run_with_conditions(&mut promise, Duration::from_secs(2));
962+
assert!(
963+
context.no_unhandled_errors(),
964+
"{:#?}",
965+
context.get_unhandled_errors(),
966+
);
941967
assert!(promise.peek().is_cancelled());
942968
let channel_output = receiver.try_recv().unwrap();
943969
assert_eq!(channel_output, 5);
944970
assert!(receiver.try_recv().is_err());
945-
assert!(context.no_unhandled_errors());
946971
assert!(context.confirm_buffers_empty().is_ok());
947972

948973
let (cancel_sender, mut cancel_receiver) = unbounded_channel();

0 commit comments

Comments
 (0)