changeset 80:8ec6df976a19

Enable ingestion of JSON batches for Overland, e.g.
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 05 Dec 2020 22:14:15 +0100
parents b53fcb02a550
children e75a327cca26
files src/main.rs src/notifier.rs src/types.rs
diffstat 3 files changed, 138 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/src/main.rs	Sat Dec 05 22:13:50 2020 +0100
+++ b/src/main.rs	Sat Dec 05 22:14:15 2020 +0100
@@ -34,10 +34,15 @@
     };
     let db = db::DBQuery(&db.0);
     if let Some((geojson, newlast)) = db.check_for_new_rows(&client, &secret, &last, &limit) {
-        rocket_contrib::json::Json(types::LiveUpdate::new(client, Some(newlast), Some(geojson), None))
+        rocket_contrib::json::Json(types::LiveUpdate::new(
+            client,
+            Some(newlast),
+            Some(geojson),
+            None,
+        ))
     } else {
         rocket_contrib::json::Json(types::LiveUpdate::new(
-                client,
+            client,
             last,
             None,
             Some("No rows returned".into()),
@@ -46,7 +51,8 @@
 }
 
 /// Wait for an update.
-/// Only one point is returned. To retrieve a history of points, call retrieve_last.
+/// Usually, one point is returned, but if a client sent several at once, all the points will be
+/// delivered.
 #[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")]
 fn retrieve_live(
     notify_manager: rocket::State<notifier::NotifyManager>,
@@ -120,6 +126,8 @@
 
 /// Ingest geo data.
 
+/// Ingest individual points by URL query string.
+///
 /// time is like 2020-11-30T20:12:36.444Z (ISO 8601). By default, server time is set.
 /// secret can be used to protect points.
 #[rocket::post(
@@ -139,7 +147,6 @@
     accuracy: Option<f64>,
     note: rocket::data::Data,
 ) -> http::GeoHubResponse {
-    let db = db::DBQuery(&db.0);
     // Check that secret and client name are legal.
     if !ids::name_and_secret_acceptable(name.as_str(), secret.as_ref().map(|s| s.as_str())) {
         return http::bad_request(
@@ -156,6 +163,7 @@
     } else {
         secret
     };
+    let db = db::DBQuery(&db.0);
 
     let mut ts = chrono::Utc::now();
     if let Some(time) = time {
@@ -164,7 +172,13 @@
 
     // Length-limit notes.
     let note = match http::read_data(note, 4096) {
-        Ok(n) => { if n.is_empty() { None } else { Some(n) } },
+        Ok(n) => {
+            if n.is_empty() {
+                None
+            } else {
+                Some(n)
+            }
+        }
         Err(e) => return e,
     };
 
@@ -180,12 +194,70 @@
     if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) {
         return http::server_error(e.to_string());
     }
-    if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret) {
+    if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret, Some(1)) {
         eprintln!("Couldn't send notification: {}", e);
     }
     http::GeoHubResponse::Ok("".into())
 }
 
+/// Ingest GeoJSON.
+#[rocket::post("/geo/<name>/logjson?<secret>", data = "<body>")]
+fn log_json(
+    db: db::DBConn,
+    notify_manager: rocket::State<notifier::NotifyManager>,
+    name: String,
+    secret: Option<String>,
+    body: rocket_contrib::json::Json<types::LogLocations>,
+) -> http::GeoHubResponse {
+    // Check that secret and client name are legal.
+    if !ids::name_and_secret_acceptable(name.as_str(), secret.as_ref().map(|s| s.as_str())) {
+        return http::bad_request(
+            "You have supplied an invalid secret or name. Both must be ASCII alphanumeric strings."
+                .into(),
+        );
+    }
+    let secret = if let Some(secret) = secret {
+        if secret.is_empty() {
+            None
+        } else {
+            Some(secret)
+        }
+    } else {
+        secret
+    };
+    let db = db::DBQuery(&db.0);
+
+    let geofeats = body.into_inner().locations;
+    let nrows = geofeats.len() as i64;
+
+    // Due to prepared statements, this isn't as bad as it looks.
+    let mut errs = vec![];
+    for feat in geofeats {
+        let point = types::geopoint_from_feature(feat);
+        if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) {
+            errs.push(e);
+        }
+    }
+
+    // Only notify once.
+    if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret, Some(nrows)) {
+        eprintln!("Couldn't send notification: {}", e);
+    }
+
+    if errs.is_empty() {
+        http::GeoHubResponse::Ok("".into())
+    } else {
+        let errstring = errs
+            .into_iter()
+            .take(10)
+            .map(|e| e.to_string())
+            .collect::<Vec<String>>()
+            .join(";");
+        eprintln!("Couldn't write points: {}", errstring);
+        http::GeoHubResponse::Ok(errstring)
+    }
+}
+
 /// Serve static files.
 #[rocket::get("/geo/assets/<file..>")]
 fn assets(
@@ -218,7 +290,7 @@
         ))
         .mount(
             "/",
-            rocket::routes![log, retrieve_json, retrieve_last, retrieve_live, assets],
+            rocket::routes![log, log_json, retrieve_json, retrieve_last, retrieve_live, assets],
         )
         .launch();
 }
--- a/src/notifier.rs	Sat Dec 05 22:13:50 2020 +0100
+++ b/src/notifier.rs	Sat Dec 05 22:14:15 2020 +0100
@@ -1,8 +1,10 @@
+
 use crate::db;
 use crate::types;
 
 use fallible_iterator::FallibleIterator;
 use std::collections::HashMap;
+use std::str::FromStr;
 use std::sync::{mpsc, Arc, Mutex};
 use std::time;
 
@@ -33,15 +35,24 @@
     }
 }
 
-fn encode_notify_payload(client: &str, secret: &Option<String>) -> String {
+fn encode_client_id(client: &str, secret: &Option<String>) -> String {
     format!(
         "{} {}",
         client,
-        secret.as_ref().map(|s| s.as_str()).unwrap_or("")
+        secret.as_ref().map(|s| s.as_str()).unwrap_or(""),
     )
 }
 
-fn decode_notify_payload(payload: &str) -> (String, Option<String>) {
+fn encode_notify_payload(client: &str, secret: &Option<String>, nrows: Option<i64>) -> String {
+    format!(
+        "{} {} {}",
+        client,
+        secret.as_ref().map(|s| s.as_str()).unwrap_or(""),
+        nrows.unwrap_or(1)
+    )
+}
+
+fn decode_notify_payload(payload: &str) -> (String, Option<String>, Option<i64>) {
     let parts = payload.split(' ').collect::<Vec<&str>>();
     assert!(parts.len() >= 1);
     let secret = if parts.len() > 1 {
@@ -49,7 +60,12 @@
     } else {
         None
     };
-    return (parts[0].into(), secret);
+    let nrows = if parts.len() > 2 {
+        i64::from_str(parts[2]).ok()
+    } else {
+        None
+    };
+    return (parts[0].into(), secret, nrows);
 }
 
 /// Build a channel name from a client name and secret.
@@ -93,11 +109,12 @@
         dbq: &db::DBQuery,
         client: &str,
         secret: &Option<String>,
+        nrows: Option<i64>,
     ) -> Result<u64, postgres::Error> {
         let channel = format!(
             "NOTIFY {}, '{}'",
             channel_name(client, secret.as_ref().unwrap_or(&"".into()).as_str()),
-            encode_notify_payload(client, secret),
+            encode_notify_payload(client, secret, nrows),
         );
         let notify = dbq.0.prepare_cached(channel.as_str()).unwrap();
         notify.execute(&[])
@@ -155,7 +172,7 @@
             if let Ok(nrq) = rx.try_recv() {
                 // client_id is also the payload sent to the channel. It keys waiters by client and
                 // secret.
-                let client_id = encode_notify_payload(nrq.client.as_str(), &nrq.secret);
+                let client_id = encode_client_id(nrq.client.as_str(), &nrq.secret);
                 if !clients.contains_key(&client_id) {
                     listen(db.0, &nrq.client, &nrq.secret).ok();
                 }
@@ -174,12 +191,13 @@
         while let Ok(Some(notification)) = iter.next() {
             // We can extract the client and secret from the channel payload. The payload itself is
             // the hashmap key.
-            let client_id = notification.payload;
-            let (client, secret) = decode_notify_payload(client_id.as_str());
+            let (client, secret, nrows) = decode_notify_payload(&notification.payload);
+            let client_id = encode_client_id(&client, &secret);
+
             unlisten(db.0, client.as_str(), &secret).ok();
 
             // These queries use the primary key index returning one row only and will be quite fast.
-            let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(1));
+            let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(nrows.unwrap_or(1)));
             if let Some((geo, last)) = rows {
                 for request in clients.remove(&client_id).unwrap_or(vec![]) {
                     request
--- a/src/types.rs	Sat Dec 05 22:13:50 2020 +0100
+++ b/src/types.rs	Sat Dec 05 22:14:15 2020 +0100
@@ -1,4 +1,4 @@
-/// Non-JSON plain point representation.
+/// Non-JSON plain point representation. Flat and representing a database row.
 #[derive(Debug, Clone)]
 pub struct GeoPoint {
     pub lat: f64,
@@ -21,7 +21,12 @@
 }
 
 impl LiveUpdate {
-    pub fn new(client: String, last: Option<i32>, geo: Option<GeoJSON>, err: Option<String>) -> LiveUpdate {
+    pub fn new(
+        client: String,
+        last: Option<i32>,
+        geo: Option<GeoJSON>,
+        err: Option<String>,
+    ) -> LiveUpdate {
         LiveUpdate {
             typ: "GeoHubUpdate".into(),
             client: client,
@@ -32,13 +37,20 @@
     }
 }
 
+#[derive(serde::Serialize, serde::Deserialize, Debug)]
+pub struct LogLocations {
+    pub locations: Vec<GeoFeature>,
+}
+
 /// Fetch geodata as JSON.
 ///
-#[derive(serde::Serialize, Debug, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct GeoProperties {
+    #[serde(alias = "timestamp")]
     time: chrono::DateTime<chrono::Utc>,
     altitude: Option<f64>,
     speed: Option<f64>,
+    #[serde(alias = "horizontal_accuracy")]
     accuracy: Option<f64>,
     /// The unique ID of the point.
     id: Option<i32>,
@@ -46,14 +58,14 @@
     note: Option<String>,
 }
 
-#[derive(serde::Serialize, Debug, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct GeoGeometry {
     #[serde(rename = "type")]
     typ: String, // always "Point"
     coordinates: (f64, f64), // always [long, lat]
 }
 
-#[derive(serde::Serialize, Debug, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct GeoFeature {
     #[serde(rename = "type")]
     typ: String, // always "Feature"
@@ -61,7 +73,7 @@
     geometry: GeoGeometry,
 }
 
-#[derive(serde::Serialize, Debug, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct GeoJSON {
     #[serde(rename = "type")]
     typ: String, // always "FeatureCollection"
@@ -100,3 +112,17 @@
         },
     }
 }
+
+pub fn geopoint_from_feature(feat: GeoFeature) -> GeoPoint {
+    let geo = feat.geometry;
+    let prop = feat.properties;
+    GeoPoint {
+        accuracy: prop.accuracy,
+        ele: prop.altitude,
+        long: geo.coordinates.0,
+        lat: geo.coordinates.1,
+        note: None,
+        spd: prop.speed,
+        time: prop.time,
+    }
+}