Mercurial > lbo > hg > leveldb-rs
changeset 585:90ec4071176f
Implement snapshots in asyncdb
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 28 Sep 2022 11:15:09 +0200 |
parents | adcd81a99cee |
children | acb33b545ce1 |
files | src/asyncdb.rs |
diffstat | 1 files changed, 60 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/src/asyncdb.rs Wed Sep 28 11:14:57 2022 +0200 +++ b/src/asyncdb.rs Wed Sep 28 11:15:09 2022 +0200 @@ -1,6 +1,8 @@ +use std::collections::hash_map::HashMap; use std::path::Path; + use crate::{DB, Status, StatusCode, Options, Result, snapshot::Snapshot, WriteBatch}; use tokio::sync::mpsc; @@ -9,15 +11,19 @@ const CHANNEL_BUFFER_SIZE: usize = 32; +#[derive(Clone, Copy)] +pub struct SnapshotRef(usize); + enum Request { Close, Put { key: Vec<u8>, val: Vec<u8> }, Delete { key: Vec<u8> }, Write { batch: WriteBatch, sync: bool }, Flush, - //GetAt { snapshot: Snapshot, key: Vec<u8> }, + GetAt { snapshot: SnapshotRef, key: Vec<u8> }, Get { key: Vec<u8> }, - //GetSnapshot, + GetSnapshot, + DropSnapshot { snapshot: SnapshotRef }, CompactRange { from: Vec<u8>, to: Vec<u8> } } @@ -27,7 +33,7 @@ Value(Option<Vec<u8>>), // Idea: don't send snapshots but opaque reference to a snapshot that doesn't leave the worker // thread. - //Snapshot(Snapshot), + Snapshot(SnapshotRef), } struct Message { @@ -98,6 +104,30 @@ _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), } } + pub async fn get_at(&self, snapshot: SnapshotRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> { + let r = self.process_request(Request::GetAt { snapshot, key }).await?; + match r { + Response::Value(v) => Ok(v), + Response::Error(s) => Err(s), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + pub async fn get_snapshot(&self) -> Result<SnapshotRef> { + let r = self.process_request(Request::GetSnapshot).await?; + match r { + Response::Snapshot(sr) => Ok(sr), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } + /// As snapshots returned by `AsyncDB::get_snapshot()` are sort-of "weak references" to an + /// actual snapshot, they need to be dropped explicitly. + pub async fn drop_snapshot(&self, snapshot: SnapshotRef) -> Result<()> { + let r = self.process_request(Request::DropSnapshot { snapshot }).await?; + match r { + Response::OK => Ok(()), + _ => Err(Status { code: StatusCode::AsyncError, err: "Wrong response type in AsyncDB.".to_string() }), + } + } pub async fn compact_range(&self, from: Vec<u8>, to: Vec<u8>) -> Result<()> { let r = self.process_request(Request::CompactRange { from, to }).await?; match r { @@ -121,6 +151,9 @@ } fn run_server(mut db: DB, mut recv: mpsc::Receiver<Message>) { + let mut snapshots = HashMap::new(); + let mut snapshot_counter: usize = 0; + while let Some(message) = recv.blocking_recv() { match message.req { Request::Close => { @@ -144,26 +177,36 @@ let ok = db.flush(); send_response(message.resp_channel, ok); }, - /*Request::GetAt { snapshot, key } => { - let ok = db.get_at(&snapshot, &key); - match ok { - Err(e) => { - message.resp_channel.send(Response::Error(e)); - }, - Ok(v) => { - message.resp_channel.send(Response::Value(v)); - } - }; + Request::GetAt { snapshot, key } => { + let snapshot_id = snapshot.0; + if let Some(snapshot) = snapshots.get(&snapshot_id) { + let ok = db.get_at(&snapshot, &key); + match ok { + Err(e) => { + message.resp_channel.send(Response::Error(e)); + }, + Ok(v) => { + message.resp_channel.send(Response::Value(v)); + } + }; + } else { + message.resp_channel.send(Response::Error(Status { code: StatusCode::AsyncError, err: "Unknown snapshot reference: this is a bug".to_string() })); + } }, - */ Request::Get { key } => { let r = db.get(&key); message.resp_channel.send(Response::Value(r)); }, - /*Request::GetSnapshot => { - message.resp_channel.send(Response::Snapshot(db.get_snapshot())); + Request::GetSnapshot => { + snapshots.insert(snapshot_counter, db.get_snapshot()); + let sref = SnapshotRef(snapshot_counter); + snapshot_counter += 1; + message.resp_channel.send(Response::Snapshot(sref)); }, - */ + Request::DropSnapshot { snapshot } => { + snapshots.remove(&snapshot.0); + send_response(message.resp_channel, Ok(())); + } Request::CompactRange { from, to } => { let ok = db.compact_range(&from, &to); send_response(message.resp_channel, ok);