Mercurial > lbo > hg > geohub
changeset 63:fa2a46761712
Fix missed notifications for upper case IDs
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 04 Dec 2020 14:25:41 +0100 |
parents | 45d88bf5d99f |
children | 79b0186c7331 |
files | TODO src/db.rs src/ids.rs src/main.rs src/notifier.rs |
diffstat | 5 files changed, 94 insertions(+), 45 deletions(-) [+] |
line wrap: on
line diff
--- a/TODO Fri Dec 04 13:57:09 2020 +0100 +++ b/TODO Fri Dec 04 14:25:41 2020 +0100 @@ -1,5 +1,11 @@ GENERAL +* UI + * trackme: Tell user to fill secret/client + FEATURES * GPX/json export (with UI + API) + +BUGS +
--- a/src/db.rs Fri Dec 04 13:57:09 2020 +0100 +++ b/src/db.rs Fri Dec 04 14:25:41 2020 +0100 @@ -1,4 +1,3 @@ -use crate::ids; use crate::types; /// Managed by Rocket. @@ -61,12 +60,6 @@ let stmt = self.0.prepare_cached( r"INSERT INTO geohub.geodata (client, lat, long, spd, t, ele, secret, note, accuracy) VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'), $8, $9)").unwrap(); - let channel = format!( - "NOTIFY {}, '{}'", - ids::channel_name(name, secret.as_ref().unwrap_or(&"".into()).as_str()), - name - ); - let notify = self.0.prepare_cached(channel.as_str()).unwrap(); stmt.execute(&[ &name, &point.lat, @@ -77,10 +70,7 @@ &secret, &point.note, &point.accuracy, - ]) - .unwrap(); - notify.execute(&[]).unwrap(); - Ok(()) + ]).map(|_| ()) } /// Queries for at most `limit` rows since entry ID `last`. @@ -137,9 +127,8 @@ return Some((returnable, last)); } return None; - } else { - // For debugging. - rows.unwrap(); + } else if let Err(e) = rows { + eprintln!("check_for_new_rows: Couldn't check new rows: {}", e); } return None; }
--- a/src/ids.rs Fri Dec 04 13:57:09 2020 +0100 +++ b/src/ids.rs Fri Dec 04 14:25:41 2020 +0100 @@ -6,18 +6,3 @@ .chars() .any(|c| !c.is_ascii_alphanumeric())) } - -/// Build a channel name from a client name and secret. -pub fn channel_name(client: &str, secret: &str) -> String { - // The log handler should check this. - assert!(secret.find('_').is_none()); - format!("geohubclient_update_{}_{}", client, secret) -} - -/// Extract client name and secret from the database channel name. -pub fn client_secret(channel_name: &str) -> (&str, &str) { - // Channel name is like geohubclient_update_<client>_<secret> - let parts = channel_name.split('_').collect::<Vec<&str>>(); - assert!(parts.len() == 4); - return (parts[2], parts[3]); -}
--- a/src/main.rs Fri Dec 04 13:57:09 2020 +0100 +++ b/src/main.rs Fri Dec 04 14:25:41 2020 +0100 @@ -127,6 +127,7 @@ )] fn log( db: db::DBConn, + notify_manager: rocket::State<notifier::NotifyManager>, name: String, lat: f64, longitude: f64, @@ -178,6 +179,9 @@ if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) { return http::server_error(e.to_string()); } + if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret) { + eprintln!("Couldn't send notification: {}", e); + } http::GeoHubResponse::Ok("".into()) }
--- a/src/notifier.rs Fri Dec 04 13:57:09 2020 +0100 +++ b/src/notifier.rs Fri Dec 04 14:25:41 2020 +0100 @@ -1,5 +1,4 @@ use crate::db; -use crate::ids; use crate::types; use fallible_iterator::FallibleIterator; @@ -34,6 +33,33 @@ } } +fn encode_notify_payload(client: &str, secret: &Option<String>) -> String { + format!( + "{} {}", + client, + secret.as_ref().map(|s| s.as_str()).unwrap_or("") + ) +} + +fn decode_notify_payload(payload: &str) -> (String, Option<String>) { + let parts = payload.split(' ').collect::<Vec<&str>>(); + assert!(parts.len() >= 1); + let secret = if parts.len() > 1 { + Some(parts[1].into()) + } else { + None + }; + return (parts[0].into(), secret); +} + +/// Build a channel name from a client name and secret. +fn channel_name(client: &str, secret: &str) -> String { + // The log handler should already have checked this. + assert!(secret.find(' ').is_none()); + assert!(client.find(' ').is_none()); + format!("geohubclient_update_{}_{}", client, secret) +} + pub struct NotifyManager(pub SendableSender<NotifyRequest>); impl NotifyManager { @@ -61,6 +87,21 @@ types::LiveUpdate::new(None, None, Some("timeout, try again".into())) } } + + pub fn send_notification( + &self, + dbq: &db::DBQuery, + client: &str, + secret: &Option<String>, + ) -> Result<u64, postgres::Error> { + let channel = format!( + "NOTIFY {}, '{}'", + channel_name(client, secret.as_ref().unwrap_or(&"".into()).as_str()), + encode_notify_payload(client, secret), + ); + let notify = dbq.0.prepare_cached(channel.as_str()).unwrap(); + notify.execute(&[]) + } } /// Listen for notifications in the database and dispatch to waiting clients. @@ -70,17 +111,38 @@ let mut clients: HashMap<String, Vec<NotifyRequest>> = HashMap::new(); let db = db::DBQuery(&db); - fn listen(db: &postgres::Connection, client: &str, secret: &str) -> postgres::Result<u64> { + fn listen( + db: &postgres::Connection, + client: &str, + secret: &Option<String>, + ) -> postgres::Result<u64> { let n = db .execute( - &format!("LISTEN {}", ids::channel_name(client, secret).as_str()), + &format!( + "LISTEN {}", + channel_name(client, secret.as_ref().map(|s| s.as_str()).unwrap_or("")) + .as_str() + ), &[], ) .unwrap(); Ok(n) } - fn unlisten(db: &postgres::Connection, chan: &str) -> postgres::Result<u64> { - let n = db.execute(&format!("UNLISTEN {}", chan), &[]).unwrap(); + fn unlisten( + db: &postgres::Connection, + client: &str, + secret: &Option<String>, + ) -> postgres::Result<u64> { + let n = db + .execute( + &format!( + "UNLISTEN {}", + channel_name(client, secret.as_ref().map(|s| s.as_str()).unwrap_or("")) + .as_str() + ), + &[], + ) + .unwrap(); Ok(n) } @@ -91,12 +153,13 @@ // We listen per client and secret to separate clients with different sessions (by secret). loop { if let Ok(nrq) = rx.try_recv() { - let secret = nrq.secret.as_ref().map(|s| s.as_str()).unwrap_or(""); - let chan_name = ids::channel_name(nrq.client.as_str(), secret); - if !clients.contains_key(&chan_name) { - listen(db.0, &nrq.client, secret).ok(); + // client_id is also the payload sent to the channel. It keys waiters by client and + // secret. + let client_id = encode_notify_payload(nrq.client.as_str(), &nrq.secret); + if !clients.contains_key(&client_id) { + listen(db.0, &nrq.client, &nrq.secret).ok(); } - clients.entry(chan_name).or_insert(vec![]).push(nrq); + clients.entry(client_id).or_insert(vec![]).push(nrq); } else { break; } @@ -109,15 +172,17 @@ let mut count = 0; while let Ok(Some(notification)) = iter.next() { - let chan = notification.channel; - let (client, secret) = ids::client_secret(chan.as_str()); - unlisten(db.0, &chan).ok(); + // We can extract the client and secret from the channel payload. The payload itself is + // the hashmap key. + let client_id = notification.payload; + let (client, secret) = decode_notify_payload(client_id.as_str()); + unlisten(db.0, client.as_str(), &secret).ok(); // These queries use the primary key index returning one row only and will be quite fast. // Still: One query per client. - let rows = db.check_for_new_rows(client, &Some(secret.into()), &None, &Some(1)); + let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(1)); if let Some((geo, last)) = rows { - for request in clients.remove(&chan).unwrap_or(vec![]) { + for request in clients.remove(&client_id).unwrap_or(vec![]) { request .respond .send(NotifyResponse { @@ -127,7 +192,7 @@ .ok(); } } else { - for request in clients.remove(&chan).unwrap_or(vec![]) { + for request in clients.remove(&client_id).unwrap_or(vec![]) { request .respond .send(NotifyResponse {