changeset 23:8fc023bbfb97

Restructure API to make database usage more efficient.
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 02 Dec 2020 21:35:59 +0100
parents 65065070a4b4
children 2174218dd521
files Rocket.toml src/main.rs
diffstat 2 files changed, 61 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/Rocket.toml	Wed Dec 02 21:09:41 2020 +0100
+++ b/Rocket.toml	Wed Dec 02 21:35:59 2020 +0100
@@ -3,10 +3,10 @@
 # Majority of workers is expected to be waiting for updates.
 
 [development]
-workers = 16
+workers = 8
 
 [production]
 log = "normal"
 address = "::1"
 port = 8000
-workers = 8
+workers = 16
--- a/src/main.rs	Wed Dec 02 21:09:41 2020 +0100
+++ b/src/main.rs	Wed Dec 02 21:35:59 2020 +0100
@@ -101,7 +101,7 @@
 
 /// Queries for at most `limit` rows since entry ID `last`.
 fn check_for_new_rows(
-    db: &DBConn,
+    db: &postgres::Connection,
     name: &String,
     secret: &Option<String>,
     last: &Option<i32>,
@@ -111,7 +111,7 @@
         typ: "FeatureCollection".into(),
         features: vec![],
     };
-    let check_for_new = db.0.prepare_cached(
+    let check_for_new = db.prepare_cached(
         r"SELECT id, t, lat, long, spd, ele FROM geohub.geodata
         WHERE (client = $1) and (id > $2) AND (secret = public.digest($3, 'sha256') or secret is null)
         ORDER BY id DESC
@@ -161,30 +161,36 @@
     return None;
 }
 
-/// Wait for an update.
-///
-/// Points are returned in descending order of time.
-#[rocket::get("/geo/<name>/retrieve/live?<secret>&<last>&<timeout>")]
-fn retrieve_live(
+#[rocket::get("/geo/<name>/retrieve/last?<secret>&<last>&<limit>")]
+fn retrieve_last(
     db: DBConn,
-    notify_manager: rocket::State<SendableSender<NotifyRequest>>,
     name: String,
     secret: Option<String>,
     last: Option<i32>,
+    limit: Option<i64>,
+) -> rocket_contrib::json::Json<LiveUpdate> {
+    if let Some((geojson, newlast)) = check_for_new_rows(&db.0, &name, &secret, &last, &limit) {
+        return rocket_contrib::json::Json(LiveUpdate {
+            typ: "GeoHubUpdate".into(),
+            last: Some(newlast),
+            geo: Some(geojson),
+        });
+    }
+    return rocket_contrib::json::Json(LiveUpdate {
+        typ: "GeoHubUpdate".into(),
+        last: last,
+        geo: None,
+    });
+}
+/// Wait for an update.
+/// Only one point is returned. To retrieve a history of points, call retrieve_last.
+#[rocket::get("/geo/<name>/retrieve/live?<secret>&<timeout>")]
+fn retrieve_live(
+    notify_manager: rocket::State<SendableSender<NotifyRequest>>,
+    name: String,
+    secret: Option<String>,
     timeout: Option<u64>,
 ) -> rocket_contrib::json::Json<LiveUpdate> {
-    // Only if the client supplied a paging token should we check for new rows before. This is an
-    // optimization.
-    if last.is_some() {
-        if let Some((geojson, newlast)) = check_for_new_rows(&db, &name, &secret, &last, &None) {
-            return rocket_contrib::json::Json(LiveUpdate {
-                typ: "GeoHubUpdate".into(),
-                last: Some(newlast),
-                geo: Some(geojson),
-            });
-        }
-    }
-
     let (send, recv) = mpsc::channel();
     let send = SendableSender {
         sender: Arc::new(Mutex::new(send)),
@@ -192,23 +198,22 @@
 
     let req = NotifyRequest {
         client: name.clone(),
+        secret: secret,
         respond: send,
     };
     notify_manager.send(req).unwrap();
 
     if let Ok(response) = recv.recv_timeout(time::Duration::new(timeout.unwrap_or(30), 0)) {
         eprintln!("Worker received response for {}", response.client);
-        if let Some((geojson, last)) = check_for_new_rows(&db, &name, &secret, &last, &Some(1)) {
-            return rocket_contrib::json::Json(LiveUpdate {
-                typ: "GeoHubUpdate".into(),
-                last: Some(last),
-                geo: Some(geojson),
-            });
-        }
+        return rocket_contrib::json::Json(LiveUpdate {
+            typ: "GeoHubUpdate".into(),
+            last: response.last,
+            geo: response.geo,
+        });
     }
     return rocket_contrib::json::Json(LiveUpdate {
         typ: "GeoHubUpdate".into(),
-        last: last,
+        last: None,
         geo: None,
     });
 }
@@ -309,11 +314,15 @@
 // Notify all waiters using just one DB connection.
 struct NotifyRequest {
     client: String,
+    secret: Option<String>,
     respond: SendableSender<NotifyResponse>,
 }
 
 struct NotifyResponse {
     client: String,
+    // The GeoJSON object containing the update and the `last` page token.
+    geo: Option<GeoJSON>,
+    last: Option<i32>,
 }
 
 #[derive(Clone)]
@@ -368,12 +377,27 @@
             unlisten(&db, &payload).ok();
 
             for request in clients.remove(&payload).unwrap_or(vec![]) {
-                request
-                    .respond
-                    .send(NotifyResponse {
-                        client: payload.clone(),
-                    })
-                    .ok();
+                if let Some((geo, last)) =
+                    check_for_new_rows(&db, &payload, &request.secret, &None, &Some(1))
+                {
+                    request
+                        .respond
+                        .send(NotifyResponse {
+                            client: payload.clone(),
+                            geo: Some(geo),
+                            last: Some(last),
+                        })
+                        .ok();
+                } else {
+                    request
+                        .respond
+                        .send(NotifyResponse {
+                            client: payload.clone(),
+                            geo: None,
+                            last: None,
+                        })
+                        .ok();
+                }
             }
 
             // We also need to receive new notification requests.
@@ -407,7 +431,7 @@
         ))
         .mount(
             "/",
-            rocket::routes![log, retrieve_json, retrieve_live, assets],
+            rocket::routes![log, retrieve_json, retrieve_last, retrieve_live, assets],
         )
         .launch();
 }