changeset 583:b315ed595a1f

Make asyncdb usable
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 28 Sep 2022 10:17:14 +0200
parents ab45bdbdedac
children adcd81a99cee
files Cargo.toml examples/asyncdb/Cargo.toml examples/asyncdb/src/main.rs src/asyncdb.rs src/lib.rs
diffstat 5 files changed, 89 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/Cargo.toml	Wed Sep 28 09:06:42 2022 +0200
+++ b/Cargo.toml	Wed Sep 28 10:17:14 2022 +0200
@@ -36,5 +36,5 @@
 path = "src/benches/maps_bench.rs"
 
 [workspace]
-members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze", "examples/kvserver", "examples/stresstest"]
+members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze",  "examples/stresstest", "examples/asyncdb"]
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/asyncdb/Cargo.toml	Wed Sep 28 10:17:14 2022 +0200
@@ -0,0 +1,10 @@
+[package]
+name = "asyncdb"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+tokio = { version = "1.21", features = ["rt", "macros" ] }
+rusty-leveldb = { path = "../../", features = ["async"] }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/asyncdb/src/main.rs	Wed Sep 28 10:17:14 2022 +0200
@@ -0,0 +1,16 @@
+use tokio::main;
+
+use rusty_leveldb::{AsyncDB, Options};
+
+#[main(flavor = "current_thread")]
+async fn main() {
+    let adb = AsyncDB::new("testdb", Options::default()).unwrap();
+
+    assert!(adb.put("Hello".as_bytes().to_owned(), "World".as_bytes().to_owned()).await.is_ok());
+
+    let r = adb.get("Hello".as_bytes().to_owned()).await;
+    assert_eq!(r, Ok(Some("World".as_bytes().to_owned())));
+
+    adb.flush().await.expect("flush()");
+    adb.close().await.expect("close()");
+}
--- a/src/asyncdb.rs	Wed Sep 28 09:06:42 2022 +0200
+++ b/src/asyncdb.rs	Wed Sep 28 10:17:14 2022 +0200
@@ -49,7 +49,65 @@
         Ok(AsyncDB { jh, send })
     }
 
-    async fn send_request(&self, req: Request) -> Result<Response> {
+    pub async fn close(&self) -> Result<()> {
+        let r = self.process_request(Request::Close).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+
+    pub async fn put(&self, key: Vec<u8>, val: Vec<u8>) -> Result<()> {
+        let r = self.process_request(Request::Put{key, val}).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn delete(&self, key: Vec<u8>) -> Result<()> {
+        let r = self.process_request(Request::Delete{key}).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn write(&self, batch: WriteBatch, sync: bool) -> Result<()> {
+        let r = self.process_request(Request::Write{batch, sync}).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn flush(&self) -> Result<()> {
+        let r = self.process_request(Request::Flush).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
+        let r = self.process_request(Request::Get { key }).await?;
+        match r {
+            Response::Value(v) => Ok(v),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+    pub async fn compact_range(&self, from: Vec<u8>, to: Vec<u8>) -> Result<()> {
+        let r = self.process_request(Request::CompactRange { from, to }).await?;
+        match r {
+            Response::OK => Ok(()),
+            Response::Error(s) => Err(s),
+            _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }),
+        }
+    }
+
+    async fn process_request(&self, req: Request) -> Result<Response> {
         let (tx, rx) = oneshot::channel();
         let m = Message { req, resp_channel: tx };
         if let Err(e) = self.send.send(m).await {
@@ -66,7 +124,7 @@
         while let Some(message) = recv.blocking_recv() {
             match message.req {
                 Request::Close => {
-                    message.resp_channel.send(Response::OK);
+                    message.resp_channel.send(Response::OK).ok();
                     recv.close();
                     return;
                 },
--- a/src/lib.rs	Wed Sep 28 09:06:42 2022 +0200
+++ b/src/lib.rs	Wed Sep 28 10:17:14 2022 +0200
@@ -73,6 +73,8 @@
 mod db_impl;
 mod db_iter;
 
+pub use asyncdb::AsyncDB;
+
 pub use cmp::{Cmp, DefaultCmp};
 pub use db_impl::DB;
 pub use db_iter::DBIterator;