Chain on a computation to run when a value is ready, passing each
successful result to the provided closure f.
This function can be used to run a unit of work when the next successful
value on a stream is ready. The closure provided will be yielded a value
when ready, and the returned future will then be run to completion to
produce the next value on this stream.
Errors produced by this stream are not passed to the closure; they are
passed through unchanged.
The returned value of the closure must implement the TryFuture trait
and can represent some more work to be done before the composed stream
is finished.
Note that this function consumes the receiving stream and returns a
wrapped version of it.
To process the entire stream and return a single future representing
success or error, use try_for_each instead.
Chain on a computation for when an error happens, passing the
erroneous result to the provided closure f.
This function can be used to run a unit of work and attempt to recover from
an error if one happens. The closure provided will be yielded an error
when one appears, and the returned future will then be run to completion
to produce the next value on this stream.
Successful values produced by this stream are not passed to the
closure; they are passed through unchanged.
The returned value of the closure must implement the TryFuture trait
and can represent some more work to be done before the composed stream
is finished.
Note that this function consumes the receiving stream and returns a
wrapped version of it.
Creates a future that attempts to resolve the next item in the stream.
If an error is encountered before the next item, the error is returned
instead.
This is similar to the Stream::next combinator, but returns a
Result<Option<T>, E> rather than an Option<Result<T, E>>, making
for easy use with the ? operator.
Attempts to run this stream to completion, executing the provided
asynchronous closure for each element on the stream.
The provided closure will be called for each item this stream produces,
yielding a future. That future will then be executed to completion
before moving on to the next item.
The returned value is a Future where the Output type is
Result<(), Self::Error>. If any of the intermediate futures or the
stream returns an error, this future will return immediately with an
error.
Attempts to run this stream to completion, executing the provided asynchronous
closure for each element on the stream concurrently as elements become
available, exiting as soon as an error occurs.
This is similar to StreamExt::for_each_concurrent, but will resolve to
an error immediately if the underlying stream or the provided closure
returns an error.
This method is only available when the std feature of this
library is activated, and it is activated by default.
    use futures::channel::oneshot;
    use futures::stream::{self, StreamExt, TryStreamExt};

    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();
    let (_tx3, rx3) = oneshot::channel();

    let stream = stream::iter(vec![rx1, rx2, rx3]);
    let fut = stream.map(Ok).try_for_each_concurrent(
        /* limit */ 2,
        |rx| async move {
            let res: Result<(), oneshot::Canceled> = rx.await;
            res
        }
    );

    tx1.send(()).unwrap();
    // Drop the second sender so that `rx2` resolves to `Canceled`.
    drop(tx2);

    // The final result is an error because the second future
    // resulted in an error.
    assert_eq!(Err(oneshot::Canceled), fut.await);
Attempt to collect all of the values of this stream into a vector,
returning a future representing the result of that computation.
This combinator will gather all successful results of this stream
into a Vec<Self::Ok>. If an error happens then all
collected elements will be dropped and the error will be returned.
The returned future will be resolved when the stream terminates.
Attempt to filter the values produced by this stream while
simultaneously mapping them to a different type according to the
provided asynchronous closure.
As values of this stream are made available, the provided function will
be run on them. If the future returned by the closure f resolves to
Some(item) then the stream will yield the value item, but if
it resolves to None then the value is skipped and the stream moves on
to the next one.
All errors are passed through without filtering in this combinator.
Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the existing filter_map methods in
the standard library.
Attempt to execute an accumulating asynchronous computation over a
stream, collecting all the values into one final result.
This combinator will accumulate all values returned by this stream
according to the closure provided. The initial state is passed to this
method, and each execution of the closure receives the current state and
returns the next one.
Once the entire stream has been exhausted the returned future will
resolve to this value.
This method is similar to fold, but will
exit early if an error is encountered in either the stream or the
provided closure.
Attempt to concatenate all items of a stream into a single
extendable destination, returning a future representing the end result.
This combinator will extend the first item with the contents of all
the subsequent successful results of the stream. If the stream is empty,
the default value will be returned.
Works with all collections that implement the Extend trait.
This method is similar to concat, but will
exit early if an error is encountered in the stream.
Attempt to execute several futures from a stream concurrently.
This stream's Ok type must be a TryFuture with an Error type
that matches the stream's Error type.
This adaptor will buffer up to n futures and then return their
outputs in the order in which they complete. If the underlying stream
returns an error, it will be immediately propagated.
The returned stream will be a stream of results, each containing either
an error or a future's output. An error can be produced either by the
underlying stream itself or by one of the futures it yielded.
This method is only available when the std feature of this
library is activated, and it is activated by default.
Errors from the underlying stream itself are propagated:
    use futures::channel::mpsc;
    use futures::future;
    use futures::stream::{StreamExt, TryStreamExt};

    let (sink, stream_of_futures) = mpsc::unbounded();
    let mut buffered = stream_of_futures.try_buffer_unordered(10);

    sink.unbounded_send(Ok(future::ready(Ok(7i32)))).unwrap();
    assert_eq!(buffered.next().await, Some(Ok(7i32)));

    sink.unbounded_send(Err("error in the stream")).unwrap();
    assert_eq!(buffered.next().await, Some(Err("error in the stream")));
Wraps a TryStream into a stream compatible with libraries using
futures 0.1 Stream. Requires the compat feature to be enabled.
    use futures::future::{FutureExt, TryFutureExt};

    // Assumed setup: a oneshot channel whose sender is used by the future.
    let (tx, _rx) = futures::channel::oneshot::channel();

    let future03 = async {
        println!("Running on the pool");
        tx.send(42).unwrap();
    };

    let future01 = future03
        .unit_error() // Make it a TryFuture
        .boxed() // Make it Unpin
        .compat();

    tokio::run(future01);
Adapter that converts this stream into an AsyncRead.
Note that because into_async_read moves the stream, the Stream type must be
Unpin. If you want to use into_async_read with a !Unpin stream, you'll
first have to pin the stream. This can be done by boxing the stream using Box::pin
or pinning it to the stack using the pin_mut! macro from the pin_utils crate.