changeset 37:08b4f7127980

Encapsulate notification messaging
author Lewin Bormann <lbo@spheniscida.de>
date Thu, 03 Dec 2020 09:35:51 +0100
parents ebdb9c50adb1
children b66a3ef84f24
files src/http.rs src/ids.rs src/main.rs src/notifier.rs src/types.rs src/util.rs
diffstat 6 files changed, 72 insertions(+), 67 deletions(-) [+]
line wrap: on
line diff
--- a/src/http.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/http.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -1,13 +1,12 @@
-
 use rocket::response::Responder;
 
 #[derive(Responder)]
 pub enum GeoHubResponse {
-    #[response(status=200, content_type="json")]
+    #[response(status = 200, content_type = "json")]
     Ok(String),
-    #[response(status=400)]
+    #[response(status = 400)]
     BadRequest(String),
-    #[response(status=500)]
+    #[response(status = 500)]
     ServerError(String),
 }
 
--- a/src/ids.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/ids.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -21,4 +21,3 @@
     assert!(parts.len() == 4);
     return (parts[2], parts[3]);
 }
-
--- a/src/main.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/main.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -8,13 +8,13 @@
 mod util;
 
 use std::sync::{mpsc, Arc, Mutex};
-use std::time;
 
 use postgres;
 use rocket;
 
-/// Almost like retrieve/json, but sorts in descending order and doesn't work with intervals (only
-/// limit). Used for backfilling recent points in the UI.
+/// Almost like retrieve/json, but sorts in descending order, doesn't work with intervals (only
+/// limit), and returns a LiveUpdate.
+/// Used for backfilling recent points in the UI.
 #[rocket::get("/geo/<name>/retrieve/last?<secret>&<last>&<limit>")]
 fn retrieve_last(
     db: db::DBConn,
@@ -22,40 +22,26 @@
     secret: Option<String>,
     last: Option<i32>,
     limit: Option<i64>,
-) -> rocket_contrib::json::Json<LiveUpdate> {
+) -> rocket_contrib::json::Json<types::LiveUpdate> {
     let db = db::DBQuery(&db.0);
     if let Some((geojson, newlast)) =
         db.check_for_new_rows(&name, secret.as_ref().map(|s| s.as_str()), &last, &limit)
     {
-        return rocket_contrib::json::Json(LiveUpdate {
-            typ: "GeoHubUpdate".into(),
-            last: Some(newlast),
-            geo: Some(geojson),
-            error: None,
-        });
+        rocket_contrib::json::Json(types::LiveUpdate::new(Some(newlast), Some(geojson), None))
+    } else {
+        rocket_contrib::json::Json(types::LiveUpdate::new(
+            last,
+            None,
+            Some("No rows returned".into()),
+        ))
     }
-    return rocket_contrib::json::Json(LiveUpdate {
-        typ: "GeoHubUpdate".into(),
-        last: last,
-        geo: None,
-        error: Some("No new rows returned".into()),
-    });
-}
-
-#[derive(serde::Serialize, Debug)]
-struct LiveUpdate {
-    #[serde(rename = "type")]
-    typ: String, // always "GeoHubUpdate"
-    last: Option<i32>,
-    geo: Option<types::GeoJSON>,
-    error: Option<String>,
 }
 
 /// Wait for an update.
 /// Only one point is returned. To retrieve a history of points, call retrieve_last.
 #[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")]
 fn retrieve_live(
-    notify_manager: rocket::State<notifier::SendableSender<notifier::NotifyRequest>>,
+    notify_manager: rocket::State<notifier::NotifyManager>,
     name: String,
     secret: Option<String>,
     timeout: Option<u64>,
@@ -67,33 +53,7 @@
         );
     }
 
-    // Ask the notify thread to tell us when there is an update for this client name and secret.
-    let (send, recv) = mpsc::channel();
-    let send = notifier::SendableSender {
-        sender: Arc::new(Mutex::new(send)),
-    };
-
-    let req = notifier::NotifyRequest {
-        client: name.clone(),
-        secret: secret,
-        respond: send,
-    };
-    notify_manager.send(req).unwrap();
-
-    if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) {
-        return http::return_json(&LiveUpdate {
-            typ: "GeoHubUpdate".into(),
-            last: response.last,
-            geo: response.geo,
-            error: None,
-        });
-    }
-    return http::return_json(&LiveUpdate {
-        typ: "GeoHubUpdate".into(),
-        last: None,
-        geo: None,
-        error: None,
-    });
+    http::return_json(&notify_manager.wait_for_notification(name, secret, timeout))
 }
 
 /// Retrieve GeoJSON data.
@@ -188,9 +148,9 @@
 
 fn main() {
     let (send, recv) = mpsc::channel();
-    let send = notifier::SendableSender {
+    let send = notifier::NotifyManager(notifier::SendableSender {
         sender: Arc::new(Mutex::new(send)),
-    };
+    });
 
     rocket::ignite()
         .attach(db::DBConn::fairing())
--- a/src/notifier.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/notifier.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -1,12 +1,11 @@
-
 use crate::db;
 use crate::ids;
 use crate::types;
 
+use fallible_iterator::FallibleIterator;
 use std::collections::HashMap;
 use std::sync::{mpsc, Arc, Mutex};
 use std::time;
-use fallible_iterator::FallibleIterator;
 
 /// Request of a web client thread to the notifier thread.
 pub struct NotifyRequest {
@@ -35,6 +34,35 @@
     }
 }
 
+pub struct NotifyManager(pub SendableSender<NotifyRequest>);
+
+impl NotifyManager {
+    pub fn wait_for_notification(
+        &self,
+        client: String,
+        secret: Option<String>,
+        timeout: Option<u64>,
+    ) -> types::LiveUpdate {
+        let (send, recv) = mpsc::channel();
+        let send = SendableSender {
+            sender: Arc::new(Mutex::new(send)),
+        };
+
+        let req = NotifyRequest {
+            client: client,
+            secret: secret,
+            respond: send,
+        };
+        self.0.send(req).unwrap();
+
+        if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) {
+            types::LiveUpdate::new(response.last, response.geo, None)
+        } else {
+            types::LiveUpdate::new(None, None, Some("timeout, try again".into()))
+        }
+    }
+}
+
 /// Listen for notifications in the database and dispatch to waiting clients.
 pub fn live_notifier_thread(rx: mpsc::Receiver<NotifyRequest>, db: postgres::Connection) {
     const TICK_MILLIS: u32 = 500;
@@ -118,4 +146,3 @@
         }
     }
 }
-
--- a/src/types.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/types.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -1,4 +1,3 @@
-
 /// Non-JSON plain point representation.
 #[derive(Debug, Clone)]
 pub struct GeoPoint {
@@ -9,6 +8,26 @@
     pub time: chrono::DateTime<chrono::Utc>,
 }
 
+#[derive(serde::Serialize, Debug)]
+pub struct LiveUpdate {
+    #[serde(rename = "type")]
+    typ: String, // always "GeoHubUpdate"
+    last: Option<i32>,
+    geo: Option<GeoJSON>,
+    error: Option<String>,
+}
+
+impl LiveUpdate {
+    pub fn new(last: Option<i32>, geo: Option<GeoJSON>, err: Option<String>) -> LiveUpdate {
+        LiveUpdate {
+            typ: "GeoHubUpdate".into(),
+            last: last,
+            geo: geo,
+            error: err,
+        }
+    }
+}
+
 /// Fetch geodata as JSON.
 ///
 #[derive(serde::Serialize, Debug, Clone)]
@@ -42,7 +61,10 @@
 
 impl GeoJSON {
     pub fn new() -> GeoJSON {
-        GeoJSON { typ: "FeatureCollection".into(), features: vec![] }
+        GeoJSON {
+            typ: "FeatureCollection".into(),
+            features: vec![],
+        }
     }
     pub fn reserve_features(&mut self, cap: usize) {
         self.features.reserve(cap);
@@ -72,4 +94,3 @@
         },
     }
 }
-
--- a/src/util.rs	Thu Dec 03 09:25:29 2020 +0100
+++ b/src/util.rs	Thu Dec 03 09:35:51 2020 +0100
@@ -24,4 +24,3 @@
     }
     None
 }
-