Mercurial > lbo > hg > leveldb-rs
changeset 609:68887f0a3e83
Add Compressor
author | kaiyohugo <41114603+KAIYOHUGO@users.noreply.github.com> |
---|---|
date | Tue, 04 Jul 2023 21:50:32 +0800 |
parents | 08549e2435ff |
children | 9a561cd122c3 |
files | examples/leveldb-tool/src/main.rs examples/stresstest/src/main.rs examples/word-analyze/src/main.rs examples/write-a-lot/src/main.rs src/compressor.rs src/db_impl.rs src/error.rs src/lib.rs src/options.rs src/table_block.rs src/table_builder.rs src/table_reader.rs |
diffstat | 12 files changed, 125 insertions(+), 57 deletions(-) [+] |
line wrap: on
line diff
--- a/examples/leveldb-tool/src/main.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/examples/leveldb-tool/src/main.rs Tue Jul 04 21:50:32 2023 +0800 @@ -1,6 +1,6 @@ extern crate rusty_leveldb; -use rusty_leveldb::{LdbIterator, Options, DB}; +use rusty_leveldb::{CompressorId, LdbIterator, Options, DB}; use std::env::args; use std::io::{self, Write}; @@ -59,7 +59,7 @@ let mut opt = Options::default(); opt.reuse_logs = false; opt.reuse_manifest = false; - opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy; + opt.compressor = rusty_leveldb::compressor::SnappyCompressor::ID; let mut db = DB::open("tooldb", opt).unwrap(); match args[1].as_str() {
--- a/examples/stresstest/src/main.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/examples/stresstest/src/main.rs Tue Jul 04 21:50:32 2023 +0800 @@ -1,5 +1,5 @@ use rand::distributions::{Alphanumeric, DistString}; -use rusty_leveldb::{Options, DB}; +use rusty_leveldb::{compressor, CompressorId, Options, DB}; const KEY_LEN: usize = 4; const VAL_LEN: usize = 8; @@ -45,7 +45,7 @@ for i in 0..m { let mut opt = Options::default(); - opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut db = DB::open(path, opt).unwrap(); write(&mut db, N); entries += N;
--- a/examples/word-analyze/src/main.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/examples/word-analyze/src/main.rs Tue Jul 04 21:50:32 2023 +0800 @@ -1,3 +1,4 @@ +use leveldb::CompressorId; use rusty_leveldb as leveldb; use std::fs::OpenOptions; @@ -35,7 +36,7 @@ fn main() { let mut opts = leveldb::Options::default(); - opts.compression_type = leveldb::CompressionType::CompressionNone; + opts.compressor = leveldb::compressor::NoneCompressor::ID; let db = leveldb::DB::open("wordsdb", opts).unwrap(); run(db).unwrap();
--- a/examples/write-a-lot/src/main.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/examples/write-a-lot/src/main.rs Tue Jul 04 21:50:32 2023 +0800 @@ -2,9 +2,7 @@ extern crate rusty_leveldb; use rand::Rng; -use rusty_leveldb::CompressionType; -use rusty_leveldb::Options; -use rusty_leveldb::DB; +use rusty_leveldb::{compressor, CompressorId, Options, DB}; use std::error::Error; use std::iter::FromIterator; @@ -31,7 +29,7 @@ fn main() { let mut opt = Options::default(); - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut db = DB::open("test1", opt).unwrap(); fill_db(&mut db, 32768).unwrap();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/compressor.rs Tue Jul 04 21:50:32 2023 +0800
@@ -0,0 +1,55 @@
+pub trait Compressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+}
+
+pub trait CompressorId {
+    const ID: u8;
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct NoneCompressor;
+
+impl NoneCompressor {
+    pub fn new() -> Box<dyn Compressor> {
+        Box::new(Self)
+    }
+}
+
+impl CompressorId for NoneCompressor {
+    const ID: u8 = 0;
+}
+
+impl Compressor for NoneCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SnappyCompressor;
+
+impl SnappyCompressor {
+    pub fn new() -> Box<dyn Compressor> {
+        Box::new(Self)
+    }
+}
+
+impl CompressorId for SnappyCompressor {
+    const ID: u8 = 1;
+}
+
+impl Compressor for SnappyCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Encoder::new().compress_vec(&block)?)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Decoder::new().decompress_vec(&block)?)
+    }
+}
--- a/src/db_impl.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/db_impl.rs Tue Jul 04 21:50:32 2023 +0800 @@ -110,6 +110,9 @@ /// depends on the options set (`create_if_missing`, `error_if_exists`). pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> { let name = name.as_ref(); + if opt.compressor_list[opt.compressor as usize].is_none() { + err(StatusCode::InvalidOption, "need set compressor")?; + } let mut db = DB::new(name, opt); let mut ve = VersionEdit::new(); let save_manifest = db.recover(&mut ve)?; @@ -889,10 +892,7 @@ Ok(()) } - fn finish_compaction_output( - &mut self, - cs: &mut CompactionState, - ) -> Result<()> { + fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> { assert!(cs.builder.is_some()); let output_num = cs.current_output().num; assert!(output_num > 0);
--- a/src/error.rs Tue Jun 20 17:52:43 2023 +0200
+++ b/src/error.rs Tue Jul 04 21:50:32 2023 +0800
@@ -20,6 +20,7 @@
     IOError,
     InvalidArgument,
     InvalidData,
+    InvalidOption,
     LockError,
     NotFound,
     NotSupported,
--- a/src/lib.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/lib.rs Tue Jul 04 21:50:32 2023 +0800 @@ -73,10 +73,13 @@ mod db_impl; mod db_iter; +pub mod compressor; + #[cfg(feature = "async")] pub use asyncdb::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; +pub use compressor::{Compressor, CompressorId}; pub use db_impl::DB; pub use db_iter::DBIterator; pub use disk_env::PosixDiskEnv; @@ -84,7 +87,7 @@ pub use error::{Result, Status, StatusCode}; pub use filter::{BloomPolicy, FilterPolicy}; pub use mem_env::MemEnv; -pub use options::{in_memory, CompressionType, Options}; +pub use options::{in_memory, Options}; pub use skipmap::SkipMap; pub use types::LdbIterator; pub use write_batch::WriteBatch;
--- a/src/options.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/options.rs Tue Jul 04 21:50:32 2023 +0800 @@ -1,6 +1,7 @@ use crate::block::Block; use crate::cache::Cache; use crate::cmp::{Cmp, DefaultCmp}; +use crate::compressor::Compressor; use crate::disk_env; use crate::env::Env; use crate::filter; @@ -19,20 +20,6 @@ const WRITE_BUFFER_SIZE: usize = 4 * MB; const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum CompressionType { - CompressionNone = 0, - CompressionSnappy = 1, -} - -pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> { - match i { - 0 => Some(CompressionType::CompressionNone), - 1 => Some(CompressionType::CompressionSnappy), - _ => None, - } -} - /// Options contains general parameters for a LevelDB instance. Most of the names are /// self-explanatory; the defaults are defined in the `Default` implementation. #[derive(Clone)] @@ -51,7 +38,9 @@ pub block_restart_interval: usize, /// Note: you have to open a database with the same compression type as it was written to, in /// order to not lose data! 
(this is a bug and will be fixed) - pub compression_type: CompressionType, + pub compressor: u8, + + pub compressor_list: Rc<[Option<Box<dyn Compressor>>; 256]>, pub reuse_logs: bool, pub reuse_manifest: bool, pub filter_policy: filter::BoxedFilterPolicy, @@ -59,6 +48,11 @@ impl Default for Options { fn default() -> Options { + const INIT: Option<Box<dyn Compressor>> = None; + let mut compressor_list = [INIT; 256]; + compressor_list[0] = Some(crate::compressor::NoneCompressor::new()); + compressor_list[1] = Some(crate::compressor::SnappyCompressor::new()); + Options { cmp: Rc::new(Box::new(DefaultCmp)), env: Rc::new(Box::new(disk_env::PosixDiskEnv::new())), @@ -75,7 +69,8 @@ block_restart_interval: 16, reuse_logs: true, reuse_manifest: true, - compression_type: CompressionType::CompressionNone, + compressor: 0, + compressor_list: Rc::new(compressor_list), filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))), } }
--- a/src/table_block.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/table_block.rs Tue Jul 04 21:50:32 2023 +0800 @@ -5,7 +5,7 @@ use crate::filter; use crate::filter_block::FilterBlockReader; use crate::log::unmask_crc; -use crate::options::{self, CompressionType, Options}; +use crate::options::Options; use crate::table_builder; use crc::crc32::{self, Hasher32}; @@ -69,15 +69,9 @@ ), ); } - - if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) { - match ctype { - CompressionType::CompressionNone => Ok(Block::new(opt, buf)), - CompressionType::CompressionSnappy => { - let decoded = snap::raw::Decoder::new().decompress_vec(buf.as_slice())?; - Ok(Block::new(opt, decoded)) - } - } + let compressor_list = opt.compressor_list.clone(); + if let Some(compressor) = compressor_list[compress[0] as usize].as_ref() { + Ok(Block::new(opt, compressor.decode(buf)?)) } else { err(StatusCode::InvalidData, "invalid compression type") }
--- a/src/table_builder.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/table_builder.rs Tue Jul 04 21:50:32 2023 +0800 @@ -2,12 +2,13 @@ use crate::block_builder::BlockBuilder; use crate::blockhandle::BlockHandle; use crate::cmp::InternalKeyCmp; +use crate::compressor::{self, Compressor, CompressorId}; use crate::error::Result; use crate::filter::{InternalFilterPolicy, NoFilterPolicy}; use crate::filter_block::FilterBlockBuilder; use crate::key_types::InternalKey; use crate::log::mask_crc; -use crate::options::{CompressionType, Options}; +use crate::options::Options; use std::cmp::Ordering; use std::io::Write; @@ -178,8 +179,12 @@ self.prev_block_last_key = Vec::from(block.last_key()); let contents = block.finish(); - let ctype = self.opt.compression_type; - let handle = self.write_block(contents, ctype)?; + let compressor_list = self.opt.compressor_list.clone(); + let compressor = compressor_list[self.opt.compressor as usize] + .as_ref() + .unwrap(); + + let handle = self.write_block(contents, (self.opt.compressor, compressor))?; let mut handle_enc = [0 as u8; 16]; let enc_len = handle.encode_to(&mut handle_enc); @@ -198,19 +203,21 @@ } /// Calculates the checksum, writes the block to disk and updates the offset. 
- fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> { - let mut data = block; - if ctype == CompressionType::CompressionSnappy { - data = snap::raw::Encoder::new().compress_vec(&data)?; - } + fn write_block( + &mut self, + block: BlockContents, + compressor_id_pair: (u8, &Box<dyn Compressor>), + ) -> Result<BlockHandle> { + let (ctype, compressor) = compressor_id_pair; + let data = compressor.encode(block)?; let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); digest.write(&data); - digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]); + digest.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN]); self.dst.write(&data)?; - self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?; + self.dst.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?; self.dst.write_fixedint(mask_crc(digest.sum32()))?; let handle = BlockHandle::new(self.offset, data.len()); @@ -221,7 +228,14 @@ pub fn finish(mut self) -> Result<usize> { assert!(self.data_block.is_some()); - let ctype = self.opt.compression_type; + + let compressor_list = self.opt.compressor_list.clone(); + let compressor_id_pair = ( + self.opt.compressor, + compressor_list[self.opt.compressor as usize] + .as_ref() + .unwrap(), + ); // If there's a pending data block, write it if self.data_block.as_ref().unwrap().entries() > 0 { @@ -241,7 +255,13 @@ let fblock = self.filter_block.take().unwrap(); let filter_key = format!("filter.{}", fblock.filter_name()); let fblock_data = fblock.finish(); - let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?; + let fblock_handle = self.write_block( + fblock_data, + ( + compressor::NoneCompressor::ID, + &compressor::NoneCompressor::new(), + ), + )?; let mut handle_enc = [0 as u8; 16]; let enc_len = fblock_handle.encode_to(&mut handle_enc); @@ -251,11 +271,11 @@ // write metaindex block let meta_ix = meta_ix_block.finish(); - let meta_ix_handle = self.write_block(meta_ix, ctype)?; + let meta_ix_handle = self.write_block(meta_ix, 
compressor_id_pair)?; // write index block let index_cont = self.index_block.take().unwrap().finish(); - let ix_handle = self.write_block(index_cont, ctype)?; + let ix_handle = self.write_block(index_cont, compressor_id_pair)?; // write footer. let footer = Footer::new(meta_ix_handle, ix_handle); @@ -292,7 +312,7 @@ let mut d = Vec::with_capacity(512); let mut opt = options::for_test(); opt.block_restart_interval = 3; - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; let mut b = TableBuilder::new_raw(opt, &mut d); let data = vec![
--- a/src/table_reader.rs Tue Jun 20 17:52:43 2023 +0200 +++ b/src/table_reader.rs Tue Jul 04 21:50:32 2023 +0800 @@ -374,12 +374,13 @@ #[cfg(test)] mod tests { + use crate::compressor::CompressorId; use crate::filter::BloomPolicy; use crate::key_types::LookupKey; - use crate::options::{self, CompressionType}; use crate::table_builder::TableBuilder; use crate::test_util::{test_iterator_properties, LdbIteratorIter}; use crate::types::{current_key_val, LdbIterator}; + use crate::{compressor, options}; use super::*; @@ -405,7 +406,7 @@ let mut opt = options::for_test(); opt.block_restart_interval = 2; opt.block_size = 32; - opt.compression_type = CompressionType::CompressionSnappy; + opt.compressor = compressor::SnappyCompressor::ID; { // Uses the standard comparator in opt.