Mercurial > lbo > hg > geohub
changeset 80:8ec6df976a19
Enable ingestion of JSON batches
(e.g. for Overland)
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 05 Dec 2020 22:14:15 +0100 |
parents | b53fcb02a550 |
children | e75a327cca26 |
files | src/main.rs src/notifier.rs src/types.rs |
diffstat | 3 files changed, 138 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/main.rs Sat Dec 05 22:13:50 2020 +0100 +++ b/src/main.rs Sat Dec 05 22:14:15 2020 +0100 @@ -34,10 +34,15 @@ }; let db = db::DBQuery(&db.0); if let Some((geojson, newlast)) = db.check_for_new_rows(&client, &secret, &last, &limit) { - rocket_contrib::json::Json(types::LiveUpdate::new(client, Some(newlast), Some(geojson), None)) + rocket_contrib::json::Json(types::LiveUpdate::new( + client, + Some(newlast), + Some(geojson), + None, + )) } else { rocket_contrib::json::Json(types::LiveUpdate::new( - client, + client, last, None, Some("No rows returned".into()), @@ -46,7 +51,8 @@ } /// Wait for an update. -/// Only one point is returned. To retrieve a history of points, call retrieve_last. +/// Usually, one point is returned, but if a client sent several at once, all the points will be +/// delivered. #[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")] fn retrieve_live( notify_manager: rocket::State<notifier::NotifyManager>, @@ -120,6 +126,8 @@ /// Ingest geo data. +/// Ingest individual points by URL query string. +/// /// time is like 2020-11-30T20:12:36.444Z (ISO 8601). By default, server time is set. /// secret can be used to protect points. #[rocket::post( @@ -139,7 +147,6 @@ accuracy: Option<f64>, note: rocket::data::Data, ) -> http::GeoHubResponse { - let db = db::DBQuery(&db.0); // Check that secret and client name are legal. if !ids::name_and_secret_acceptable(name.as_str(), secret.as_ref().map(|s| s.as_str())) { return http::bad_request( @@ -156,6 +163,7 @@ } else { secret }; + let db = db::DBQuery(&db.0); let mut ts = chrono::Utc::now(); if let Some(time) = time { @@ -164,7 +172,13 @@ // Length-limit notes. 
let note = match http::read_data(note, 4096) { - Ok(n) => { if n.is_empty() { None } else { Some(n) } }, + Ok(n) => { + if n.is_empty() { + None + } else { + Some(n) + } + } Err(e) => return e, }; @@ -180,12 +194,70 @@ if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) { return http::server_error(e.to_string()); } - if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret) { + if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret, Some(1)) { eprintln!("Couldn't send notification: {}", e); } http::GeoHubResponse::Ok("".into()) } +/// Ingest GeoJSON. +#[rocket::post("/geo/<name>/logjson?<secret>", data = "<body>")] +fn log_json( + db: db::DBConn, + notify_manager: rocket::State<notifier::NotifyManager>, + name: String, + secret: Option<String>, + body: rocket_contrib::json::Json<types::LogLocations>, +) -> http::GeoHubResponse { + // Check that secret and client name are legal. + if !ids::name_and_secret_acceptable(name.as_str(), secret.as_ref().map(|s| s.as_str())) { + return http::bad_request( + "You have supplied an invalid secret or name. Both must be ASCII alphanumeric strings." + .into(), + ); + } + let secret = if let Some(secret) = secret { + if secret.is_empty() { + None + } else { + Some(secret) + } + } else { + secret + }; + let db = db::DBQuery(&db.0); + + let geofeats = body.into_inner().locations; + let nrows = geofeats.len() as i64; + + // Due to prepared statements, this isn't as bad as it looks. + let mut errs = vec![]; + for feat in geofeats { + let point = types::geopoint_from_feature(feat); + if let Err(e) = db.log_geopoint(name.as_str(), &secret, &point) { + errs.push(e); + } + } + + // Only notify once. 
+ if let Err(e) = notify_manager.send_notification(&db, name.as_str(), &secret, Some(nrows)) { + eprintln!("Couldn't send notification: {}", e); + } + + if errs.is_empty() { + http::GeoHubResponse::Ok("".into()) + } else { + let errstring = errs + .into_iter() + .take(10) + .map(|e| e.to_string()) + .collect::<Vec<String>>() + .join(";"); + eprintln!("Couldn't write points: {}", errstring); + http::GeoHubResponse::Ok(errstring) + } +} + /// Serve static files. #[rocket::get("/geo/assets/<file..>")] fn assets( @@ -218,7 +290,7 @@ )) .mount( "/", - rocket::routes![log, retrieve_json, retrieve_last, retrieve_live, assets], + rocket::routes![log, log_json, retrieve_json, retrieve_last, retrieve_live, assets], ) .launch(); }
--- a/src/notifier.rs Sat Dec 05 22:13:50 2020 +0100 +++ b/src/notifier.rs Sat Dec 05 22:14:15 2020 +0100 @@ -1,8 +1,10 @@ + use crate::db; use crate::types; use fallible_iterator::FallibleIterator; use std::collections::HashMap; +use std::str::FromStr; use std::sync::{mpsc, Arc, Mutex}; use std::time; @@ -33,15 +35,24 @@ } } -fn encode_notify_payload(client: &str, secret: &Option<String>) -> String { +fn encode_client_id(client: &str, secret: &Option<String>) -> String { format!( "{} {}", client, - secret.as_ref().map(|s| s.as_str()).unwrap_or("") + secret.as_ref().map(|s| s.as_str()).unwrap_or(""), ) } -fn decode_notify_payload(payload: &str) -> (String, Option<String>) { +fn encode_notify_payload(client: &str, secret: &Option<String>, nrows: Option<i64>) -> String { + format!( + "{} {} {}", + client, + secret.as_ref().map(|s| s.as_str()).unwrap_or(""), + nrows.unwrap_or(1) + ) +} + +fn decode_notify_payload(payload: &str) -> (String, Option<String>, Option<i64>) { let parts = payload.split(' ').collect::<Vec<&str>>(); assert!(parts.len() >= 1); let secret = if parts.len() > 1 { @@ -49,7 +60,12 @@ } else { None }; - return (parts[0].into(), secret); + let nrows = if parts.len() > 2 { + i64::from_str(parts[2]).ok() + } else { + None + }; + return (parts[0].into(), secret, nrows); } /// Build a channel name from a client name and secret. @@ -93,11 +109,12 @@ dbq: &db::DBQuery, client: &str, secret: &Option<String>, + nrows: Option<i64>, ) -> Result<u64, postgres::Error> { let channel = format!( "NOTIFY {}, '{}'", channel_name(client, secret.as_ref().unwrap_or(&"".into()).as_str()), - encode_notify_payload(client, secret), + encode_notify_payload(client, secret, nrows), ); let notify = dbq.0.prepare_cached(channel.as_str()).unwrap(); notify.execute(&[]) @@ -155,7 +172,7 @@ if let Ok(nrq) = rx.try_recv() { // client_id is also the payload sent to the channel. It keys waiters by client and // secret. 
- let client_id = encode_notify_payload(nrq.client.as_str(), &nrq.secret); + let client_id = encode_client_id(nrq.client.as_str(), &nrq.secret); if !clients.contains_key(&client_id) { listen(db.0, &nrq.client, &nrq.secret).ok(); } @@ -174,12 +191,13 @@ while let Ok(Some(notification)) = iter.next() { // We can extract the client and secret from the channel payload. The payload itself is // the hashmap key. - let client_id = notification.payload; - let (client, secret) = decode_notify_payload(client_id.as_str()); + let (client, secret, nrows) = decode_notify_payload(&notification.payload); + let client_id = encode_client_id(&client, &secret); + unlisten(db.0, client.as_str(), &secret).ok(); // These queries use the primary key index returning one row only and will be quite fast. - let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(1)); + let rows = db.check_for_new_rows(client.as_str(), &secret, &None, &Some(nrows.unwrap_or(1))); if let Some((geo, last)) = rows { for request in clients.remove(&client_id).unwrap_or(vec![]) { request
--- a/src/types.rs Sat Dec 05 22:13:50 2020 +0100 +++ b/src/types.rs Sat Dec 05 22:14:15 2020 +0100 @@ -1,4 +1,4 @@ -/// Non-JSON plain point representation. +/// Non-JSON plain point representation. Flat and representing a database row. #[derive(Debug, Clone)] pub struct GeoPoint { pub lat: f64, @@ -21,7 +21,12 @@ } impl LiveUpdate { - pub fn new(client: String, last: Option<i32>, geo: Option<GeoJSON>, err: Option<String>) -> LiveUpdate { + pub fn new( + client: String, + last: Option<i32>, + geo: Option<GeoJSON>, + err: Option<String>, + ) -> LiveUpdate { LiveUpdate { typ: "GeoHubUpdate".into(), client: client, @@ -32,13 +37,20 @@ } } +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct LogLocations { + pub locations: Vec<GeoFeature>, +} + /// Fetch geodata as JSON. /// -#[derive(serde::Serialize, Debug, Clone)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct GeoProperties { + #[serde(alias = "timestamp")] time: chrono::DateTime<chrono::Utc>, altitude: Option<f64>, speed: Option<f64>, + #[serde(alias = "horizontal_accuracy")] accuracy: Option<f64>, /// The unique ID of the point. 
id: Option<i32>, @@ -46,14 +58,14 @@ note: Option<String>, } -#[derive(serde::Serialize, Debug, Clone)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct GeoGeometry { #[serde(rename = "type")] typ: String, // always "Point" coordinates: (f64, f64), // always [long, lat] } -#[derive(serde::Serialize, Debug, Clone)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct GeoFeature { #[serde(rename = "type")] typ: String, // always "Feature" @@ -61,7 +73,7 @@ geometry: GeoGeometry, } -#[derive(serde::Serialize, Debug, Clone)] +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct GeoJSON { #[serde(rename = "type")] typ: String, // always "FeatureCollection" @@ -100,3 +112,17 @@ }, } } + +pub fn geopoint_from_feature(feat: GeoFeature) -> GeoPoint { + let geo = feat.geometry; + let prop = feat.properties; + GeoPoint { + accuracy: prop.accuracy, + ele: prop.altitude, + long: geo.coordinates.0, + lat: geo.coordinates.1, + note: None, + spd: prop.speed, + time: prop.time, + } +}