changeset 63:fa2a46761712

Fix missed notifications for upper case IDs
author Lewin Bormann <lbo@spheniscida.de>
date Fri, 04 Dec 2020 14:25:41 +0100
parents 45d88bf5d99f
children 79b0186c7331
files TODO src/db.rs src/ids.rs src/main.rs src/notifier.rs
diffstat 5 files changed, 94 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/TODO	Fri Dec 04 13:57:09 2020 +0100
+++ b/TODO	Fri Dec 04 14:25:41 2020 +0100
@@ -1,5 +1,11 @@
 GENERAL
 
+* UI
+ * trackme: Tell user to fill secret/client
+
 FEATURES
 
 * GPX/json export (with UI + API)
+
+BUGS
+
--- a/src/db.rs	Fri Dec 04 13:57:09 2020 +0100
+++ b/src/db.rs	Fri Dec 04 14:25:41 2020 +0100
@@ -1,4 +1,3 @@
-use crate::ids;
 use crate::types;
 
 /// Managed by Rocket.
@@ -61,12 +60,6 @@
         let stmt = self.0.prepare_cached(
             r"INSERT INTO geohub.geodata (client, lat, long, spd, t, ele, secret, note, accuracy)
             VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'), $8, $9)").unwrap();
-        let channel = format!(
-            "NOTIFY {}, '{}'",
-            ids::channel_name(name, secret.as_ref().unwrap_or(&"".into()).as_str()),
-            name
-        );
-        let notify = self.0.prepare_cached(channel.as_str()).unwrap();
         stmt.execute(&[
             &name,
             &point.lat,
@@ -77,10 +70,7 @@
             &secret,
             &point.note,
             &point.accuracy,
-        ])
-        .unwrap();
-        notify.execute(&[]).unwrap();
-        Ok(())
+        ]).map(|_| ())
     }
 
     /// Queries for at most `limit` rows since entry ID `last`.
@@ -137,9 +127,8 @@
                 return Some((returnable, last));
             }
             return None;
-        } else {
-            // For debugging.
-            rows.unwrap();
+        } else if let Err(e) = rows {
+            eprintln!("check_for_new_rows: Couldn't check new rows: {}", e);
         }
         return None;
     }
--- a/src/ids.rs	Fri Dec 04 13:57:09 2020 +0100
+++ b/src/ids.rs	Fri Dec 04 14:25:41 2020 +0100
@@ -6,18 +6,3 @@
             .chars()
             .any(|c| !c.is_ascii_alphanumeric()))
 }
-
-/// Build a channel name from a client name and secret.
-pub fn channel_name(client: &str, secret: &str) -> String {
-    // The log handler should check this.
-    assert!(secret.find('_').is_none());
-    format!("geohubclient_update_{}_{}", client, secret)
-}
-
-/// Extract client name and secret from the database channel name.
-pub fn client_secret(channel_name: &str) -> (&str, &str) {
-    // Channel name is like geohubclient_update_<client>_<secret>
-    let parts = channel_name.split('_').collect::<Vec<&str>>();
-    assert!(parts.len() == 4);
-    return (parts[2], parts[3]);
-}
--- a/src/main.rs	Fri Dec 04 13:57:09 2020 +0100
+++ b/src/main.rs	Fri Dec 04 14:25:41 2020 +0100
@@ -127,6 +127,7 @@
 )]
 fn log(
     db: db::DBConn,
+    notify_manager: rocket::State<notifier::NotifyManager>,
     name: String,
     lat: f64,
     longitude: f64,
@@ -178,6 +179,9 @@
     if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) {
         return http::server_error(e.to_string());
     }
+    if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret) {
+        eprintln!("Couldn't send notification: {}", e);
+    }
     http::GeoHubResponse::Ok("".into())
 }
 
--- a/src/notifier.rs	Fri Dec 04 13:57:09 2020 +0100
+++ b/src/notifier.rs	Fri Dec 04 14:25:41 2020 +0100
@@ -1,5 +1,4 @@
 use crate::db;
-use crate::ids;
 use crate::types;
 
 use fallible_iterator::FallibleIterator;
@@ -34,6 +33,33 @@
     }
 }
 
+fn encode_notify_payload(client: &str, secret: &Option<String>) -> String {
+    format!(
+        "{} {}",
+        client,
+        secret.as_ref().map(|s| s.as_str()).unwrap_or("")
+    )
+}
+
+fn decode_notify_payload(payload: &str) -> (String, Option<String>) {
+    let parts = payload.split(' ').collect::<Vec<&str>>();
+    assert!(parts.len() >= 1);
+    let secret = if parts.len() > 1 {
+        Some(parts[1].into())
+    } else {
+        None
+    };
+    return (parts[0].into(), secret);
+}
+
+/// Build a channel name from a client name and secret.
+fn channel_name(client: &str, secret: &str) -> String {
+    // The log handler should already have checked this.
+    assert!(secret.find(' ').is_none());
+    assert!(client.find(' ').is_none());
+    format!("geohubclient_update_{}_{}", client, secret)
+}
+
 pub struct NotifyManager(pub SendableSender<NotifyRequest>);
 
 impl NotifyManager {
@@ -61,6 +87,21 @@
             types::LiveUpdate::new(None, None, Some("timeout, try again".into()))
         }
     }
+
+    pub fn send_notification(
+        &self,
+        dbq: &db::DBQuery,
+        client: &str,
+        secret: &Option<String>,
+    ) -> Result<u64, postgres::Error> {
+        let channel = format!(
+            "NOTIFY {}, '{}'",
+            channel_name(client, secret.as_ref().unwrap_or(&"".into()).as_str()),
+            encode_notify_payload(client, secret),
+        );
+        let notify = dbq.0.prepare_cached(channel.as_str()).unwrap();
+        notify.execute(&[])
+    }
 }
 
 /// Listen for notifications in the database and dispatch to waiting clients.
@@ -70,17 +111,38 @@
     let mut clients: HashMap<String, Vec<NotifyRequest>> = HashMap::new();
     let db = db::DBQuery(&db);
 
-    fn listen(db: &postgres::Connection, client: &str, secret: &str) -> postgres::Result<u64> {
+    fn listen(
+        db: &postgres::Connection,
+        client: &str,
+        secret: &Option<String>,
+    ) -> postgres::Result<u64> {
         let n = db
             .execute(
-                &format!("LISTEN {}", ids::channel_name(client, secret).as_str()),
+                &format!(
+                    "LISTEN {}",
+                    channel_name(client, secret.as_ref().map(|s| s.as_str()).unwrap_or(""))
+                        .as_str()
+                ),
                 &[],
             )
             .unwrap();
         Ok(n)
     }
-    fn unlisten(db: &postgres::Connection, chan: &str) -> postgres::Result<u64> {
-        let n = db.execute(&format!("UNLISTEN {}", chan), &[]).unwrap();
+    fn unlisten(
+        db: &postgres::Connection,
+        client: &str,
+        secret: &Option<String>,
+    ) -> postgres::Result<u64> {
+        let n = db
+            .execute(
+                &format!(
+                    "UNLISTEN {}",
+                    channel_name(client, secret.as_ref().map(|s| s.as_str()).unwrap_or(""))
+                        .as_str()
+                ),
+                &[],
+            )
+            .unwrap();
         Ok(n)
     }
 
@@ -91,12 +153,13 @@
         // We listen per client and secret to separate clients with different sessions (by secret).
         loop {
             if let Ok(nrq) = rx.try_recv() {
-                let secret = nrq.secret.as_ref().map(|s| s.as_str()).unwrap_or("");
-                let chan_name = ids::channel_name(nrq.client.as_str(), secret);
-                if !clients.contains_key(&chan_name) {
-                    listen(db.0, &nrq.client, secret).ok();
+                // client_id is also the payload sent to the channel. It keys waiters by client and
+                // secret.
+                let client_id = encode_notify_payload(nrq.client.as_str(), &nrq.secret);
+                if !clients.contains_key(&client_id) {
+                    listen(db.0, &nrq.client, &nrq.secret).ok();
                 }
-                clients.entry(chan_name).or_insert(vec![]).push(nrq);
+                clients.entry(client_id).or_insert(vec![]).push(nrq);
             } else {
                 break;
             }
@@ -109,15 +172,17 @@
         let mut count = 0;
 
         while let Ok(Some(notification)) = iter.next() {
-            let chan = notification.channel;
-            let (client, secret) = ids::client_secret(chan.as_str());
-            unlisten(db.0, &chan).ok();
+            // We can extract the client and secret from the channel payload. The payload itself is
+            // the hashmap key.
+            let client_id = notification.payload;
+            let (client, secret) = decode_notify_payload(client_id.as_str());
+            unlisten(db.0, client.as_str(), &secret).ok();
 
             // These queries use the primary key index returning one row only and will be quite fast.
             // Still: One query per client.
-            let rows = db.check_for_new_rows(client, &Some(secret.into()), &None, &Some(1));
+            let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(1));
             if let Some((geo, last)) = rows {
-                for request in clients.remove(&chan).unwrap_or(vec![]) {
+                for request in clients.remove(&client_id).unwrap_or(vec![]) {
                     request
                         .respond
                         .send(NotifyResponse {
@@ -127,7 +192,7 @@
                         .ok();
                 }
             } else {
-                for request in clients.remove(&chan).unwrap_or(vec![]) {
+                for request in clients.remove(&client_id).unwrap_or(vec![]) {
                     request
                         .respond
                         .send(NotifyResponse {