Skip to content

Commit 2b1c6f0

Browse files
authored
Merge pull request #384 from yjhmelody/stream-max_by
add stream::max_by method
2 parents 5ff4ef8 + b57849e commit 2b1c6f0

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

src/stream/stream/max_by.rs

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use pin_project_lite::pin_project;
5+
6+
use crate::future::Future;
7+
use crate::stream::Stream;
8+
use crate::task::{Context, Poll};
9+
10+
pin_project! {
11+
#[doc(hidden)]
12+
#[allow(missing_debug_implementations)]
13+
pub struct MaxByFuture<S, F, T> {
14+
#[pin]
15+
stream: S,
16+
compare: F,
17+
max: Option<T>,
18+
}
19+
}
20+
21+
impl<S, F, T> MaxByFuture<S, F, T> {
22+
pub(super) fn new(stream: S, compare: F) -> Self {
23+
MaxByFuture {
24+
stream,
25+
compare,
26+
max: None,
27+
}
28+
}
29+
}
30+
31+
impl<S, F> Future for MaxByFuture<S, F, S::Item>
32+
where
33+
S: Stream,
34+
F: FnMut(&S::Item, &S::Item) -> Ordering,
35+
{
36+
type Output = Option<S::Item>;
37+
38+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39+
let this = self.project();
40+
let next = futures_core::ready!(this.stream.poll_next(cx));
41+
42+
match next {
43+
Some(new) => {
44+
cx.waker().wake_by_ref();
45+
match this.max.take() {
46+
None => *this.max = Some(new),
47+
Some(old) => match (this.compare)(&new, &old) {
48+
Ordering::Greater => *this.max = Some(new),
49+
_ => *this.max = Some(old),
50+
},
51+
}
52+
Poll::Pending
53+
}
54+
None => Poll::Ready(this.max.take()),
55+
}
56+
}
57+
}

src/stream/stream/mod.rs

+41
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod last;
4040
mod le;
4141
mod lt;
4242
mod map;
43+
mod max_by;
4344
mod min_by;
4445
mod min_by_key;
4546
mod next;
@@ -69,6 +70,7 @@ use gt::GtFuture;
6970
use last::LastFuture;
7071
use le::LeFuture;
7172
use lt::LtFuture;
73+
use max_by::MaxByFuture;
7274
use min_by::MinByFuture;
7375
use min_by_key::MinByKeyFuture;
7476
use next::NextFuture;
@@ -758,6 +760,45 @@ extension_trait! {
758760
MinByFuture::new(self, compare)
759761
}
760762

763+
#[doc = r#"
764+
Returns the element that gives the maximum value with respect to the
765+
specified comparison function. If several elements are equally maximum,
766+
the first element is returned. If the stream is empty, `None` is returned.
767+
768+
# Examples
769+
770+
```
771+
# fn main() { async_std::task::block_on(async {
772+
#
773+
use std::collections::VecDeque;
774+
775+
use async_std::prelude::*;
776+
777+
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
778+
779+
let max = s.clone().max_by(|x, y| x.cmp(y)).await;
780+
assert_eq!(max, Some(3));
781+
782+
let max = s.max_by(|x, y| y.cmp(x)).await;
783+
assert_eq!(max, Some(1));
784+
785+
let max = VecDeque::<usize>::new().max_by(|x, y| x.cmp(y)).await;
786+
assert_eq!(max, None);
787+
#
788+
# }) }
789+
```
790+
"#]
791+
fn max_by<F>(
792+
self,
793+
compare: F,
794+
) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
795+
where
796+
Self: Sized,
797+
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
798+
{
799+
MaxByFuture::new(self, compare)
800+
}
801+
761802
#[doc = r#"
762803
Returns the nth element of the stream.
763804

0 commit comments

Comments
 (0)