Mercurial > lbo > hg > leveldb-rs
changeset 583:b315ed595a1f
Make asyncdb usable
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 28 Sep 2022 10:17:14 +0200 |
parents | ab45bdbdedac |
children | adcd81a99cee |
files | Cargo.toml examples/asyncdb/Cargo.toml examples/asyncdb/src/main.rs src/asyncdb.rs src/lib.rs |
diffstat | 5 files changed, 89 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/Cargo.toml Wed Sep 28 09:06:42 2022 +0200 +++ b/Cargo.toml Wed Sep 28 10:17:14 2022 +0200 @@ -36,5 +36,5 @@ path = "src/benches/maps_bench.rs" [workspace] -members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze", "examples/kvserver", "examples/stresstest"] +members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze", "examples/stresstest", "examples/asyncdb"]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/asyncdb/Cargo.toml Wed Sep 28 10:17:14 2022 +0200 @@ -0,0 +1,10 @@ +[package] +name = "asyncdb" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1.21", features = ["rt", "macros" ] } +rusty-leveldb = { path = "../../", features = ["async"] }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/asyncdb/src/main.rs Wed Sep 28 10:17:14 2022 +0200 @@ -0,0 +1,16 @@ +use tokio::main; + +use rusty_leveldb::{AsyncDB, Options}; + +#[main(flavor = "current_thread")] +async fn main() { + let adb = AsyncDB::new("testdb", Options::default()).unwrap(); + + assert!(adb.put("Hello".as_bytes().to_owned(), "World".as_bytes().to_owned()).await.is_ok()); + + let r = adb.get("Hello".as_bytes().to_owned()).await; + assert_eq!(r, Ok(Some("World".as_bytes().to_owned()))); + + adb.flush().await.expect("flush()"); + adb.close().await.expect("close()"); +}
--- a/src/asyncdb.rs Wed Sep 28 09:06:42 2022 +0200 +++ b/src/asyncdb.rs Wed Sep 28 10:17:14 2022 +0200 @@ -49,7 +49,65 @@ Ok(AsyncDB { jh, send }) } - async fn send_request(&self, req: Request) -> Result<Response> { + pub async fn close(&self) -> Result<()> { + let r = self.process_request(Request::Close).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + + pub async fn put(&self, key: Vec<u8>, val: Vec<u8>) -> Result<()> { + let r = self.process_request(Request::Put{key, val}).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn delete(&self, key: Vec<u8>) -> Result<()> { + let r = self.process_request(Request::Delete{key}).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn write(&self, batch: WriteBatch, sync: bool) -> Result<()> { + let r = self.process_request(Request::Write{batch, sync}).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn flush(&self) -> Result<()> { + let r = self.process_request(Request::Flush).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> { + let r = self.process_request(Request::Get { key }).await?; + match r { + Response::Value(v) => Ok(v), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn compact_range(&self, from: Vec<u8>, to: Vec<u8>) -> Result<()> { + let r = self.process_request(Request::CompactRange { from, to }).await?; + match r { + Response::OK => Ok(()), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + + async fn process_request(&self, req: Request) -> Result<Response> { let (tx, rx) = oneshot::channel(); let m = Message { req, resp_channel: tx }; if let Err(e) = self.send.send(m).await { @@ -66,7 +124,7 @@ while let Some(message) = recv.blocking_recv() { match message.req { Request::Close => { - message.resp_channel.send(Response::OK); + message.resp_channel.send(Response::OK).ok(); recv.close(); return; },