Creates a future that attempts to resolve the next item in the stream.
If an error is encountered before the next item, the error is returned
instead.
This is similar to the StreamExt::next combinator, but returns a
Result<Option<T>, E> rather than an Option<Result<T, E>>, which makes
it easy to use with the ? operator.
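The difference between the two return shapes can be illustrated with a synchronous analogy built on std iterators. This is only a sketch, not the futures API itself: `try_next_sync` and `sum_all` are hypothetical helper names, and an iterator of `Result` values stands in for the stream.

```rust
/// Pulls the next item from an iterator of results, flipping
/// `Option<Result<T, E>>` into `Result<Option<T>, E>`.
fn try_next_sync<I, T, E>(iter: &mut I) -> Result<Option<T>, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    iter.next().transpose()
}

/// Sums every item, returning early on the first error.
fn sum_all(items: Vec<Result<i32, String>>) -> Result<i32, String> {
    let mut iter = items.into_iter();
    let mut total = 0;
    // `?` unwraps the outer `Result`; the loop ends once `Ok(None)` is seen.
    while let Some(n) = try_next_sync(&mut iter)? {
        total += n;
    }
    Ok(total)
}
```

With the `Option<Result<T, E>>` shape, the same loop would need an explicit `match` on each item before `?` could be applied.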
Attempts to run this stream to completion, executing the provided
asynchronous closure for each element on the stream.
The provided closure will be called for each item this stream produces,
yielding a future. That future will then be executed to completion
before moving on to the next item.
The returned value is a Future whose Output type is
Result<(), Self::Error>. If any of the intermediate futures or the
stream returns an error, this future will return immediately with
an error.
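The early-exit behaviour resembles `Iterator::try_for_each` from the standard library. The sketch below is a synchronous analogy under that assumption, not the asynchronous combinator; `record_non_negative` is a hypothetical helper, and the `seen` vector only exists to make the stopping point observable.

```rust
/// Visits each item in order and stops at the first error, whether it
/// comes from the source or from the closure.
fn record_non_negative(
    items: Vec<Result<i32, String>>,
    seen: &mut Vec<i32>,
) -> Result<(), String> {
    items.into_iter().try_for_each(|item| -> Result<(), String> {
        // An error produced by the source is propagated unchanged.
        let n = item?;
        // An error produced by the closure also stops the iteration.
        if n < 0 {
            return Err(format!("negative value: {}", n));
        }
        seen.push(n);
        Ok(())
    })
}
```

Items after the failing one are never visited, which mirrors the "return immediately" wording above.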
Attempts to run this stream to completion, executing the provided asynchronous
closure for each element on the stream concurrently as elements become
available, exiting as soon as an error occurs.
This is similar to StreamExt::for_each_concurrent, but will resolve
to an error immediately if the underlying stream or the provided
closure returns an error.
This method is only available when the std feature of this
library is activated, and it is activated by default.
#![feature(async_await, await_macro)]
use futures::channel::oneshot;
use futures::stream::{self, StreamExt, TryStreamExt};
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (_tx3, rx3) = oneshot::channel();
let stream = stream::iter(vec![rx1, rx2, rx3]);
let fut = stream.map(Ok).try_for_each_concurrent(
    /* limit */ 2,
    async move |rx| {
        let res: Result<(), oneshot::Canceled> = await!(rx);
        res
    }
);
tx1.send(()).unwrap();
// Drop the second sender so that `rx2` resolves to `Canceled`.
drop(tx2);
// The final result is an error because the second future
// resulted in an error.
assert_eq!(Err(oneshot::Canceled), await!(fut));
Attempt to collect all of the values of this stream into a vector,
returning a future representing the result of that computation.
This combinator will gather all successful results of this stream
into a Vec<Self::Item>. If an error occurs, all collected elements
will be dropped and the error will be returned.
The returned future will be resolved when the stream terminates.
This method is only available when the std feature of this
library is activated, and it is activated by default.
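The all-or-nothing behaviour is comparable to collecting an iterator of `Result` values into a `Result<Vec<_>, _>` in the standard library. The following is a synchronous sketch of that idea only; `collect_all` is a hypothetical name and no futures API is involved.

```rust
/// Collects every `Ok` value into a vector, or returns the first error
/// and discards whatever had been collected so far.
fn collect_all(items: Vec<Result<i32, String>>) -> Result<Vec<i32>, String> {
    // `Result<Vec<T>, E>` implements `FromIterator<Result<T, E>>`,
    // which stops at the first `Err`.
    items.into_iter().collect()
}
```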
Attempt to filter the values produced by this stream while
simultaneously mapping them to a different type according to the
provided asynchronous closure.
As values of this stream are made available, the provided function will
be run on them. If the future returned by the closure f resolves to
Some(item) then the stream will yield the value item; if it resolves
to None, the value is skipped and the stream moves on to the next one.
All errors are passed through without filtering in this combinator.
Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the existing filter_map methods in
the standard library.
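As a rough synchronous picture of this rule, the sketch below applies `Iterator::filter_map` to a sequence of `Result` values. It is an analogy only: `halve_evens` is a hypothetical helper, and its mapping step is an ordinary function rather than an asynchronous closure.

```rust
/// Keeps even numbers (halved), skips odd numbers, and passes every
/// error through untouched.
fn halve_evens(items: Vec<Result<i32, String>>) -> Vec<Result<i32, String>> {
    items
        .into_iter()
        .filter_map(|item| match item {
            // "Some(item)": the mapped value is yielded.
            Ok(n) if n % 2 == 0 => Some(Ok(n / 2)),
            // "None": the value is dropped and the next one is examined.
            Ok(_) => None,
            // Errors are not filtered.
            Err(e) => Some(Err(e)),
        })
        .collect()
}
```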
Attempt to execute an accumulating asynchronous computation over a
stream, collecting all the values into one final result.
This combinator will accumulate all values returned by this stream
according to the closure provided. The initial state is passed to
this method; each execution of the closure receives the current state
and returns the updated state. Once the entire stream has been
exhausted the returned future will resolve to the final state.
This method is similar to StreamExt::fold, but will
exit early if an error is encountered in either the stream or the
provided closure.
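The standard library's `Iterator::try_fold` follows the same accumulate-or-bail pattern, so it can serve as a synchronous illustration. In this sketch `checked_sum` is a hypothetical helper; it shows one error coming from the source and one coming from the closure (an overflow).

```rust
/// Adds up `u8` values, stopping early if the source yields an error
/// or if the running total would overflow.
fn checked_sum(items: Vec<Result<u8, String>>) -> Result<u8, String> {
    items
        .into_iter()
        .try_fold(0u8, |acc, item| -> Result<u8, String> {
            // Error from the source: propagate it.
            let n = item?;
            // Error from the closure: overflow ends the fold.
            acc.checked_add(n).ok_or_else(|| "overflow".to_string())
        })
}
```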
Attempt to execute several futures from a stream concurrently.
This stream's Ok type must be a TryFuture with an Error type
that matches the stream's Error type.
This adaptor will buffer up to n futures and then return their
outputs in the order in which they complete. If the underlying stream
returns an error, it will be immediately propagated.
The returned stream will be a stream of results, each containing either
an error or a future's output. An error can be produced either by the
underlying stream itself or by one of the futures it yielded.
This method is only available when the std feature of this
library is activated, and it is activated by default.
Errors from the underlying stream itself are propagated:
#![feature(async_await, await_macro)]
use futures::channel::mpsc;
use futures::future;
use futures::stream::{StreamExt, TryStreamExt};
let (sink, stream_of_futures) = mpsc::unbounded();
let mut buffered = stream_of_futures.try_buffer_unordered(10);
sink.unbounded_send(Ok(future::ready(Ok(7i32))));
assert_eq!(await!(buffered.next()), Some(Ok(7i32)));
sink.unbounded_send(Err("error in the stream"));
assert_eq!(await!(buffered.next()), Some(Err("error in the stream")));
Wraps a TryStream into a stream compatible with libraries using
futures 0.1 Stream. Requires the compat feature to be enabled.
#![feature(async_await, await_macro, futures_api)]
use futures::future::{FutureExt, TryFutureExt};
use futures::spawn;
use futures::compat::TokioDefaultSpawner;
let future03 = async {
    println!("Running on the pool");
    spawn!(async {
        println!("Spawned!");
    }).unwrap();
};
let future01 = future03
    .unit_error() // Make it a TryFuture
    .boxed() // Make it Unpin
    .compat(TokioDefaultSpawner);
tokio::run(future01);