1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
use crate::{future::FutureExt, try_future::TryFutureExt}; use futures_core::future::FutureObj; use futures_core::task::{Spawn, SpawnError}; use tokio_executor::{DefaultExecutor, Executor as TokioExecutor}; /// A spawner that delegates to `tokio`'s /// [`DefaultExecutor`](tokio_executor::DefaultExecutor), will panic if used in /// the context of a task that is not running on `tokio`'s executor. /// /// *NOTE* The future of this struct in `futures` is uncertain. It may be /// deprecated before or soon after the initial 0.3 release and moved to a /// feature in `tokio` instead. /// /// # Examples /// /// ```ignore /// #![feature(async_await, await_macro, futures_api)] /// use futures::future::{FutureExt, TryFutureExt}; /// use futures::spawn; /// use futures::compat::TokioDefaultSpawner; /// # let (tx, rx) = futures::channel::oneshot::channel(); /// /// let future03 = async { /// println!("Running on the pool"); /// spawn!(async { /// println!("Spawned!"); /// # tx.send(42).unwrap(); /// }).unwrap(); /// }; /// /// let future01 = future03 /// .unit_error() // Make it a TryFuture /// .boxed() // Make it Unpin /// .compat(TokioDefaultSpawner); /// /// tokio::run(future01); /// # futures::executor::block_on(rx).unwrap(); /// ``` #[derive(Debug, Copy, Clone)] pub struct TokioDefaultSpawner; impl Spawn for TokioDefaultSpawner { fn spawn_obj( &mut self, future: FutureObj<'static, ()>, ) -> Result<(), SpawnError> { let fut = Box::new(future.unit_error().compat()); DefaultExecutor::current().spawn(fut).map_err(|err| { panic!( "tokio failed to spawn and doesn't return the future: {:?}", err ) }) } fn status(&self) -> Result<(), SpawnError> { DefaultExecutor::current().status().map_err(|err| { if err.is_shutdown() { SpawnError::shutdown() } else { panic!( "tokio executor failed for non-shutdown reason: {:?}", err ) } }) } }