dhttp/reqres/
sse.rs

1//! Server Sent Events (SSE)
2//! # Example
3//! ```
4//! use dhttp::reqres::res;
5//!
6//! # use dhttp::reqres::sse::{HttpSse, HttpSseEvent};
7//! struct MyEvents;
8//! impl HttpSse for MyEvents {
9//!     async fn next(&mut self) -> Option<HttpSseEvent> {
10//!         Some(HttpSseEvent::new("hello world"))
11//!     }
12//! }
13//! # use dhttp::core::{HttpService, HttpResult};
14//! # use dhttp::reqres::HttpRequest;
15//! # use dhttp::core::connection::HttpRead;
16//! struct MyService;
17//! impl HttpService for MyService {
18//!     async fn request(&self, _route: &str, _req: &HttpRequest, _body: &mut dyn HttpRead) -> HttpResult {
19//!         Ok(res::sse(MyEvents))
20//!     }
21//! }
22//! ```
23
24use std::fmt::Write;
25use std::io;
26
27use tokio::io::AsyncWriteExt;
28
29use crate::core::connection::HttpConnection;
30use crate::reqres::HttpUpgrade;
31
32pub struct HttpSseEvent(String);
33
34fn add_data(event: &mut String, data: &str) {
35    for line in data.split('\n') {
36        write!(event, "data: {}", line).unwrap();
37    }
38    event.push('\n');
39}
40
41impl HttpSseEvent {
42    pub fn new(data: &str) -> HttpSseEvent {
43        let mut event = String::new();
44        add_data(&mut event, data);
45        HttpSseEvent(event)
46    }
47
48    pub fn named(name: &str, data: &str) -> HttpSseEvent {
49        let mut event = format!("event: {}\n", name.replace('\n', ""));
50        add_data(&mut event, data);
51        HttpSseEvent(event)
52    }
53}
54
55/// SSE stream
56///
57/// Can be used through [`res::sse`]
58///
59/// [`res::sse`]: crate::reqres::res::sse
60#[doc(alias = "EventSource")]
61pub trait HttpSse: Send + 'static {
62    /// Produces a new event or `None` if there are no more events
63    fn next(&mut self) -> impl Future<Output = Option<HttpSseEvent>> + Send;
64}
65
66impl<T: HttpSse> HttpUpgrade for T {
67    async fn upgrade(&mut self, conn: &mut dyn HttpConnection) -> io::Result<()> {
68        while let Some(event) = self.next().await {
69            conn.write_all(event.0.as_bytes()).await?;
70        }
71
72        Ok(())
73    }
74}