dhttp/core/
connection.rs

1//! Connection types
2
3use std::io::{self, ErrorKind};
4use std::net::SocketAddr;
5use std::pin::Pin;
6use std::task::{Context, Poll, ready};
7
8use tokio::io::{AsyncRead, AsyncBufRead, AsyncWrite, BufReader, ReadBuf, Take};
9use tokio::net::TcpStream;
10
11/// Async buffered reader stream
12pub trait HttpRead: AsyncBufRead + Unpin + Send + Sync {}
13impl<T: AsyncBufRead + Unpin + Send + Sync> HttpRead for T {}
14
15/// Async writer stream
16pub trait HttpWrite: AsyncWrite + Unpin + Send + Sync {}
17impl<T: AsyncWrite + Unpin + Send + Sync> HttpWrite for T {}
18
19/// Async BufRead/Write stream that represents an HTTP connection
20pub trait HttpConnection: HttpRead + HttpWrite {}
21impl<T: HttpRead + HttpWrite> HttpConnection for T {}
22
23pub(crate) struct EmitContinue<T: HttpConnection> {
24    pub conn: Take<T>,
25    pub to_send: &'static [u8],
26}
27
28impl<T: HttpConnection> AsyncRead for EmitContinue<T> {
29    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
30        while !self.to_send.is_empty() {
31            let to_send = self.to_send;
32            let written = ready!(Pin::new(self.conn.get_mut()).poll_write(cx, to_send))?;
33            if written == 0 { return Poll::Ready(Err(ErrorKind::WriteZero.into())); }
34            self.to_send = &self.to_send[written..];
35        }
36
37        Pin::new(&mut self.conn).poll_read(cx, buf)
38    }
39}
40
41impl<T: HttpConnection> AsyncBufRead for EmitContinue<T> {
42    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
43        Pin::new(&mut Pin::into_inner(self).conn).poll_fill_buf(cx)
44    }
45
46    fn consume(self: Pin<&mut Self>, amt: usize) {
47        Pin::new(&mut Pin::into_inner(self).conn).consume(amt)
48    }
49}