changeset 10:625c0c52e631

Implement live updates
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 01 Dec 2020 22:44:34 +0100
parents b8b99af28199
children d8312d4db6a6
files Cargo.lock Cargo.toml pgsql_schema.sql src/main.rs
diffstat 4 files changed, 142 insertions(+), 13 deletions(-)
--- a/Cargo.lock	Tue Dec 01 21:39:06 2020 +0100
+++ b/Cargo.lock	Tue Dec 01 22:44:34 2020 +0100
@@ -393,6 +393,7 @@
 version = "0.1.0"
 dependencies = [
  "chrono",
+ "fallible-iterator",
  "postgres",
  "rocket",
  "rocket_contrib",
@@ -1106,6 +1107,8 @@
  "r2d2_postgres",
  "rocket",
  "rocket_contrib_codegen",
+ "serde",
+ "serde_json",
 ]
 
 [[package]]
--- a/Cargo.toml	Tue Dec 01 21:39:06 2020 +0100
+++ b/Cargo.toml	Tue Dec 01 22:44:34 2020 +0100
@@ -12,9 +12,10 @@
 chrono = { version = "^0.4", features = ["serde"] }
 serde = { version = "~1.0", features = ["derive"] }
 serde_json = "~1.0"
+fallible-iterator = "~0.1"
 
 [dependencies.rocket_contrib]
 version = "~0.4"
 default-features = false
-features = ["postgres_pool"]
+features = ["postgres_pool", "json"]
 
--- a/pgsql_schema.sql	Tue Dec 01 21:39:06 2020 +0100
+++ b/pgsql_schema.sql	Tue Dec 01 22:44:34 2020 +0100
@@ -27,13 +27,14 @@
 CREATE SCHEMA IF NOT EXISTS geohub;
 
 CREATE TABLE geohub.geodata (
-    id text not null,
-    secret text,
+    id serial primary key,
+    client text not null,
     lat double precision,
     long double precision,
     spd double precision,
     t timestamp with time zone not null,
-    ele double precision
+    ele double precision,
+    secret bytea
 );
 
 
--- a/src/main.rs	Tue Dec 01 21:39:06 2020 +0100
+++ b/src/main.rs	Tue Dec 01 22:44:34 2020 +0100
@@ -5,6 +5,9 @@
 
 use chrono::TimeZone;
 
+use fallible_iterator::FallibleIterator;
+use std::iter::Iterator;
+
 #[rocket_contrib::database("geohub")]
 struct DBConn(postgres::Connection);
 
@@ -83,6 +86,127 @@
     features: Vec<GeoFeature>,
 }
 
+#[derive(serde::Serialize, Debug)]
+struct LiveUpdate {
+    #[serde(rename = "type")]
+    typ: String, // always "GeoHubUpdate"
+    last: Option<i32>, // page token -- send in next request!
+    geo: Option<GeoJSON>,
+}
+
+/// Queries for at most `limit` rows since entry ID `last`.
+fn check_for_new_rows(
+    db: &DBConn,
+    name: &String,
+    secret: &Option<String>,
+    last: &Option<i32>,
+    limit: &Option<i64>,
+) -> Option<(GeoJSON, i32)> {
+    let mut returnable = GeoJSON {
+        typ: "FeatureCollection".into(),
+        features: vec![],
+    };
+    let check_for_new = db.0.prepare_cached(
+        r"SELECT id, t, lat, long, spd, ele FROM geohub.geodata
+        WHERE (client = $1) and (id > $2) AND (secret = public.digest($3, 'sha256') or secret is null)
+        ORDER BY t DESC
+        LIMIT $4").unwrap(); // Must succeed.
+
+    let last = last.unwrap_or(0);
+    let limit = limit.unwrap_or(256);
+
+    let rows = check_for_new.query(&[&name, &last, &secret, &limit]);
+    if let Ok(rows) = rows {
+        // If there are unknown entries, return those.
+        if rows.len() > 0 {
+            returnable.features = Vec::with_capacity(rows.len());
+            let mut last = 0;
+
+            for row in rows.iter() {
+                let (id, ts, lat, long, spd, ele): (
+                    i32,
+                    chrono::DateTime<chrono::Utc>,
+                    Option<f64>,
+                    Option<f64>,
+                    Option<f64>,
+                    Option<f64>,
+                ) = (
+                    row.get(0),
+                    row.get(1),
+                    row.get(2),
+                    row.get(3),
+                    row.get(4),
+                    row.get(5),
+                );
+                returnable
+                    .features
+                    .push(geofeature_from_row(ts, lat, long, spd, ele));
+                if id > last {
+                    last = id;
+                }
+            }
+
+            return Some((returnable, last));
+        }
+        return None;
+    } else {
+        // For debugging.
+        rows.unwrap();
+    }
+    return None;
+}
+
+/// Wait for an update.
+#[rocket::get("/geo/<name>/retrieve/live?<secret>&<last>&<timeout>")]
+fn retrieve_live(
+    db: DBConn,
+    name: String,
+    secret: Option<String>,
+    last: Option<i32>,
+    timeout: Option<i32>,
+) -> rocket_contrib::json::Json<LiveUpdate> {
+    // As an optimization, only check for new rows up front if the client supplied a
+    // paging token; otherwise we go straight to waiting for a notification.
+    if last.is_some() {
+        if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &None) {
+            return rocket_contrib::json::Json(LiveUpdate {
+                typ: "GeoHubUpdate".into(),
+                last: Some(last),
+                geo: Some(geojson),
+            });
+        }
+    }
+
+    // Otherwise we will wait for the next update.
+    //
+    let listen =
+        db.0.prepare_cached(format!("LISTEN {}", name).as_str())
+            .unwrap();
+    let unlisten =
+        db.0.prepare_cached(format!("UNLISTEN {}", name).as_str())
+            .unwrap();
+
+    listen.execute(&[]).ok();
+
+    let timeout = std::time::Duration::new(timeout.unwrap_or(30) as u64, 0);
+    if let Ok(_) = db.0.notifications().timeout_iter(timeout).next() {
+        unlisten.execute(&[]).ok();
+        if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &Some(1)) {
+            return rocket_contrib::json::Json(LiveUpdate {
+                typ: "GeoHubUpdate".into(),
+                last: Some(last),
+                geo: Some(geojson),
+            });
+        }
+    }
+    unlisten.execute(&[]).ok();
+    return rocket_contrib::json::Json(LiveUpdate {
+        typ: "GeoHubUpdate".into(),
+        last: last,
+        geo: None,
+    });
+}
+
 /// Retrieve GeoJSON data.
 #[rocket::get("/geo/<name>/retrieve/json?<secret>&<from>&<to>&<max>")]
 fn retrieve_json(
@@ -92,7 +216,7 @@
     from: Option<String>,
     to: Option<String>,
     max: Option<i64>,
-) -> rocket::response::content::Json<String> {
+) -> rocket_contrib::json::Json<GeoJSON> {
     let mut returnable = GeoJSON {
         typ: "FeatureCollection".into(),
         features: vec![],
@@ -111,12 +235,11 @@
 
     let stmt = db.0.prepare_cached(
         r"SELECT t, lat, long, spd, ele FROM geohub.geodata
-        WHERE (id = $1) and (t between $2 and $3) AND (secret = public.digest($4, 'sha256') or secret is null)
+        WHERE (client = $1) and (t between $2 and $3) AND (secret = public.digest($4, 'sha256') or secret is null)
+        ORDER BY t ASC
         LIMIT $5").unwrap(); // Must succeed.
-    let rows = stmt
-        .query(&[&name, &from_ts, &to_ts, &secret, &max])
-        .unwrap();
-    {
+    let rows = stmt.query(&[&name, &from_ts, &to_ts, &secret, &max]);
+    if let Ok(rows) = rows {
         returnable.features = Vec::with_capacity(rows.len());
         for row in rows.iter() {
             let (ts, lat, long, spd, ele): (
@@ -132,7 +255,7 @@
         }
     }
 
-    rocket::response::content::Json(serde_json::to_string(&returnable).unwrap())
+    rocket_contrib::json::Json(returnable)
 }
 
 /// Ingest geo data.
@@ -157,7 +280,8 @@
     if let Some(time) = time {
         ts = flexible_timestamp_parse(time).unwrap_or(ts);
     }
-    let stmt = db.0.prepare_cached("INSERT INTO geohub.geodata (id, lat, long, spd, t, ele, secret) VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'))").unwrap();
+    println!("{}", name);
+    let stmt = db.0.prepare_cached("INSERT INTO geohub.geodata (client, lat, long, spd, t, ele, secret) VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'))").unwrap();
     let notify =
         db.0.prepare_cached(format!("NOTIFY {}, '{}'", name, name).as_str())
             .unwrap();
@@ -170,6 +294,6 @@
 fn main() {
     rocket::ignite()
         .attach(DBConn::fairing())
-        .mount("/", rocket::routes![log, retrieve_json])
+        .mount("/", rocket::routes![log, retrieve_json, retrieve_live])
         .launch();
 }
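
Usage note (illustrative, not part of the changeset): the new /geo/<name>/retrieve/live endpoint long-polls for fresh rows and returns a LiveUpdate object whose `last` field is a paging token to send back on the next request; `geo` is null when the wait times out. Below is a minimal polling sketch. It assumes a GeoHub instance on localhost:8000, a client name of "example", no secret, and the reqwest (blocking + json features) and serde_json crates — all of these are assumptions for illustration, not part of this repository.

// Minimal long-polling client sketch (assumed: localhost:8000, client "example", no secret).
// Assumed dependencies: reqwest = { version = "0.11", features = ["blocking", "json"] }, serde_json.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut last: Option<i64> = None;
    loop {
        // Hand the previous `last` token back so already-seen rows are skipped.
        // timeout=25 keeps the server-side wait below reqwest's default 30 s client timeout.
        let mut url = String::from("http://localhost:8000/geo/example/retrieve/live?timeout=25");
        if let Some(l) = last {
            url.push_str(&format!("&last={}", l));
        }
        let update: serde_json::Value = reqwest::blocking::get(url.as_str())?.json()?;
        // The server echoes (or advances) the paging token; remember it for the next request.
        if let Some(l) = update["last"].as_i64() {
            last = Some(l);
        }
        // `geo` is null on timeout, otherwise it holds a GeoJSON FeatureCollection.
        if !update["geo"].is_null() {
            println!("new GeoJSON: {}", update["geo"]);
        }
    }
}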