changeset 585:90ec4071176f

Implement snapshots in asyncdb
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 28 Sep 2022 11:15:09 +0200
parents adcd81a99cee
children acb33b545ce1
files src/asyncdb.rs
diffstat 1 files changed, 60 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/src/asyncdb.rs	Wed Sep 28 11:14:57 2022 +0200
+++ b/src/asyncdb.rs	Wed Sep 28 11:15:09 2022 +0200
@@ -1,6 +1,8 @@
 
+use std::collections::hash_map::HashMap;
 use std::path::Path;
 
+
 use crate::{DB, Status, StatusCode, Options, Result, snapshot::Snapshot, WriteBatch};
 
 use tokio::sync::mpsc;
@@ -9,15 +11,19 @@
 
 const CHANNEL_BUFFER_SIZE: usize = 32;
 
+#[derive(Clone, Copy)]
+pub struct SnapshotRef(usize);
+
 enum Request {
     Close,
     Put { key: Vec<u8>, val: Vec<u8> },
     Delete { key: Vec<u8> },
     Write { batch: WriteBatch, sync: bool },
     Flush,
-    //GetAt { snapshot: Snapshot, key: Vec<u8> },
+    GetAt { snapshot: SnapshotRef, key: Vec<u8> },
     Get { key: Vec<u8> },
-    //GetSnapshot,
+    GetSnapshot,
+    DropSnapshot { snapshot: SnapshotRef },
     CompactRange { from: Vec<u8>, to: Vec<u8> }
 }
 
@@ -27,7 +33,7 @@
     Value(Option<Vec<u8>>),
     // Idea: don't send snapshots but opaque reference to a snapshot that doesn't leave the worker
     // thread.
-    //Snapshot(Snapshot),
+    Snapshot(SnapshotRef),
 }
 
 struct Message {
@@ -98,6 +104,30 @@
             _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
         }
     }
+    pub async fn get_at(&self, snapshot: SnapshotRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+        let r = self.process_request(Request::GetAt { snapshot, key }).await?;
+        match r {
+            Response::Value(v) => Ok(v),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn get_snapshot(&self) -> Result<SnapshotRef> {
+        let r = self.process_request(Request::GetSnapshot).await?;
+        match r {
+            Response::Snapshot(sr) => Ok(sr),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    /// As snapshots returned by `AsyncDB::get_snapshot()` are sort-of "weak references" to an
+    /// actual snapshot, they need to be dropped explicitly.
+    pub async fn drop_snapshot(&self, snapshot: SnapshotRef) -> Result<()> {
+        let r = self.process_request(Request::DropSnapshot { snapshot }).await?;
+        match r {
+            Response::OK => Ok(()),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
     pub async fn compact_range(&self, from: Vec<u8>, to: Vec<u8>) -> Result<()> {
         let r = self.process_request(Request::CompactRange { from, to }).await?;
         match r {
@@ -121,6 +151,9 @@
     }
 
     fn run_server(mut db: DB, mut recv: mpsc::Receiver<Message>) {
+        let mut snapshots = HashMap::new();
+        let mut snapshot_counter: usize = 0;
+
         while let Some(message) = recv.blocking_recv() {
             match message.req {
                 Request::Close => {
@@ -144,26 +177,36 @@
                     let ok = db.flush();
                     send_response(message.resp_channel, ok);
                 },
-                /*Request::GetAt { snapshot, key } => {
-                    let ok = db.get_at(&snapshot, &key);
-                    match ok {
-                        Err(e) => {
-                            message.resp_channel.send(Response::Error(e));
-                        },
-                        Ok(v) => {
-                            message.resp_channel.send(Response::Value(v));
-                        }
-                    };
+                Request::GetAt { snapshot, key } => {
+                    let snapshot_id = snapshot.0;
+                    if let Some(snapshot) = snapshots.get(&snapshot_id) {
+                        let ok = db.get_at(&snapshot, &key);
+                        match ok {
+                            Err(e) => {
+                                message.resp_channel.send(Response::Error(e));
+                            },
+                            Ok(v) => {
+                                message.resp_channel.send(Response::Value(v));
+                            }
+                        };
+                    } else {
+                        message.resp_channel.send(Response::Error(Status { code: StatusCode::AsyncError, err: "Unknown snapshot reference: this is a bug".to_string() }));
+                    }
                 },
-                */
                 Request::Get { key } => {
                     let r = db.get(&key);
                     message.resp_channel.send(Response::Value(r));
                 },
-                /*Request::GetSnapshot => {
-                    message.resp_channel.send(Response::Snapshot(db.get_snapshot()));
+                Request::GetSnapshot => {
+                    snapshots.insert(snapshot_counter, db.get_snapshot());
+                    let sref = SnapshotRef(snapshot_counter);
+                    snapshot_counter += 1;
+                    message.resp_channel.send(Response::Snapshot(sref));
                 },
-                */
+                Request::DropSnapshot { snapshot } => {
+                    snapshots.remove(&snapshot.0);
+                    send_response(message.resp_channel, Ok(()));
+                }
                 Request::CompactRange { from, to } => {
                     let ok = db.compact_range(&from, &to);
                     send_response(message.resp_channel, ok);