1use std::io;
4use std::sync::Arc;
5use std::net::SocketAddr;
6use std::time::Duration;
7
8use tokio::io::{BufReader, AsyncReadExt, AsyncWriteExt};
9use tokio::net::TcpSocket;
10use socket2::SockRef;
11
12use crate::h1::{self, HttpRequestError};
13use crate::reqres::{HttpRequest, StatusCode};
14use crate::core::{HttpService, HttpServiceRaw, HttpErrorHandler, HttpErrorType, HttpLogger};
15use crate::core::connection::{HttpConnection, EmitContinue};
16use crate::services::{DefaultService, DefaultLogger, ErrorPageHandler};
17use crate::util::future::Or;
18
19const DEFAULT_MAX_HEADERS_SIZE: u64 = 65536; pub struct HttpServer {
23 pub name: String,
24 pub max_headers_size: u64,
25 pub service: Box<dyn HttpServiceRaw>,
26 pub error_handler: Box<dyn HttpErrorHandler>,
27 pub logger: Box<dyn HttpLogger>,
28}
29
30impl HttpServer {
31 pub fn new() -> HttpServer {
32 HttpServer {
33 name: "DrakoHTTP".to_string(),
34 max_headers_size: DEFAULT_MAX_HEADERS_SIZE,
35 service: Box::new(DefaultService),
36 error_handler: Box::new(ErrorPageHandler { name: "DrakoHTTP".to_string() }),
37 logger: Box::new(DefaultLogger),
38 }
39 }
40
41 pub fn service(&mut self, service: impl HttpService) -> &mut Self {
42 self.service = Box::new(service);
43 self
44 }
45
46 pub fn error_handler(&mut self, error_handler: impl HttpErrorHandler) -> &mut Self {
47 self.error_handler = Box::new(error_handler);
48 self
49 }
50
51 pub fn logger(&mut self, logger: impl HttpLogger) -> &mut Self {
52 self.logger = Box::new(logger);
53 self
54 }
55}
56
57impl Default for HttpServer {
58 fn default() -> HttpServer {
59 HttpServer::new()
60 }
61}
62
63impl HttpServer {
64 async fn handle_connection(&self, mut conn: impl HttpConnection, addr: SocketAddr) -> io::Result<()> {
65 let mut connection_close = false;
66 while !connection_close {
67 let req = h1::read((&mut conn).take(self.max_headers_size)).await;
68 if let Err(err) = req {
69 if let HttpRequestError::Io(err) = err {
70 return Err(err);
72 } else {
73 let res = self.error_handler.plain_code(StatusCode::BAD_REQUEST);
75 h1::send(&HttpRequest::default(), res, &mut conn).await?;
76 return conn.shutdown().await;
77 }
78 }
79 let mut req = req.unwrap();
81
82 req.addr = addr.ip().to_canonical();
84
85 if req.version.major != 1 {
88 let res = self.error_handler.plain_code(StatusCode::HTTP_VERSION_NOT_SUPPORTED);
89 h1::send(&req, res, &mut conn).await?;
90 return conn.shutdown().await;
91 }
92
93 let mut body = EmitContinue {
98 conn: (&mut conn).take(req.len),
99 to_send: b"",
100 };
101 if req.cmp_header("Expect", "100-continue") {
102 body.to_send = b"HTTP/1.1 100 Continue\r\n\r\n";
103 }
104
105 let mut res = match self.service.filter_raw(&req.route, &req) {
111 Ok(()) => self.service.request_raw(&req.route, &req, &mut body).await,
112 Err(err) => Err(err),
113 };
114
115 if let Ok(res) = &res {
116 self.logger.log(&req, res);
118 } else if let Err(err) = res {
119 let mut handled = match err.error_type() {
121 HttpErrorType::Fatal => return conn.shutdown().await,
123 HttpErrorType::Hidden => self.error_handler.plain_code(err.status_code()),
125 HttpErrorType::User => self.error_handler.error(&req, err.as_ref()),
127 };
128 handled.code = err.status_code();
130 match err.error_type() {
132 HttpErrorType::Fatal => unreachable!(),
133 HttpErrorType::Hidden => self.logger.log(&req, &handled),
134 HttpErrorType::User => self.logger.err(&req, &handled, err.as_ref()),
135 };
136 res = Ok(handled);
137 }
138 let mut res = res.unwrap();
140
141 if !self.name.is_empty() {
143 res.add_header("Server", &self.name);
144 }
145
146 if body.conn.limit() != 0 || req.version.is(1, 0) {
150 res.add_header("Connection", "close");
151 connection_close = true;
152 } else if req.version.is(1, 1) {
153 if !req.has_header("Connection") || req.cmp_header("Connection", "keep-alive") {
154 res.add_header("Connection", "keep-alive");
155 } else {
156 res.add_header("Connection", "close");
157 connection_close = true;
158 }
159 }
160
161 h1::send(&req, res, &mut conn).await?;
163 }
164 conn.shutdown().await
166 }
167}
168
169pub async fn serve_tcp(addr: &str, server: impl Into<Arc<HttpServer>>) -> io::Result<()> {
171 let addr: SocketAddr = addr.parse().map_err(io::Error::other)?;
172
173 let sock = match addr {
174 SocketAddr::V4(_) => TcpSocket::new_v4()?,
175 SocketAddr::V6(_) => TcpSocket::new_v6()?,
176 };
177
178 if addr.is_ipv6() && addr.ip().is_unspecified() {
179 SockRef::from(&sock).set_only_v6(false)?;
181 }
182
183 #[cfg(not(windows))]
186 sock.set_reuseaddr(true)?;
187 sock.set_nodelay(true)?;
189
190 sock.bind(addr)?;
191
192 let tcp = sock.listen(128)?;
193 let server = server.into();
194 let mut err_shown = false;
195 loop {
196 let result = Or::new(tcp.accept(), tokio::signal::ctrl_c()).await;
198 if result.is_err() { break; }
199
200 match result.unwrap() {
201 Ok((conn, addr)) => {
202 err_shown = false;
203 let server2 = Arc::clone(&server);
204 tokio::spawn(async move {
205 let _ = server2.handle_connection(BufReader::new(conn), addr).await;
207 });
208 }
209 Err(e) => {
210 if !err_shown {
212 println!("DrakoHTTP critical error: connection not accepted: {e}");
213 err_shown = true;
214 }
215 let d = Duration::from_millis(100);
216 tokio::time::sleep(d).await;
217 }
218 };
219 }
220
221 Ok(())
222}
223
224pub fn tokio_rt() -> io::Result<tokio::runtime::Runtime> {
228 tokio::runtime::Builder::new_multi_thread()
229 .enable_all()
230 .build()
231}