view src/asyncdb.rs @ 581:4f825f33a455

Initial commit of asyncdb code
author Lewin Bormann <lbo@spheniscida.de>
date Wed, 28 Sep 2022 09:01:50 +0200
parents
children b315ed595a1f
line wrap: on
line source


use std::path::Path;

use crate::{DB, Status, StatusCode, Options, Result, snapshot::Snapshot, WriteBatch};

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::{JoinHandle, spawn_blocking};

/// Capacity passed to `mpsc::channel` when `AsyncDB::new` creates the request queue
/// between `AsyncDB` and its worker.
const CHANNEL_BUFFER_SIZE: usize = 32;

/// A command for the database worker; each variant is handled by one arm of the
/// `match` in `AsyncDB::run_server`.
enum Request {
    /// Acknowledged with `Response::OK`, after which the worker loop returns.
    Close,
    /// Served by `db.put(&key, &val)`.
    Put { key: Vec<u8>, val: Vec<u8> },
    /// Served by `db.delete(&key)`.
    Delete { key: Vec<u8> },
    /// Served by `db.write(batch, sync)`.
    Write { batch: WriteBatch, sync: bool },
    /// Served by `db.flush()`.
    Flush,
    // Planned, not enabled yet: read `key` as seen by `snapshot`.
    //GetAt { snapshot: Snapshot, key: Vec<u8> },
    /// Served by `db.get(&key)`; answered with `Response::Value`.
    Get { key: Vec<u8> },
    // Planned, not enabled yet: obtain a snapshot of the database.
    //GetSnapshot,
    /// Served by `db.compact_range(&from, &to)`.
    CompactRange { from: Vec<u8>, to: Vec<u8> }
}

/// The worker's answer to a `Request`, delivered through the request's oneshot channel.
enum Response {
    /// The operation returned `Ok(())`; also the acknowledgement for `Request::Close`.
    OK,
    /// The operation returned `Err`; carries that `Status`.
    Error(Status),
    /// Result of `Request::Get`: whatever `db.get` returned for the key.
    Value(Option<Vec<u8>>),
    // Idea: don't send snapshots but opaque reference to a snapshot that doesn't leave the worker
    // thread.
    //Snapshot(Snapshot),
}

/// One unit of work on the request channel: a request plus the channel on which
/// the worker sends its reply.
struct Message {
    /// The operation the worker should perform.
    req: Request,
    /// Sending half of the oneshot channel; `AsyncDB::send_request` awaits the receiving half.
    resp_channel: oneshot::Sender<Response>,
}

/// Asynchronous front end to a `DB`: requests are sent over a channel to a worker
/// started with `spawn_blocking`, which owns the database and executes them.
pub struct AsyncDB {
    /// Handle of the `spawn_blocking` task that runs `AsyncDB::run_server`.
    jh: JoinHandle<()>,
    /// Sending half of the request channel consumed by the worker.
    send: mpsc::Sender<Message>,
}

impl AsyncDB {

    pub fn new<P: AsRef<Path>>(name: P, opts: Options) -> Result<AsyncDB> {
        let db = DB::open(name, opts)?;
        let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE);
        let jh = spawn_blocking(move || AsyncDB::run_server(db, recv));
        Ok(AsyncDB { jh, send })
    }

    async fn send_request(&self, req: Request) -> Result<Response> {
        let (tx, rx) = oneshot::channel();
        let m = Message { req, resp_channel: tx };
        if let Err(e) = self.send.send(m).await {
            return Err(Status { code: StatusCode::AsyncError, err: e.to_string() });
        }
        let resp = rx.await;
        match resp {
            Err(e) => Err(Status { code: StatusCode::AsyncError, err: e.to_string() }),
            Ok(r) => Ok(r),
        }
    }

    fn run_server(mut db: DB, mut recv: mpsc::Receiver<Message>) {
        while let Some(message) = recv.blocking_recv() {
            match message.req {
                Request::Close => {
                    message.resp_channel.send(Response::OK);
                    recv.close();
                    return;
                },
                Request::Put { key, val } => {
                    let ok = db.put(&key, &val);
                    send_response(message.resp_channel, ok);
                },
                Request::Delete { key } => {
                    let ok = db.delete(&key);
                    send_response(message.resp_channel, ok);
                },
                Request::Write { batch, sync } => {
                    let ok = db.write(batch, sync);
                    send_response(message.resp_channel, ok);
                },
                Request::Flush => {
                    let ok = db.flush();
                    send_response(message.resp_channel, ok);
                },
                /*Request::GetAt { snapshot, key } => {
                    let ok = db.get_at(&snapshot, &key);
                    match ok {
                        Err(e) => {
                            message.resp_channel.send(Response::Error(e));
                        },
                        Ok(v) => {
                            message.resp_channel.send(Response::Value(v));
                        }
                    };
                },
                */
                Request::Get { key } => {
                    let r = db.get(&key);
                    message.resp_channel.send(Response::Value(r));
                },
                /*Request::GetSnapshot => {
                    message.resp_channel.send(Response::Snapshot(db.get_snapshot()));
                },
                */
                Request::CompactRange { from, to } => {
                    let ok = db.compact_range(&from, &to);
                    send_response(message.resp_channel, ok);
                }
            }
        }
    }
}

fn send_response(ch: oneshot::Sender<Response>, result: Result<()>) {
    if let Err(e) = result {
        ch.send(Response::Error(e));
    } else {
        ch.send(Response::OK);
    }
}