dhttp/
server.rs

1//! HTTP server
2
3use std::io;
4use std::sync::Arc;
5use std::net::SocketAddr;
6use std::time::Duration;
7
8use tokio::io::{BufReader, AsyncReadExt, AsyncWriteExt};
9use tokio::net::TcpSocket;
10use socket2::SockRef;
11
12use crate::h1::{self, HttpRequestError};
13use crate::reqres::{HttpRequest, StatusCode};
14use crate::core::{HttpService, HttpServiceRaw, HttpErrorHandler, HttpErrorType, HttpLogger};
15use crate::core::connection::{HttpConnection, EmitContinue};
16use crate::services::{DefaultService, DefaultLogger, ErrorPageHandler};
17use crate::util::future::Or;
18
/// Default value of [`HttpServer::max_headers_size`], in bytes (64 KiB)
const DEFAULT_MAX_HEADERS_SIZE: u64 = 64 * 1024;
20
21/// An HTTP/1.1 server
22pub struct HttpServer {
23    pub name: String,
24    pub max_headers_size: u64,
25    pub service: Box<dyn HttpServiceRaw>,
26    pub error_handler: Box<dyn HttpErrorHandler>,
27    pub logger: Box<dyn HttpLogger>,
28}
29
30impl HttpServer {
31    pub fn new() -> HttpServer {
32        HttpServer {
33            name: "DrakoHTTP".to_string(),
34            max_headers_size: DEFAULT_MAX_HEADERS_SIZE,
35            service: Box::new(DefaultService),
36            error_handler: Box::new(ErrorPageHandler { name: "DrakoHTTP".to_string() }),
37            logger: Box::new(DefaultLogger),
38        }
39    }
40
41    pub fn service(&mut self, service: impl HttpService) -> &mut Self {
42        self.service = Box::new(service);
43        self
44    }
45
46    pub fn error_handler(&mut self, error_handler: impl HttpErrorHandler) -> &mut Self {
47        self.error_handler = Box::new(error_handler);
48        self
49    }
50
51    pub fn logger(&mut self, logger: impl HttpLogger) -> &mut Self {
52        self.logger = Box::new(logger);
53        self
54    }
55}
56
57impl Default for HttpServer {
58    fn default() -> HttpServer {
59        HttpServer::new()
60    }
61}
62
63impl HttpServer {
64    async fn handle_connection(&self, mut conn: impl HttpConnection, addr: SocketAddr) -> io::Result<()> {
65        let mut connection_close = false;
66        while !connection_close {
67            let req = h1::read((&mut conn).take(self.max_headers_size)).await;
68            if let Err(err) = req {
69                if let HttpRequestError::Io(err) = err {
70                    // IO errors should not be handled
71                    return Err(err);
72                } else {
73                    // Could not parse request, return Bad request
74                    let res = self.error_handler.plain_code(StatusCode::BAD_REQUEST);
75                    h1::send(&HttpRequest::default(), res, &mut conn).await?;
76                    return conn.shutdown().await;
77                }
78            }
79            // Request is Ok
80            let mut req = req.unwrap();
81
82            // Address has to be set by the connection handler
83            req.addr = addr.ip().to_canonical();
84
85            // HTTP/2 prior knowledge headers look like `PRI * HTTP/2.0`
86            // These connections are not supported
87            if req.version.major != 1 {
88                let res = self.error_handler.plain_code(StatusCode::HTTP_VERSION_NOT_SUPPORTED);
89                h1::send(&req, res, &mut conn).await?;
90                return conn.shutdown().await;
91            }
92
93            // Before starting file upload, curl expects server to send `100 Continue` response
94            // Otherwise, it will wait for a timeout
95            // This adapter echoes `100 Continue` when service starts reading the body
96            // (meaning, that service has accepted it)
97            let mut body = EmitContinue {
98                conn: (&mut conn).take(req.len),
99                to_send: b"",
100            };
101            if req.cmp_header("Expect", "100-continue") {
102                body.to_send = b"HTTP/1.1 100 Continue\r\n\r\n";
103            }
104
105            // Future TODO: HTTP/1.1 connection handler has a lot of hardcoded functionality
106            // that still applies to HTTP/2 and QUIC. Some logic here could be separated
107
108            // Before executing the service, we have to check if request is compatible
109            // This is connection handler's responsibility
110            let mut res = match self.service.filter_raw(&req.route, &req) {
111                Ok(()) => self.service.request_raw(&req.route, &req, &mut body).await,
112                Err(err) => Err(err),
113            };
114
115            if let Ok(res) = &res {
116                // Log request+response with our defined logger
117                self.logger.log(&req, res);
118            } else if let Err(err) = res {
119                // Response is Err, should be handled with defined error handler
120                let mut handled = match err.error_type() {
121                    // IO error
122                    HttpErrorType::Fatal => return conn.shutdown().await,
123                    // Status code
124                    HttpErrorType::Hidden => self.error_handler.plain_code(err.status_code()),
125                    // Error with description
126                    HttpErrorType::User => self.error_handler.error(&req, err.as_ref()),
127                };
128                // Always use the original status code in the error response (connection handler sets this)
129                handled.code = err.status_code();
130                // Log the error
131                match err.error_type() {
132                    HttpErrorType::Fatal => unreachable!(),
133                    HttpErrorType::Hidden => self.logger.log(&req, &handled),
134                    HttpErrorType::User => self.logger.err(&req, &handled, err.as_ref()),
135                };
136                res = Ok(handled);
137            }
138            // Response is Ok
139            let mut res = res.unwrap();
140
141            // Add our server name
142            if !self.name.is_empty() {
143                res.add_header("Server", &self.name);
144            }
145
146            // Stop pipelining if:
147            // - service didn't consume the body completely
148            // - HTTP/1.0 (doesn't support pipelining)
149            if body.conn.limit() != 0 || req.version.is(1, 0) {
150                res.add_header("Connection", "close");
151                connection_close = true;
152            } else if req.version.is(1, 1) {
153                if !req.has_header("Connection") || req.cmp_header("Connection", "keep-alive") {
154                    res.add_header("Connection", "keep-alive");
155                } else {
156                    res.add_header("Connection", "close");
157                    connection_close = true;
158                }
159            }
160
161            // Now, send the response
162            h1::send(&req, res, &mut conn).await?;
163        }
164        // Loop ended, we close the connection now
165        conn.shutdown().await
166    }
167}
168
169/// Starts handling connections on a given [`HttpServer`], without TLS
170pub async fn serve_tcp(addr: &str, server: impl Into<Arc<HttpServer>>) -> io::Result<()> {
171    let addr: SocketAddr = addr.parse().map_err(io::Error::other)?;
172
173    let sock = match addr {
174        SocketAddr::V4(_) => TcpSocket::new_v4()?,
175        SocketAddr::V6(_) => TcpSocket::new_v6()?,
176    };
177
178    if addr.is_ipv6() && addr.ip().is_unspecified() {
179        // allows to use [::] for both ipv4 and ipv6 on windows
180        SockRef::from(&sock).set_only_v6(false)?;
181    }
182
183    // https://github.com/tokio-rs/mio/blob/b0578c2d166c2ebc78dfd5f70395591351ba8dde/src/net/tcp/listener.rs#L73
184    // TL;DR socket is active some time after closing and you can't rebind it even if you have exited
185    #[cfg(not(windows))]
186    sock.set_reuseaddr(true)?;
187    // already buffered
188    sock.set_nodelay(true)?;
189
190    sock.bind(addr)?;
191
192    let tcp = sock.listen(128)?;
193    let server = server.into();
194    let mut err_shown = false;
195    loop {
196        // This way, shutdown is handled gracefully
197        let result = Or::new(tcp.accept(), tokio::signal::ctrl_c()).await;
198        if result.is_err() { break; }
199
200        match result.unwrap() {
201            Ok((conn, addr)) => {
202                err_shown = false;
203                let server2 = Arc::clone(&server);
204                tokio::spawn(async move {
205                    // ignore network errors
206                    let _ = server2.handle_connection(BufReader::new(conn), addr).await;
207                });
208            }
209            Err(e) => {
210                // this may fire when fd limit is exhausted
211                if !err_shown {
212                    println!("DrakoHTTP critical error: connection not accepted: {e}");
213                    err_shown = true;
214                }
215                let d = Duration::from_millis(100);
216                tokio::time::sleep(d).await;
217            }
218        };
219    }
220
221    Ok(())
222}
223
224/// Builds the tokio runtime
225///
226/// This function is a simple replacement for `#[tokio::main]` that does not use macros
227pub fn tokio_rt() -> io::Result<tokio::runtime::Runtime> {
228    tokio::runtime::Builder::new_multi_thread()
229        .enable_all()
230        .build()
231}