1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
use super::Compat; use futures::{ task as task01, Async as Async01, AsyncSink as AsyncSink01, Future as Future01, Poll as Poll01, Sink as Sink01, StartSend as StartSend01, Stream as Stream01, }; use futures_core::{ task as task03, TryFuture as TryFuture03, TryStream as TryStream03, }; use futures_sink::Sink as Sink03; use std::{marker::Unpin, pin::PinMut, sync::Arc}; impl<Fut, Sp> Future01 for Compat<Fut, Sp> where Fut: TryFuture03 + Unpin, Sp: task03::Spawn, { type Item = Fut::Ok; type Error = Fut::Error; fn poll(&mut self) -> Poll01<Self::Item, Self::Error> { with_context(self, |inner, cx| match inner.try_poll(cx) { task03::Poll::Ready(Ok(t)) => Ok(Async01::Ready(t)), task03::Poll::Pending => Ok(Async01::NotReady), task03::Poll::Ready(Err(e)) => Err(e), }) } } impl<St, Sp> Stream01 for Compat<St, Sp> where St: TryStream03 + Unpin, Sp: task03::Spawn, { type Item = St::Ok; type Error = St::Error; fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> { with_context(self, |inner, cx| match inner.try_poll_next(cx) { task03::Poll::Ready(None) => Ok(Async01::Ready(None)), task03::Poll::Ready(Some(Ok(t))) => Ok(Async01::Ready(Some(t))), task03::Poll::Pending => Ok(Async01::NotReady), task03::Poll::Ready(Some(Err(e))) => Err(e), }) } } impl<T, E> Sink01 for Compat<T, E> where T: Sink03 + Unpin, E: task03::Spawn, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; fn start_send( &mut self, item: Self::SinkItem, ) -> StartSend01<Self::SinkItem, Self::SinkError> { with_context(self, |mut inner, cx| { match inner.reborrow().poll_ready(cx) { task03::Poll::Ready(Ok(())) => { inner.start_send(item).map(|()| AsyncSink01::Ready) } task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)), task03::Poll::Ready(Err(e)) => Err(e), } }) } fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> { with_context(self, |inner, cx| match inner.poll_flush(cx) { task03::Poll::Ready(Ok(())) => Ok(Async01::Ready(())), task03::Poll::Pending => Ok(Async01::NotReady), task03::Poll::Ready(Err(e)) => Err(e), }) } fn close(&mut self) -> Poll01<(), Self::SinkError> { with_context(self, |inner, cx| match inner.poll_close(cx) { task03::Poll::Ready(Ok(())) => Ok(Async01::Ready(())), task03::Poll::Pending => Ok(Async01::NotReady), task03::Poll::Ready(Err(e)) => Err(e), }) } } fn current_as_waker() -> task03::LocalWaker { let arc_waker = Arc::new(Current(task01::current())); task03::local_waker_from_nonlocal(arc_waker) } struct Current(task01::Task); impl task03::Wake for Current { fn wake(arc_self: &Arc<Self>) { arc_self.0.notify(); } } fn with_context<T, E, R, F>(compat: &mut Compat<T, E>, f: F) -> R where T: Unpin, E: task03::Spawn, F: FnOnce(PinMut<T>, &mut task03::Context) -> R, { let waker = current_as_waker(); let spawn = compat.spawn.as_mut().unwrap(); let mut cx = task03::Context::new(&waker, spawn); f(PinMut::new(&mut compat.inner), &mut cx) }