changeset 581:4f825f33a455

Initial commit of asyncdb code
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 28 Sep 2022 09:01:50 +0200
parents cbe7574ed46b
children ab45bdbdedac
files Cargo.toml src/asyncdb.rs src/error.rs src/lib.rs
diffstat 4 files changed, 135 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/Cargo.toml	Sun Aug 28 11:30:29 2022 -0700
+++ b/Cargo.toml	Wed Sep 28 09:01:50 2022 +0200
@@ -20,6 +20,12 @@
 errno = "0.2"
 fs2 = "0.4.3"
 
+tokio = { optional = true, features = ["rt", "sync"], version = ">= 1.21" }
+
+[features]
+default = ["async"]
+async = ["tokio"]
+
 [dev-dependencies]
 time-test = "0.2"
 bencher = "0.1"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/asyncdb.rs	Wed Sep 28 09:01:50 2022 +0200
@@ -0,0 +1,125 @@
+
+use std::path::Path;
+
+use crate::{DB, Status, StatusCode, Options, Result, snapshot::Snapshot, WriteBatch};
+
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use tokio::task::{JoinHandle, spawn_blocking};
+
+const CHANNEL_BUFFER_SIZE: usize = 32;
+
+enum Request {
+    Close,
+    Put { key: Vec<u8>, val: Vec<u8> },
+    Delete { key: Vec<u8> },
+    Write { batch: WriteBatch, sync: bool },
+    Flush,
+    //GetAt { snapshot: Snapshot, key: Vec<u8> },
+    Get { key: Vec<u8> },
+    //GetSnapshot,
+    CompactRange { from: Vec<u8>, to: Vec<u8> }
+}
+
+enum Response {
+    OK,
+    Error(Status),
+    Value(Option<Vec<u8>>),
+    // Idea: don't send snapshots but opaque reference to a snapshot that doesn't leave the worker
+    // thread.
+    //Snapshot(Snapshot),
+}
+
+struct Message {
+    req: Request,
+    resp_channel: oneshot::Sender<Response>,
+}
+
+pub struct AsyncDB {
+    jh: JoinHandle<()>,
+    send: mpsc::Sender<Message>,
+}
+
+impl AsyncDB {
+
+    pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
+        let db = DB::open(name, opts)?;
+        let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE);
+        let jh = spawn_blocking(move || AsyncDB::run_server(db, recv));
+        Ok(AsyncDB { jh, send })
+    }
+
+    async fn send_request(&self, req: Request) -> Result<Response> {
+        let (tx, rx) = oneshot::channel();
+        let m = Message { req, resp_channel: tx };
+        if let Err(e) = self.send.send(m).await {
+            return Err(Status { code: StatusCode::AsyncError, err: e.to_string() });
+        }
+        let resp = rx.await;
+        match resp {
+            Err(e) => Err(Status { code: StatusCode::AsyncError, err: e.to_string() }),
+            Ok(r) => Ok(r),
+        }
+    }
+
+    fn run_server(mut db: DB, mut recv: mpsc::Receiver<Message>) {
+        while let Some(message) = recv.blocking_recv() {
+            match message.req {
+                Request::Close => {
+                    message.resp_channel.send(Response::OK);
+                    recv.close();
+                    return;
+                },
+                Request::Put { key, val } => {
+                    let ok = db.put(&key, &val);
+                    send_response(message.resp_channel, ok);
+                },
+                Request::Delete { key } => {
+                    let ok = db.delete(&key);
+                    send_response(message.resp_channel, ok);
+                },
+                Request::Write { batch, sync } => {
+                    let ok = db.write(batch, sync);
+                    send_response(message.resp_channel, ok);
+                },
+                Request::Flush => {
+                    let ok = db.flush();
+                    send_response(message.resp_channel, ok);
+                },
+                /*Request::GetAt { snapshot, key } => {
+                    let ok = db.get_at(&snapshot, &key);
+                    match ok {
+                        Err(e) => {
+                            message.resp_channel.send(Response::Error(e));
+                        },
+                        Ok(v) => {
+                            message.resp_channel.send(Response::Value(v));
+                        }
+                    };
+                },
+                */
+                Request::Get { key } => {
+                    let r = db.get(&key);
+                    message.resp_channel.send(Response::Value(r));
+                },
+                /*Request::GetSnapshot => {
+                    message.resp_channel.send(Response::Snapshot(db.get_snapshot()));
+                },
+                */
+                Request::CompactRange { from, to } => {
+                    let ok = db.compact_range(&from, &to);
+                    send_response(message.resp_channel, ok);
+                }
+            }
+        }
+    }
+}
+
+fn send_response(ch: oneshot::Sender<Response>, result: Result<()>) {
+    if let Err(e) = result {
+        ch.send(Response::Error(e));
+    } else {
+        ch.send(Response::OK);
+    }
+}
+
--- a/src/error.rs	Sun Aug 28 11:30:29 2022 -0700
+++ b/src/error.rs	Wed Sep 28 09:01:50 2022 +0200
@@ -24,6 +24,7 @@
     NotFound,
     NotSupported,
     PermissionDenied,
+    AsyncError,
     Unknown,
     Errno(errno::Errno),
 }
--- a/src/lib.rs	Sun Aug 28 11:30:29 2022 -0700
+++ b/src/lib.rs	Wed Sep 28 09:01:50 2022 +0200
@@ -37,6 +37,9 @@
 #[macro_use]
 mod infolog;
 
+//#[cfg(feature = "async")]
+mod asyncdb;
+
 mod block;
 mod block_builder;
 mod blockhandle;