Mercurial > lbo > hg > leveldb-rs
changeset 586:acb33b545ce1
Finish async API for now
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 28 Sep 2022 11:26:41 +0200 |
parents | 90ec4071176f |
children | c0454e18fca0 |
files | src/asyncdb.rs |
diffstat | 1 files changed, 98 insertions(+), 44 deletions(-) [+] |
line wrap: on
line diff
--- a/src/asyncdb.rs Wed Sep 28 11:15:09 2022 +0200 +++ b/src/asyncdb.rs Wed Sep 28 11:26:41 2022 +0200 @@ -1,19 +1,18 @@ - use std::collections::hash_map::HashMap; use std::path::Path; - -use crate::{DB, Status, StatusCode, Options, Result, snapshot::Snapshot, WriteBatch}; +use crate::{Options, Result, Status, StatusCode, WriteBatch, DB}; use tokio::sync::mpsc; use tokio::sync::oneshot; -use tokio::task::{JoinHandle, spawn_blocking}; +use tokio::task::{spawn_blocking, JoinHandle}; const CHANNEL_BUFFER_SIZE: usize = 32; #[derive(Clone, Copy)] pub struct SnapshotRef(usize); +/// A request sent to the database thread. enum Request { Close, Put { key: Vec<u8>, val: Vec<u8> }, @@ -24,30 +23,35 @@ Get { key: Vec<u8> }, GetSnapshot, DropSnapshot { snapshot: SnapshotRef }, - CompactRange { from: Vec<u8>, to: Vec<u8> } + CompactRange { from: Vec<u8>, to: Vec<u8> }, } +/// A response received from the database thread. enum Response { OK, Error(Status), Value(Option<Vec<u8>>), - // Idea: don't send snapshots but opaque reference to a snapshot that doesn't leave the worker - // thread. Snapshot(SnapshotRef), } +/// Contains both a request and a back-channel for the reply. struct Message { req: Request, resp_channel: oneshot::Sender<Response>, } +/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +/// +/// TODO: Make it work in other runtimes as well. This is a matter of adapting the blocking thread +/// mechanism as well as the channel types. pub struct AsyncDB { jh: JoinHandle<()>, send: mpsc::Sender<Message>, } impl AsyncDB { - + /// Create a new or open an existing database. pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> { let db = DB::open(name, opts)?; let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); @@ -60,32 +64,44 @@ match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn put(&self, key: Vec<u8>, val: Vec<u8>) -> Result<()> { - let r = self.process_request(Request::Put{key, val}).await?; + let r = self.process_request(Request::Put { key, val }).await?; match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn delete(&self, key: Vec<u8>) -> Result<()> { - let r = self.process_request(Request::Delete{key}).await?; + let r = self.process_request(Request::Delete { key }).await?; match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn write(&self, batch: WriteBatch, sync: bool) -> Result<()> { - let r = self.process_request(Request::Write{batch, sync}).await?; + let r = self.process_request(Request::Write { batch, sync }).await?; match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn flush(&self) -> Result<()> { @@ -93,7 +109,10 @@ match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>> { @@ -101,51 +120,81 @@ match r { Response::Value(v) => Ok(v), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn get_at(&self, snapshot: SnapshotRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> { - let r = self.process_request(Request::GetAt { snapshot, key }).await?; + let r = self + .process_request(Request::GetAt { snapshot, key }) + .await?; match r { Response::Value(v) => Ok(v), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn get_snapshot(&self) -> Result<SnapshotRef> { let r = self.process_request(Request::GetSnapshot).await?; match r { Response::Snapshot(sr) => Ok(sr), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } /// As snapshots returned by `AsyncDB::get_snapshot()` are sort-of "weak references" to an /// actual snapshot, they need to be dropped explicitly. pub async fn drop_snapshot(&self, snapshot: SnapshotRef) -> Result<()> { - let r = self.process_request(Request::DropSnapshot { snapshot }).await?; + let r = self + .process_request(Request::DropSnapshot { snapshot }) + .await?; match r { Response::OK => Ok(()), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } pub async fn compact_range(&self, from: Vec<u8>, to: Vec<u8>) -> Result<()> { - let r = self.process_request(Request::CompactRange { from, to }).await?; + let r = self + .process_request(Request::CompactRange { from, to }) + .await?; match r { Response::OK => Ok(()), Response::Error(s) => Err(s), - _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + _ => Err(Status { + code: StatusCode::AsyncError, + err: "Wrong response type in AsyncDB.".to_string(), + }), } } async fn process_request(&self, req: Request) -> Result<Response> { let (tx, rx) = oneshot::channel(); - let m = Message { req, resp_channel: tx }; + let m = Message { + req, + resp_channel: tx, + }; if let Err(e) = self.send.send(m).await { - return Err(Status { code: StatusCode::AsyncError, err: e.to_string() }); + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); } let resp = rx.await; match resp { - Err(e) => Err(Status { code: StatusCode::AsyncError, err: e.to_string() }), + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), Ok(r) => Ok(r), } } @@ -160,49 +209,55 @@ message.resp_channel.send(Response::OK).ok(); recv.close(); return; - }, + } Request::Put { key, val } => { let ok = db.put(&key, &val); send_response(message.resp_channel, ok); - }, + } Request::Delete { key } => { let ok = db.delete(&key); send_response(message.resp_channel, ok); - }, + } Request::Write { batch, sync } => { let ok = db.write(batch, sync); send_response(message.resp_channel, ok); - }, + } Request::Flush => { let ok = db.flush(); send_response(message.resp_channel, ok); - }, + } Request::GetAt { snapshot, key } => { let snapshot_id = snapshot.0; if let Some(snapshot) = snapshots.get(&snapshot_id) { let ok = db.get_at(&snapshot, &key); match ok { Err(e) => { - message.resp_channel.send(Response::Error(e)); - }, + message.resp_channel.send(Response::Error(e)).ok(); + } Ok(v) => { - message.resp_channel.send(Response::Value(v)); + message.resp_channel.send(Response::Value(v)).ok(); } }; } else { - message.resp_channel.send(Response::Error(Status { code: StatusCode::AsyncError, err: "Unknown snapshot reference: this is a bug".to_string() })); + message + .resp_channel + .send(Response::Error(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string(), + })) + .ok(); } - }, + } Request::Get { key } => { let r = db.get(&key); - message.resp_channel.send(Response::Value(r)); - }, + message.resp_channel.send(Response::Value(r)).ok(); + } Request::GetSnapshot => { snapshots.insert(snapshot_counter, db.get_snapshot()); let sref = SnapshotRef(snapshot_counter); snapshot_counter += 1; - message.resp_channel.send(Response::Snapshot(sref)); - }, + message.resp_channel.send(Response::Snapshot(sref)).ok(); + } Request::DropSnapshot { snapshot } => { snapshots.remove(&snapshot.0); send_response(message.resp_channel, Ok(())); @@ -218,9 +273,8 @@ fn send_response(ch: oneshot::Sender<Response>, result: Result<()>) { if let Err(e) = result { - ch.send(Response::Error(e)); + ch.send(Response::Error(e)).ok(); } else { - ch.send(Response::OK); + ch.send(Response::OK).ok(); } } -