Mercurial > lbo > hg > geohub
changeset 23:8fc023bbfb97
Restructure API to make database usage more efficient.
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 02 Dec 2020 21:35:59 +0100 |
parents | 65065070a4b4 |
children | 2174218dd521 |
files | Rocket.toml src/main.rs |
diffstat | 2 files changed, 61 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/Rocket.toml Wed Dec 02 21:09:41 2020 +0100 +++ b/Rocket.toml Wed Dec 02 21:35:59 2020 +0100 @@ -3,10 +3,10 @@ # Majority of workers is expected to be waiting for updates. [development] -workers = 16 +workers = 8 [production] log = "normal" address = "::1" port = 8000 -workers = 8 +workers = 16
--- a/src/main.rs Wed Dec 02 21:09:41 2020 +0100 +++ b/src/main.rs Wed Dec 02 21:35:59 2020 +0100 @@ -101,7 +101,7 @@ /// Queries for at most `limit` rows since entry ID `last`. fn check_for_new_rows( - db: &DBConn, + db: &postgres::Connection, name: &String, secret: &Option<String>, last: &Option<i32>, @@ -111,7 +111,7 @@ typ: "FeatureCollection".into(), features: vec![], }; - let check_for_new = db.0.prepare_cached( + let check_for_new = db.prepare_cached( r"SELECT id, t, lat, long, spd, ele FROM geohub.geodata WHERE (client = $1) and (id > $2) AND (secret = public.digest($3, 'sha256') or secret is null) ORDER BY id DESC @@ -161,30 +161,36 @@ return None; } -/// Wait for an update. -/// -/// Points are returned in descending order of time. -#[rocket::get("/geo/<name>/retrieve/live?<secret>&<last>&<timeout>")] -fn retrieve_live( +#[rocket::get("/geo/<name>/retrieve/last?<secret>&<last>&<limit>")] +fn retrieve_last( db: DBConn, - notify_manager: rocket::State<SendableSender<NotifyRequest>>, name: String, secret: Option<String>, last: Option<i32>, + limit: Option<i64>, +) -> rocket_contrib::json::Json<LiveUpdate> { + if let Some((geojson, newlast)) = check_for_new_rows(&db.0, &name, &secret, &last, &limit) { + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: Some(newlast), + geo: Some(geojson), + }); + } + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: last, + geo: None, + }); +} +/// Wait for an update. +/// Only one point is returned. To retrieve a history of points, call retrieve_last. +#[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")] +fn retrieve_live( + notify_manager: rocket::State<SendableSender<NotifyRequest>>, + name: String, + secret: Option<String>, timeout: Option<u64>, ) -> rocket_contrib::json::Json<LiveUpdate> { - // Only if the client supplied a paging token should we check for new rows before. This is an - // optimization. - if last.is_some() { - if let Some((geojson, newlast)) = check_for_new_rows(&db, &name, &secret, &last, &None) { - return rocket_contrib::json::Json(LiveUpdate { - typ: "GeoHubUpdate".into(), - last: Some(newlast), - geo: Some(geojson), - }); - } - } - let (send, recv) = mpsc::channel(); let send = SendableSender { sender: Arc::new(Mutex::new(send)), @@ -192,23 +198,22 @@ let req = NotifyRequest { client: name.clone(), + secret: secret, respond: send, }; notify_manager.send(req).unwrap(); if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) { eprintln!("Worker received response for {}", response.client); - if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &Some(1)) { - return rocket_contrib::json::Json(LiveUpdate { - typ: "GeoHubUpdate".into(), - last: Some(last), - geo: Some(geojson), - }); - } + return rocket_contrib::json::Json(LiveUpdate { + typ: "GeoHubUpdate".into(), + last: response.last, + geo: response.geo, + }); } return rocket_contrib::json::Json(LiveUpdate { typ: "GeoHubUpdate".into(), - last: last, + last: None, geo: None, }); } @@ -309,11 +314,15 @@ // Notify all waiters using just one DB connection. struct NotifyRequest { client: String, + secret: Option<String>, respond: SendableSender<NotifyResponse>, } struct NotifyResponse { client: String, + // The GeoJSON object containing the update and the `last` page token. + geo: Option<GeoJSON>, + last: Option<i32>, } #[derive(Clone)] @@ -368,12 +377,27 @@ unlisten(&db, &payload).ok(); for request in clients.remove(&payload).unwrap_or(vec![]) { - request - .respond - .send(NotifyResponse { - client: payload.clone(), - }) - .ok(); + if let Some((geo, last)) = + check_for_new_rows(&db, &payload, &request.secret, &None, &Some(1)) + { + request + .respond + .send(NotifyResponse { + client: payload.clone(), + geo: Some(geo), + last: Some(last), + }) + .ok(); + } else { + request + .respond + .send(NotifyResponse { + client: payload.clone(), + geo: None, + last: None, + }) + .ok(); + } } // We also need to receive new notification requests. @@ -407,7 +431,7 @@ )) .mount( "/", - rocket::routes![log, retrieve_json, retrieve_live, assets], + rocket::routes![log, retrieve_json, retrieve_last, retrieve_live, assets], ) .launch(); }