Mercurial > lbo > hg > leveldb-rs
changeset 614:e41530366262
Merge pull request #33 from KAIYOHUGO/master
Add Compressor
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 15 Jul 2023 22:09:44 +0200 |
parents | 2abfa394ba2d (current diff) 3cfa3633c012 (diff) |
children | f48ec74f26b0 |
files | src/db_impl.rs |
diffstat | 15 files changed, 291 insertions(+), 71 deletions(-) [+] |
line wrap: on
line diff
--- a/Cargo.toml Sat Jul 15 22:06:55 2023 +0200 +++ b/Cargo.toml Sat Jul 15 22:09:44 2023 +0200 @@ -36,5 +36,11 @@ path = "src/benches/maps_bench.rs" [workspace] -members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze", "examples/stresstest", "examples/asyncdb"] - +members = [ + "examples/write-a-lot", + "examples/leveldb-tool", + "examples/word-analyze", + "examples/stresstest", + "examples/asyncdb", + "examples/mcpe", +]
--- a/examples/leveldb-tool/src/main.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/examples/leveldb-tool/src/main.rs Sat Jul 15 22:09:44 2023 +0200 @@ -1,6 +1,6 @@ extern crate rusty_leveldb; -use rusty_leveldb::{LdbIterator, Options, DB}; +use rusty_leveldb::{compressor, CompressorId, LdbIterator, Options, DB}; use std::env::args; use std::io::{self, Write}; @@ -59,7 +59,7 @@ let mut opt = Options::default(); opt.reuse_logs = false; opt.reuse_manifest = false; - opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut db = DB::open("tooldb", opt).unwrap(); match args[1].as_str() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/mcpe/Cargo.toml Sat Jul 15 22:09:44 2023 +0200 @@ -0,0 +1,8 @@ +[package] +name = "mcpe" +version = "0.1.0" +edition = "2021" + +[dependencies] +miniz_oxide = "0.7.1" +rusty-leveldb = { path = "../../" }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/mcpe/README.md Sat Jul 15 22:09:44 2023 +0200 @@ -0,0 +1,5 @@ +# MCPE + +This example show how to customize compression method. + +This setup is compatible to [Mojang's leveldb](https://github.com/Mojang/leveldb-mcpe). \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/mcpe/src/main.rs Sat Jul 15 22:09:44 2023 +0200 @@ -0,0 +1,70 @@ +use miniz_oxide::deflate::{compress_to_vec, compress_to_vec_zlib}; +use miniz_oxide::inflate::{decompress_to_vec, decompress_to_vec_zlib}; +use rusty_leveldb::{Compressor, CompressorList, Options, DB}; +use std::rc::Rc; + +struct ZlibCompressor(u8); + +impl ZlibCompressor { + /// level 0-10 + pub fn new(level: u8) -> Self { + assert!(level <= 10); + Self(level) + } +} + +impl Compressor for ZlibCompressor { + fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { + Ok(compress_to_vec_zlib(&block, self.0)) + } + + fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { + decompress_to_vec_zlib(&block).map_err(|e| rusty_leveldb::Status { + code: rusty_leveldb::StatusCode::CompressionError, + err: e.to_string(), + }) + } +} + +struct RawZlibCompressor(u8); + +impl RawZlibCompressor { + /// level 0-10 + pub fn new(level: u8) -> Self { + assert!(level <= 10); + Self(level) + } +} + +impl Compressor for RawZlibCompressor { + fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { + Ok(compress_to_vec(&block, self.0)) + } + + fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { + decompress_to_vec(&block).map_err(|e| rusty_leveldb::Status { + code: rusty_leveldb::StatusCode::CompressionError, + err: e.to_string(), + }) + } +} + +pub fn mcpe_options(compression_level: u8) -> Options { + let mut opt = Options::default(); + opt.compressor = 0; + let mut list = CompressorList::new(); + list.set_with_id(0, RawZlibCompressor::new(compression_level)); + list.set_with_id(1, ZlibCompressor::new(compression_level)); + opt.compressor_list = Rc::new(list); + opt +} + +fn main() { + let path = "mcpe_db"; + let compression_level = 10; + let opt = mcpe_options(compression_level); + let mut db = DB::open(path, opt).unwrap(); + db.put(b"~local_player", b"NBT data goes here").unwrap(); + let value = db.get(b"~local_player").unwrap(); + assert_eq!(&value, b"NBT data goes here") +}
--- a/examples/stresstest/src/main.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/examples/stresstest/src/main.rs Sat Jul 15 22:09:44 2023 +0200 @@ -1,5 +1,5 @@ use rand::distributions::{Alphanumeric, DistString}; -use rusty_leveldb::{Options, DB}; +use rusty_leveldb::{compressor, CompressorId, Options, DB}; const KEY_LEN: usize = 4; const VAL_LEN: usize = 8; @@ -12,7 +12,7 @@ fn write(db: &mut DB, n: usize) { time_test::time_test!("write"); - for i in 0..n { + for _ in 0..n { let (k, v) = (gen_string(KEY_LEN), gen_string(VAL_LEN)); db.put(k.as_bytes(), v.as_bytes()).unwrap(); @@ -27,10 +27,10 @@ fn read(db: &mut DB, n: usize) -> usize { let mut succ = 0; time_test::time_test!("read"); - for i in 0..n { + for _ in 0..n { let k = gen_string(KEY_LEN); - if let Some(v) = db.get(k.as_bytes()) { + if let Some(_) = db.get(k.as_bytes()) { succ += 1; } } @@ -38,20 +38,20 @@ } fn main() { - let N = 100_000; + let n = 100_000; let m = 10; let path = "stresstestdb"; let mut entries = 0; for i in 0..m { let mut opt = Options::default(); - opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut db = DB::open(path, opt).unwrap(); - write(&mut db, N); - entries += N; + write(&mut db, n); + entries += n; println!("Wrote {} entries ({}/{})", entries, i + 1, m); - let s = read(&mut db, N); - println!("Read back {} entries (found {}) ({}/{})", N, s, i + 1, m); + let s = read(&mut db, n); + println!("Read back {} entries (found {}) ({}/{})", n, s, i + 1, m); } }
--- a/examples/word-analyze/src/main.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/examples/word-analyze/src/main.rs Sat Jul 15 22:09:44 2023 +0200 @@ -1,3 +1,4 @@ +use leveldb::CompressorId; use rusty_leveldb as leveldb; use std::fs::OpenOptions; @@ -35,7 +36,7 @@ fn main() { let mut opts = leveldb::Options::default(); - opts.compression_type = leveldb::CompressionType::CompressionNone; + opts.compressor = leveldb::compressor::NoneCompressor::ID; let db = leveldb::DB::open("wordsdb", opts).unwrap(); run(db).unwrap();
--- a/examples/write-a-lot/src/main.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/examples/write-a-lot/src/main.rs Sat Jul 15 22:09:44 2023 +0200 @@ -2,9 +2,7 @@ extern crate rusty_leveldb; use rand::Rng; -use rusty_leveldb::CompressionType; -use rusty_leveldb::Options; -use rusty_leveldb::DB; +use rusty_leveldb::{compressor, CompressorId, Options, DB}; use std::error::Error; use std::iter::FromIterator; @@ -31,7 +29,7 @@ fn main() { let mut opt = Options::default(); - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut db = DB::open("test1", opt).unwrap(); fill_db(&mut db, 32768).unwrap();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/compressor.rs Sat Jul 15 22:09:44 2023 +0200 @@ -0,0 +1,77 @@ +/// Custom compression method +/// +/// ``` +/// # use rusty_leveldb::{Compressor, CompressorId}; +/// +/// #[derive(Debug, Clone, Copy, Default)] +/// pub struct CustomCompressor; +/// +/// impl CompressorId for CustomCompressor { +/// // a unique id to identify what compressor should DB use +/// const ID: u8 = 42; +/// } +/// +/// impl Compressor for CustomCompressor { +/// fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { +/// // Do something +/// Ok(block) +/// } +/// +/// fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> { +/// // Do something +/// Ok(block) +/// } +/// } +/// ``` +/// +/// See [crate::CompressorList] for usage +pub trait Compressor { + fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>; + + fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>; +} + +/// Set default compressor id +pub trait CompressorId { + const ID: u8; +} + +/// A compressor that do **Nothing** +/// +/// It default id is `0` +#[derive(Debug, Clone, Copy, Default)] +pub struct NoneCompressor; + +impl CompressorId for NoneCompressor { + const ID: u8 = 0; +} + +impl Compressor for NoneCompressor { + fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> { + Ok(block) + } + + fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> { + Ok(block) + } +} + +/// A compressor that compress data with Google's Snappy +/// +/// It default id is `1` +#[derive(Debug, Clone, Copy, Default)] +pub struct SnappyCompressor; + +impl CompressorId for SnappyCompressor { + const ID: u8 = 1; +} + +impl Compressor for SnappyCompressor { + fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> { + Ok(snap::raw::Encoder::new().compress_vec(&block)?) + } + + fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> { + Ok(snap::raw::Decoder::new().decompress_vec(&block)?) + } +}
--- a/src/db_impl.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/db_impl.rs Sat Jul 15 22:09:44 2023 +0200 @@ -890,10 +890,7 @@ Ok(()) } - fn finish_compaction_output( - &mut self, - cs: &mut CompactionState, - ) -> Result<()> { + fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> { assert!(cs.builder.is_some()); let output_num = cs.current_output().num; assert!(output_num > 0);
--- a/src/lib.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/lib.rs Sat Jul 15 22:09:44 2023 +0200 @@ -73,10 +73,13 @@ mod db_impl; mod db_iter; +pub mod compressor; + #[cfg(feature = "async")] pub use asyncdb::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; +pub use compressor::{Compressor, CompressorId}; pub use db_impl::DB; pub use db_iter::DBIterator; pub use disk_env::PosixDiskEnv; @@ -84,7 +87,7 @@ pub use error::{Result, Status, StatusCode}; pub use filter::{BloomPolicy, FilterPolicy}; pub use mem_env::MemEnv; -pub use options::{in_memory, CompressionType, Options}; +pub use options::{in_memory, CompressorList, Options}; pub use skipmap::SkipMap; pub use types::LdbIterator; pub use write_batch::WriteBatch;
--- a/src/options.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/options.rs Sat Jul 15 22:09:44 2023 +0200 @@ -1,12 +1,13 @@ use crate::block::Block; use crate::cache::Cache; use crate::cmp::{Cmp, DefaultCmp}; -use crate::disk_env; +use crate::compressor::{self, Compressor, CompressorId}; use crate::env::Env; -use crate::filter; use crate::infolog::{self, Logger}; use crate::mem_env::MemEnv; use crate::types::{share, Shared}; +use crate::{disk_env, Result}; +use crate::{filter, Status, StatusCode}; use std::default::Default; use std::rc::Rc; @@ -19,20 +20,6 @@ const WRITE_BUFFER_SIZE: usize = 4 * MB; const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum CompressionType { - CompressionNone = 0, - CompressionSnappy = 1, -} - -pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> { - match i { - 0 => Some(CompressionType::CompressionNone), - 1 => Some(CompressionType::CompressionSnappy), - _ => None, - } -} - /// Options contains general parameters for a LevelDB instance. Most of the names are /// self-explanatory; the defaults are defined in the `Default` implementation. #[derive(Clone)] @@ -49,9 +36,13 @@ pub block_cache: Shared<Cache<Block>>, pub block_size: usize, pub block_restart_interval: usize, + /// Compressor id in compressor list + /// /// Note: you have to open a database with the same compression type as it was written to, in /// order to not lose data! (this is a bug and will be fixed) - pub compression_type: CompressionType, + pub compressor: u8, + + pub compressor_list: Rc<CompressorList>, pub reuse_logs: bool, pub reuse_manifest: bool, pub filter_policy: filter::BoxedFilterPolicy, @@ -75,12 +66,65 @@ block_restart_interval: 16, reuse_logs: true, reuse_manifest: true, - compression_type: CompressionType::CompressionNone, + compressor: 0, + compressor_list: Rc::new(CompressorList::default()), filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))), } } } +/// Customize compressor method for leveldb +/// +/// `Default` value is like the code below +/// ``` +/// # use rusty_leveldb::{compressor, CompressorList}; +/// let mut list = CompressorList::new(); +/// list.set(compressor::NoneCompressor); +/// list.set(compressor::SnappyCompressor); +/// ``` +pub struct CompressorList([Option<Box<dyn Compressor>>; 256]); + +impl CompressorList { + /// Create a **Empty** compressor list + pub fn new() -> Self { + const INIT: Option<Box<dyn Compressor>> = None; + Self([INIT; 256]) + } + + /// Set compressor with the id in `CompressorId` trait + pub fn set<T>(&mut self, compressor: T) + where + T: Compressor + CompressorId + 'static, + { + self.set_with_id(T::ID, compressor) + } + + /// Set compressor with id + pub fn set_with_id(&mut self, id: u8, compressor: impl Compressor + 'static) { + self.0[id as usize] = Some(Box::new(compressor)); + } + + pub fn is_set(&self, id: u8) -> bool { + self.0[id as usize].is_some() + } + + pub fn get(&self, id: u8) -> Result<&Box<dyn Compressor + 'static>> { + self.0[id as usize].as_ref().ok_or_else(|| Status { + code: StatusCode::NotSupported, + err: format!("invalid compression id `{}`", id), + }) + } +} + +impl Default for CompressorList { + fn default() -> Self { + let mut list = Self::new(); + list.set(compressor::NoneCompressor); + list.set(compressor::SnappyCompressor); + list + } +} + /// Returns Options that will cause a database to exist purely in-memory instead of being stored on /// disk. This is useful for testing or ephemeral databases. pub fn in_memory() -> Options {
--- a/src/table_block.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/table_block.rs Sat Jul 15 22:09:44 2023 +0200 @@ -5,7 +5,7 @@ use crate::filter; use crate::filter_block::FilterBlockReader; use crate::log::unmask_crc; -use crate::options::{self, CompressionType, Options}; +use crate::options::Options; use crate::table_builder; use crc::crc32::{self, Hasher32}; @@ -69,18 +69,12 @@ ), ); } + let compressor_list = opt.compressor_list.clone(); - if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) { - match ctype { - CompressionType::CompressionNone => Ok(Block::new(opt, buf)), - CompressionType::CompressionSnappy => { - let decoded = snap::raw::Decoder::new().decompress_vec(buf.as_slice())?; - Ok(Block::new(opt, decoded)) - } - } - } else { - err(StatusCode::InvalidData, "invalid compression type") - } + Ok(Block::new( + opt, + compressor_list.get(compress[0])?.decode(buf)?, + )) } /// Verify checksum of block
--- a/src/table_builder.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/table_builder.rs Sat Jul 15 22:09:44 2023 +0200 @@ -2,12 +2,13 @@ use crate::block_builder::BlockBuilder; use crate::blockhandle::BlockHandle; use crate::cmp::InternalKeyCmp; +use crate::compressor::{self, Compressor, CompressorId}; use crate::error::Result; use crate::filter::{InternalFilterPolicy, NoFilterPolicy}; use crate::filter_block::FilterBlockBuilder; use crate::key_types::InternalKey; use crate::log::mask_crc; -use crate::options::{CompressionType, Options}; +use crate::options::Options; use std::cmp::Ordering; use std::io::Write; @@ -178,8 +179,10 @@ self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); - let ctype = self.opt.compression_type; - let handle = self.write_block(contents, ctype)?; + let compressor_list = self.opt.compressor_list.clone(); + let compressor = compressor_list.get(self.opt.compressor)?; + + let handle = self.write_block(contents, (self.opt.compressor, compressor))?; let mut handle_enc = [0 as u8; 16]; let enc_len = handle.encode_to(&mut handle_enc); @@ -198,19 +201,21 @@ } /// Calculates the checksum, writes the block to disk and updates the offset. - fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> { - let mut data = block; - if ctype == CompressionType::CompressionSnappy { - data = snap::raw::Encoder::new().compress_vec(&data)?; - } + fn write_block( + &mut self, + block: BlockContents, + compressor_id_pair: (u8, &Box<dyn Compressor>), + ) -> Result<BlockHandle> { + let (ctype, compressor) = compressor_id_pair; + let data = compressor.encode(block)?; let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); digest.write(&data); - digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]); + digest.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN]); self.dst.write(&data)?; - self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?; + self.dst.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?; self.dst.write_fixedint(mask_crc(digest.sum32()))?; let handle = BlockHandle::new(self.offset, data.len()); @@ -221,7 +226,12 @@ pub fn finish(mut self) -> Result<usize> { assert!(self.data_block.is_some()); - let ctype = self.opt.compression_type; + + let compressor_list = self.opt.compressor_list.clone(); + let compressor_id_pair = ( + self.opt.compressor, + compressor_list.get(self.opt.compressor)?, + ); // If there's a pending data block, write it if self.data_block.as_ref().unwrap().entries() > 0 { @@ -241,7 +251,13 @@ let fblock = self.filter_block.take().unwrap(); let filter_key = format!("filter.{}", fblock.filter_name()); let fblock_data = fblock.finish(); - let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?; + let fblock_handle = self.write_block( + fblock_data, + ( + compressor::NoneCompressor::ID, + &(Box::new(compressor::NoneCompressor) as Box<dyn Compressor>), + ), + )?; let mut handle_enc = [0 as u8; 16]; let enc_len = fblock_handle.encode_to(&mut handle_enc); @@ -251,11 +267,11 @@ // write metaindex block let meta_ix = meta_ix_block.finish(); - let meta_ix_handle = self.write_block(meta_ix, ctype)?; + let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?; // write index block let index_cont = self.index_block.take().unwrap().finish(); - let ix_handle = self.write_block(index_cont, ctype)?; + let ix_handle = self.write_block(index_cont, compressor_id_pair)?; // write footer. let footer = Footer::new(meta_ix_handle, ix_handle); @@ -292,7 +308,7 @@ let mut d = Vec::with_capacity(512); let mut opt = options::for_test(); opt.block_restart_interval = 3; - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut b = TableBuilder::new_raw(opt, &mut d); let data = vec![
--- a/src/table_reader.rs Sat Jul 15 22:06:55 2023 +0200 +++ b/src/table_reader.rs Sat Jul 15 22:09:44 2023 +0200 @@ -374,12 +374,13 @@ #[cfg(test)] mod tests { + use crate::compressor::CompressorId; use crate::filter::BloomPolicy; use crate::key_types::LookupKey; - use crate::options::{self, CompressionType}; use crate::table_builder::TableBuilder; use crate::test_util::{test_iterator_properties, LdbIteratorIter}; use crate::types::{current_key_val, LdbIterator}; + use crate::{compressor, options}; use super::*; @@ -405,7 +406,7 @@ let mut opt = options::for_test(); opt.block_restart_interval = 2; opt.block_size = 32; - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; { // Uses the standard comparator in opt.