1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

use super::Compat;
use crate::{TryFutureExt, FutureExt, future::UnitError};
use futures01::future::Executor as Executor01;
use futures_core::task::Spawn as Spawn03;
use futures_core::task as task03;
use futures_core::future::FutureObj;

/// The future type this module submits to a futures 0.1
/// [`Executor`](futures01::future::Executor).
///
/// Structurally it is a boxed futures 0.3 task (`FutureObj<'static, ()>`)
/// wrapped in `UnitError`, then wrapped in `Compat` together with a boxed
/// futures 0.3 spawner (`Box<dyn Spawn + Send>`).
pub type Executor01Future =
    Compat<UnitError<FutureObj<'static, ()>>, Box<dyn Spawn03 + Send>>;

/// Adapter methods for any futures 0.1
/// [`Executor`](futures01::future::Executor) that accepts an
/// `Executor01Future` and is `Clone + Send + 'static`.
pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'static {
    /// Wraps this futures 0.1 [`Executor`](futures01::future::Executor) in an
    /// `Executor01As03`, so that it can be used as a futures 0.3
    /// [`Spawn`](futures_core::task::Spawn).
    ///
    /// The executor is consumed and stored inside the returned adapter.
    ///
    /// ```
    /// #![feature(async_await, await_macro, futures_api)]
    /// use futures01::Future;
    /// use futures::future::{FutureExt, TryFutureExt};
    /// use futures::compat::Executor01CompatExt;
    /// use futures::spawn;
    /// use tokio_threadpool::ThreadPool;
    ///
    /// let pool01 = ThreadPool::new();
    /// let spawner03 = pool01.sender().clone().compat();
    /// # let (tx, rx) = futures::channel::oneshot::channel();
    ///
    /// let future03 = async {
    ///     println!("Running on the pool");
    ///     spawn!(async {
    ///         println!("Spawned!");
    ///         # tx.send(42).unwrap();
    ///     }).unwrap();
    /// };
    ///
    /// let future01 = future03.unit_error().boxed().compat(spawner03);
    ///
    /// pool01.spawn(future01);
    /// pool01.shutdown_on_idle().wait().unwrap();
    /// # futures::executor::block_on(rx).unwrap();
    /// ```
    fn compat(self) -> Executor01As03<Self>
    where
        Self: Sized;
}

/// Blanket implementation: every type meeting the trait's bounds gets
/// `compat()` for free.
impl<Ex> Executor01CompatExt for Ex
where
    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
    /// Moves `self` into a new `Executor01As03` wrapper.
    fn compat(self) -> Executor01As03<Self> {
        Executor01As03 { executor01: self }
    }
}

/// Converts a futures 0.1 [`Executor`](futures01::future::Executor) into a
/// futures 0.3 [`Spawn`](futures_core::task::Spawn).
///
/// The wrapper owns the underlying executor. It is `Clone` whenever the
/// executor is `Clone`, and `Debug` whenever the executor is `Debug`.
#[derive(Debug, Clone)]
pub struct Executor01As03<Ex> {
    /// The wrapped futures 0.1 executor that spawned tasks are forwarded to.
    executor01: Ex,
}

impl<Ex> Spawn03 for Executor01As03<Ex>
where
    Ex: Executor01<Executor01Future> + Clone + Send + 'static,
{
    /// Submits a futures 0.3 task to the wrapped futures 0.1 executor.
    ///
    /// The task is wrapped into an `Executor01Future`, with a boxed clone of
    /// this adapter supplied as its futures 0.3 spawner.
    ///
    /// # Errors
    ///
    /// If the underlying executor refuses the task, the rejected future is
    /// unwrapped again and handed back inside a `SpawnObjError` whose kind is
    /// always `SpawnErrorKind::shutdown()`.
    fn spawn_obj(
        &mut self,
        future: FutureObj<'static, ()>,
    ) -> Result<(), task03::SpawnObjError> {
        // A clone of this adapter travels with the task as its spawner.
        let spawner: Box<dyn Spawn03 + Send> = Box::new(self.clone());
        let task01 = future.unit_error().compat(spawner);

        self.executor01.execute(task01).map_err(|rejected| {
            // Peel the compat layers back off so the caller gets a plain
            // `()`-output future back.
            let recovered = rejected
                .into_future()
                .into_inner()
                .unwrap_or_else(|_| ());

            task03::SpawnObjError {
                kind: task03::SpawnErrorKind::shutdown(),
                future: Box::new(recovered).into(),
            }
        })
    }
}