1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use futures_core::stream::Stream;
use futures_core::task::{Waker, Poll};
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::collections::VecDeque;
use std::pin::Pin;

/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed
/// number of values when the underlying sink is unable to accept them.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Buffer<Si: Sink> {
    sink: Si,
    buf: VecDeque<Si::SinkItem>,

    // Track capacity separately from the `VecDeque`, which may be rounded up
    capacity: usize,
}

impl<Si: Sink + Unpin> Unpin for Buffer<Si> {}

impl<Si: Sink> Buffer<Si> {
    unsafe_pinned!(sink: Si);
    unsafe_unpinned!(buf: VecDeque<Si::SinkItem>);
    unsafe_unpinned!(capacity: usize);


    pub(super) fn new(sink: Si, capacity: usize) -> Buffer<Si> {
        Buffer {
            sink,
            buf: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Get a shared reference to the inner sink.
    pub fn get_ref(&self) -> &Si {
        &self.sink
    }

    fn try_empty_buffer(
        mut self: Pin<&mut Self>,
        waker: &Waker
    ) -> Poll<Result<(), Si::SinkError>> {
        try_ready!(self.as_mut().sink().poll_ready(waker));
        while let Some(item) = self.as_mut().buf().pop_front() {
            if let Err(e) = self.as_mut().sink().start_send(item) {
                return Poll::Ready(Err(e));
            }
            if !self.buf.is_empty() {
                try_ready!(self.as_mut().sink().poll_ready(waker));
            }
        }
        Poll::Ready(Ok(()))
    }
}

// Forwarding impl of Stream from the underlying sink
impl<S> Stream for Buffer<S> where S: Sink + Stream {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, waker: &Waker) -> Poll<Option<S::Item>> {
        self.sink().poll_next(waker)
    }
}

impl<Si: Sink> Sink for Buffer<Si> {
    type SinkItem = Si::SinkItem;
    type SinkError = Si::SinkError;

    fn poll_ready(
        mut self: Pin<&mut Self>,
        waker: &Waker,
    ) -> Poll<Result<(), Self::SinkError>> {
        if self.capacity == 0 {
            return self.as_mut().sink().poll_ready(waker);
        }

        if let Poll::Ready(Err(e)) = self.as_mut().try_empty_buffer(waker) {
            return Poll::Ready(Err(e));
        }

        if self.buf.len() >= self.capacity {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn start_send(
        mut self: Pin<&mut Self>,
        item: Self::SinkItem,
    ) -> Result<(), Self::SinkError> {
        if self.capacity == 0 {
            self.as_mut().sink().start_send(item)
        } else {
            self.as_mut().buf().push_back(item);
            Ok(())
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        waker: &Waker,
    ) -> Poll<Result<(), Self::SinkError>> {
        try_ready!(self.as_mut().try_empty_buffer(waker));
        debug_assert!(self.as_mut().buf().is_empty());
        self.as_mut().sink().poll_flush(waker)
    }

    fn poll_close(
        mut self: Pin<&mut Self>,
        waker: &Waker,
    ) -> Poll<Result<(), Self::SinkError>> {
        try_ready!(self.as_mut().try_empty_buffer(waker));
        debug_assert!(self.as_mut().buf().is_empty());
        self.as_mut().sink().poll_close(waker)
    }
}