Mercurial > lbo > hg > geohub
changeset 37:08b4f7127980
Encapsulate notification messaging
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Thu, 03 Dec 2020 09:35:51 +0100 |
parents | ebdb9c50adb1 |
children | b66a3ef84f24 |
files | src/http.rs src/ids.rs src/main.rs src/notifier.rs src/types.rs src/util.rs |
diffstat | 6 files changed, 72 insertions(+), 67 deletions(-) [+] |
line wrap: on
line diff
--- a/src/http.rs Thu Dec 03 09:25:29 2020 +0100 +++ b/src/http.rs Thu Dec 03 09:35:51 2020 +0100 @@ -1,13 +1,12 @@ - use rocket::response::Responder; #[derive(Responder)] pub enum GeoHubResponse { - #[response(status=200, content_type="json")] + #[response(status = 200, content_type = "json")] Ok(String), - #[response(status=400)] + #[response(status = 400)] BadRequest(String), - #[response(status=500)] + #[response(status = 500)] ServerError(String), }
--- a/src/ids.rs Thu Dec 03 09:25:29 2020 +0100 +++ b/src/ids.rs Thu Dec 03 09:35:51 2020 +0100 @@ -21,4 +21,3 @@ assert!(parts.len() == 4); return (parts[2], parts[3]); } -
--- a/src/main.rs Thu Dec 03 09:25:29 2020 +0100 +++ b/src/main.rs Thu Dec 03 09:35:51 2020 +0100 @@ -8,13 +8,13 @@ mod util; use std::sync::{mpsc, Arc, Mutex}; -use std::time; use postgres; use rocket; -/// Almost like retrieve/json, but sorts in descending order and doesn't work with intervals (only -/// limit). Used for backfilling recent points in the UI. +/// Almost like retrieve/json, but sorts in descending order, doesn't work with intervals (only +/// limit), and returns a LiveUpdate. +/// Used for backfilling recent points in the UI. #[rocket::get("/geo/<name>/retrieve/last?<secret>&<last>&<limit>")] fn retrieve_last( db: db::DBConn, @@ -22,40 +22,26 @@ secret: Option<String>, last: Option<i32>, limit: Option<i64>, -) -> rocket_contrib::json::Json<LiveUpdate> { +) -> rocket_contrib::json::Json<types::LiveUpdate> { let db = db::DBQuery(&db.0); if let Some((geojson, newlast)) = db.check_for_new_rows(&name, secret.as_ref().map(|s| s.as_str()), &last, &limit) { - return rocket_contrib::json::Json(LiveUpdate { - typ: "GeoHubUpdate".into(), - last: Some(newlast), - geo: Some(geojson), - error: None, - }); + rocket_contrib::json::Json(types::LiveUpdate::new(Some(newlast), Some(geojson), None)) + } else { + rocket_contrib::json::Json(types::LiveUpdate::new( + last, + None, + Some("No rows returned".into()), + )) } - return rocket_contrib::json::Json(LiveUpdate { - typ: "GeoHubUpdate".into(), - last: last, - geo: None, - error: Some("No new rows returned".into()), - }); -} - -#[derive(serde::Serialize, Debug)] -struct LiveUpdate { - #[serde(rename = "type")] - typ: String, // always "GeoHubUpdate" - last: Option<i32>, - geo: Option<types::GeoJSON>, - error: Option<String>, } /// Wait for an update. /// Only one point is returned. To retrieve a history of points, call retrieve_last. #[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")] fn retrieve_live( - notify_manager: rocket::State<notifier::SendableSender<notifier::NotifyRequest>>, + notify_manager: rocket::State<notifier::NotifyManager>, name: String, secret: Option<String>, timeout: Option<u64>, @@ -67,33 +53,7 @@ ); } - // Ask the notify thread to tell us when there is an update for this client name and secret. - let (send, recv) = mpsc::channel(); - let send = notifier::SendableSender { - sender: Arc::new(Mutex::new(send)), - }; - - let req = notifier::NotifyRequest { - client: name.clone(), - secret: secret, - respond: send, - }; - notify_manager.send(req).unwrap(); - - if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) { - return http::return_json(&LiveUpdate { - typ: "GeoHubUpdate".into(), - last: response.last, - geo: response.geo, - error: None, - }); - } - return http::return_json(&LiveUpdate { - typ: "GeoHubUpdate".into(), - last: None, - geo: None, - error: None, - }); + http::return_json(¬ify_manager.wait_for_notification(name, secret, timeout)) } /// Retrieve GeoJSON data. @@ -188,9 +148,9 @@ fn main() { let (send, recv) = mpsc::channel(); - let send = notifier::SendableSender { + let send = notifier::NotifyManager(notifier::SendableSender { sender: Arc::new(Mutex::new(send)), - }; + }); rocket::ignite() .attach(db::DBConn::fairing())
--- a/src/notifier.rs Thu Dec 03 09:25:29 2020 +0100 +++ b/src/notifier.rs Thu Dec 03 09:35:51 2020 +0100 @@ -1,12 +1,11 @@ - use crate::db; use crate::ids; use crate::types; +use fallible_iterator::FallibleIterator; use std::collections::HashMap; use std::sync::{mpsc, Arc, Mutex}; use std::time; -use fallible_iterator::FallibleIterator; /// Request of a web client thread to the notifier thread. pub struct NotifyRequest { @@ -35,6 +34,35 @@ } } +pub struct NotifyManager(pub SendableSender<NotifyRequest>); + +impl NotifyManager { + pub fn wait_for_notification( + &self, + client: String, + secret: Option<String>, + timeout: Option<u64>, + ) -> types::LiveUpdate { + let (send, recv) = mpsc::channel(); + let send = SendableSender { + sender: Arc::new(Mutex::new(send)), + }; + + let req = NotifyRequest { + client: client, + secret: secret, + respond: send, + }; + self.0.send(req).unwrap(); + + if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) { + types::LiveUpdate::new(response.last, response.geo, None) + } else { + types::LiveUpdate::new(None, None, Some("timeout, try again".into())) + } + } +} + /// Listen for notifications in the database and dispatch to waiting clients. pub fn live_notifier_thread(rx: mpsc::Receiver<NotifyRequest>, db: postgres::Connection) { const TICK_MILLIS: u32 = 500; @@ -118,4 +146,3 @@ } } } -
--- a/src/types.rs Thu Dec 03 09:25:29 2020 +0100 +++ b/src/types.rs Thu Dec 03 09:35:51 2020 +0100 @@ -1,4 +1,3 @@ - /// Non-JSON plain point representation. #[derive(Debug, Clone)] pub struct GeoPoint { @@ -9,6 +8,26 @@ pub time: chrono::DateTime<chrono::Utc>, } +#[derive(serde::Serialize, Debug)] +pub struct LiveUpdate { + #[serde(rename = "type")] + typ: String, // always "GeoHubUpdate" + last: Option<i32>, + geo: Option<GeoJSON>, + error: Option<String>, +} + +impl LiveUpdate { + pub fn new(last: Option<i32>, geo: Option<GeoJSON>, err: Option<String>) -> LiveUpdate { + LiveUpdate { + typ: "GeoHubUpdate".into(), + last: last, + geo: geo, + error: err, + } + } +} + /// Fetch geodata as JSON. /// #[derive(serde::Serialize, Debug, Clone)] @@ -42,7 +61,10 @@ impl GeoJSON { pub fn new() -> GeoJSON { - GeoJSON { typ: "FeatureCollection".into(), features: vec![] } + GeoJSON { + typ: "FeatureCollection".into(), + features: vec![], + } } pub fn reserve_features(&mut self, cap: usize) { self.features.reserve(cap); @@ -72,4 +94,3 @@ }, } } -