Mercurial > lbo > hg > geohub
changeset 10:625c0c52e631
Implement live updates
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 01 Dec 2020 22:44:34 +0100 |
parents | b8b99af28199 |
children | d8312d4db6a6 |
files | Cargo.lock Cargo.toml pgsql_schema.sql src/main.rs |
diffstat | 4 files changed, 141 insertions(+), 12 deletions(-) [+] |
line wrap: on
line diff
--- a/Cargo.lock Tue Dec 01 21:39:06 2020 +0100 +++ b/Cargo.lock Tue Dec 01 22:44:34 2020 +0100 @@ -393,6 +393,7 @@ version = "0.1.0" dependencies = [ "chrono", + "fallible-iterator", "postgres", "rocket", "rocket_contrib", @@ -1106,6 +1107,8 @@ "r2d2_postgres", "rocket", "rocket_contrib_codegen", + "serde", + "serde_json", ] [[package]]
--- a/Cargo.toml Tue Dec 01 21:39:06 2020 +0100 +++ b/Cargo.toml Tue Dec 01 22:44:34 2020 +0100 @@ -12,9 +12,10 @@ chrono = { version = "^0.4", features = ["serde"] } serde = { version = "~1.0", features = ["derive"] } serde_json = "~1.0" +fallible-iterator = "~0.1" [dependencies.rocket_contrib] version = "~0.4" default-features = false -features = ["postgres_pool"] +features = ["postgres_pool", "json"]
--- a/pgsql_schema.sql Tue Dec 01 21:39:06 2020 +0100 +++ b/pgsql_schema.sql Tue Dec 01 22:44:34 2020 +0100 @@ -27,13 +27,14 @@ CREATE SCHEMA IF NOT EXISTS geohub; CREATE TABLE geohub.geodata ( - id text not null, - secret text, + id serial primary key, + client text not null, lat double precision, long double precision, spd double precision, t timestamp with time zone not null, ele double precision + secret bytea, );
--- a/src/main.rs Tue Dec 01 21:39:06 2020 +0100 +++ b/src/main.rs Tue Dec 01 22:44:34 2020 +0100 @@ -5,6 +5,9 @@ use chrono::TimeZone; +use fallible_iterator::FallibleIterator; +use std::iter::Iterator; + #[rocket_contrib::database("geohub")] struct DBConn(postgres::Connection); @@ -83,6 +86,127 @@ features: Vec<GeoFeature>, } +#[derive(serde::Serialize, Debug)] +struct LiveUpdate { + #[serde(rename = "type")] + typ: String, // always "GeoHubUpdate" + last: Option<i32>, // page token -- send in next request! + geo: Option<GeoJSON>, +} + +/// Queries for at most `limit` rows since entry ID `last`. +fn check_for_new_rows( + db: &DBConn, + name: &String, + secret: &Option<String>, + last: &Option<i32>, + limit: &Option<i64>, +) -> Option<(GeoJSON, i32)> { + let mut returnable = GeoJSON { + typ: "FeatureCollection".into(), + features: vec![], + }; + let check_for_new = db.0.prepare_cached( + r"SELECT id, t, lat, long, spd, ele FROM geohub.geodata + WHERE (client = $1) and (id > $2) AND (secret = public.digest($3, 'sha256') or secret is null) + ORDER BY t DESC + LIMIT $4").unwrap(); // Must succeed. + + let last = last.unwrap_or(0); + let limit = limit.unwrap_or(256); + + let rows = check_for_new.query(&[&name, &last, &secret, &limit]); + if let Ok(rows) = rows { + // If there are unknown entries, return those. + if rows.len() > 0 { + returnable.features = Vec::with_capacity(rows.len()); + let mut last = 0; + + for row in rows.iter() { + let (id, ts, lat, long, spd, ele): ( + i32, + chrono::DateTime<chrono::Utc>, + Option<f64>, + Option<f64>, + Option<f64>, + Option<f64>, + ) = ( + row.get(0), + row.get(1), + row.get(2), + row.get(3), + row.get(4), + row.get(5), + ); + returnable + .features + .push(geofeature_from_row(ts, lat, long, spd, ele)); + if id > last { + last = id; + } + } + + return Some((returnable, last)); + } + return None; + } else { + // For debugging. + rows.unwrap(); + } + return None; +} + +/// Wait for an update. +#[rocket::get("/geo/<name>/retrieve/live?<secret>&<last>&<timeout>")] +fn retrieve_live( + db: DBConn, + name: String, + secret: Option<String>, + last: Option<i32>, + timeout: Option<i32>, +) -> rocket_contrib::json::Json<LiveUpdate> { + // Only if the client supplied a paging token should we check for new rows before. This is an + // optimization. + if last.is_some() { + if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &None) { + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: Some(last), + geo: Some(geojson), + }); + } + } + + // Otherwise we will wait for the next update. + // + let listen = + db.0.prepare_cached(format!("LISTEN {}", name).as_str()) + .unwrap(); + let unlisten = + db.0.prepare_cached(format!("UNLISTEN {}", name).as_str()) + .unwrap(); + + listen.execute(&[]).ok(); + + let timeout = std::time::Duration::new(timeout.unwrap_or(30) as u64, 0); + if let Ok(_) = db.0.notifications().timeout_iter(timeout).next() { + unlisten.execute(&[]).ok(); + if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &Some(1)) { + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: Some(last), + geo: Some(geojson), + }); + } + } + unlisten.execute(&[]).ok(); + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: last, + geo: None, + }); +} + /// Retrieve GeoJSON data. #[rocket::get("/geo/<name>/retrieve/json?<secret>&<from>&<to>&<max>")] fn retrieve_json( @@ -92,7 +216,7 @@ from: Option<String>, to: Option<String>, max: Option<i64>, -) -> rocket::response::content::Json<String> { +) -> rocket_contrib::json::Json<GeoJSON> { let mut returnable = GeoJSON { typ: "FeatureCollection".into(), features: vec![], @@ -111,12 +235,11 @@ let stmt = db.0.prepare_cached( r"SELECT t, lat, long, spd, ele FROM geohub.geodata - WHERE (id = $1) and (t between $2 and $3) AND (secret = public.digest($4, 'sha256') or secret is null) + WHERE (client = $1) and (t between $2 and $3) AND (secret = public.digest($4, 'sha256') or secret is null) + ORDER BY t ASC LIMIT $5").unwrap(); // Must succeed. - let rows = stmt - .query(&[&name, &from_ts, &to_ts, &secret, &max]) - .unwrap(); - { + let rows = stmt.query(&[&name, &from_ts, &to_ts, &secret, &max]); + if let Ok(rows) = rows { returnable.features = Vec::with_capacity(rows.len()); for row in rows.iter() { let (ts, lat, long, spd, ele): ( @@ -132,7 +255,7 @@ } } - rocket::response::content::Json(serde_json::to_string(&returnable).unwrap()) + rocket_contrib::json::Json(returnable) } /// Ingest geo data. @@ -157,7 +280,8 @@ if let Some(time) = time { ts = flexible_timestamp_parse(time).unwrap_or(ts); } - let stmt = db.0.prepare_cached("INSERT INTO geohub.geodata (id, lat, long, spd, t, ele, secret) VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'))").unwrap(); + println!("{}", name); + let stmt = db.0.prepare_cached("INSERT INTO geohub.geodata (client, lat, long, spd, t, ele, secret) VALUES ($1, $2, $3, $4, $5, $6, public.digest($7, 'sha256'))").unwrap(); let notify = db.0.prepare_cached(format!("NOTIFY {}, '{}'", name, name).as_str()) .unwrap(); @@ -170,6 +294,6 @@ fn main() { rocket::ignite() .attach(DBConn::fairing()) - .mount("/", rocket::routes![log, retrieve_json]) + .mount("/", rocket::routes![log, retrieve_json, retrieve_live]) .launch(); }