1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
use core::marker::Unpin;
use core::pin::PinMut;
use futures_core::future::TryFuture;
use futures_core::task::{self, Poll};
use futures_sink::Sink;

#[derive(Debug)]
enum State<Fut, Si> {
    Waiting(Fut),
    Ready(Si),
    Closed,
}
use self::State::*;

/// Future for the [`flatten_sink`](super::TryFutureExt::flatten_sink)
/// combinator.
#[derive(Debug)]
pub struct FlattenSink<Fut, Si>(State<Fut, Si>);

impl<Fut: Unpin, Si: Unpin> Unpin for FlattenSink<Fut, Si> {}

impl<Fut, Si> FlattenSink<Fut, Si>
where
    Fut: TryFuture<Ok = Si>,
    Si: Sink<SinkError = Fut::Error>,
{
    pub(super) fn new(future: Fut) -> FlattenSink<Fut, Si> {
        FlattenSink(Waiting(future))
    }

    #[allow(needless_lifetimes)] // https://github.com/rust-lang/rust/issues/52675
    fn project_pin<'a>(
        self: PinMut<'a, Self>
    ) -> State<PinMut<'a, Fut>, PinMut<'a, Si>> {
        unsafe {
            match &mut PinMut::get_mut_unchecked(self).0 {
                Waiting(f) => Waiting(PinMut::new_unchecked(f)),
                Ready(s) => Ready(PinMut::new_unchecked(s)),
                Closed => Closed,
            }
        }
    }
}

impl<Fut, Si> Sink for FlattenSink<Fut, Si>
where
    Fut: TryFuture<Ok = Si>,
    Si: Sink<SinkError = Fut::Error>,
{
    type SinkItem = Si::SinkItem;
    type SinkError = Si::SinkError;

    fn poll_ready(
        mut self: PinMut<Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::SinkError>> {
        let resolved_stream = match self.reborrow().project_pin() {
            Ready(s) => return s.poll_ready(cx),
            Waiting(f) => try_ready!(f.try_poll(cx)),
            Closed => panic!("poll_ready called after eof"),
        };
        PinMut::set(self.reborrow(), FlattenSink(Ready(resolved_stream)));
        if let Ready(resolved_stream) = self.project_pin() {
            resolved_stream.poll_ready(cx)
        } else {
            unreachable!()
        }
    }

    fn start_send(
        self: PinMut<Self>,
        item: Self::SinkItem,
    ) -> Result<(), Self::SinkError> {
        match self.project_pin() {
            Ready(s) => s.start_send(item),
            Waiting(_) => panic!("poll_ready not called first"),
            Closed => panic!("start_send called after eof"),
        }
    }

    fn poll_flush(
        self: PinMut<Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::SinkError>> {
        match self.project_pin() {
            Ready(s) => s.poll_flush(cx),
            // if sink not yet resolved, nothing written ==> everything flushed
            Waiting(_) => Poll::Ready(Ok(())),
            Closed => panic!("poll_flush called after eof"),
        }
    }

    fn poll_close(
        mut self: PinMut<Self>,
        cx: &mut task::Context,
    ) -> Poll<Result<(), Self::SinkError>> {
        let res = match self.reborrow().project_pin() {
            Ready(s) => s.poll_close(cx),
            Waiting(_) | Closed => Poll::Ready(Ok(())),
        };
        if res.is_ready() {
            PinMut::set(self, FlattenSink(Closed));
        }
        res
    }
}