Mercurial > lbo > hg > sstable
view src/table_block.rs @ 119:f159b9014b75 default tip
Added tag v0.10.0 for changeset f75ba3eceb26
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Wed, 29 Dec 2021 10:05:08 +0100 |
parents | f76c3689831a |
children |
line wrap: on
line source
use crate::block::Block; use crate::blockhandle::BlockHandle; use crate::error::{err, Result, StatusCode}; use crate::filter; use crate::filter_block::FilterBlockReader; use crate::options::{self, CompressionType, Options}; use crate::table_builder; use crate::types::{unmask_crc, RandomAccess}; use crc::crc32::{self, Hasher32}; use integer_encoding::FixedInt; use snap::Decoder; /// Reads the data for the specified block handle from a file. fn read_bytes(f: &dyn RandomAccess, location: &BlockHandle) -> Result<Vec<u8>> { let mut buf = vec![0; location.size()]; f.read_at(location.offset(), &mut buf).map(|_| buf) } /// Reads a serialized filter block from a file and returns a FilterBlockReader. pub fn read_filter_block( src: &dyn RandomAccess, location: &BlockHandle, policy: filter::BoxedFilterPolicy, ) -> Result<FilterBlockReader> { if location.size() == 0 { return err( StatusCode::InvalidArgument, "no filter block in empty location", ); } let buf = read_bytes(src, location)?; Ok(FilterBlockReader::new_owned(policy, buf)) } /// Reads a table block from a random-access source. /// A table block consists of [bytes..., compress (1B), checksum (4B)]; the handle only refers to /// the location and length of [bytes...]. pub fn read_table_block( opt: Options, f: &dyn RandomAccess, location: &BlockHandle, ) -> Result<Block> { // The block is denoted by offset and length in BlockHandle. A block in an encoded // table is followed by 1B compression type and 4B checksum. // The checksum refers to the compressed contents. let block_data_size = location.size(); let location = BlockHandle::new( location.offset(), block_data_size + table_builder::TABLE_BLOCK_CKSUM_LEN + table_builder::TABLE_BLOCK_COMPRESS_LEN, ); let mut buf = read_bytes(f, &location)?; let mut compress = buf.split_off(block_data_size); let cksum = compress.split_off(table_builder::TABLE_BLOCK_COMPRESS_LEN); if !verify_table_block(&buf, compress[0], unmask_crc(u32::decode_fixed(&cksum))) { return err( StatusCode::Corruption, &format!( "checksum verification failed for block at {}", location.offset() ), ); } if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) { match ctype { CompressionType::CompressionNone => Ok(Block::new(opt, buf)), CompressionType::CompressionSnappy => { let decoded = Decoder::new().decompress_vec(&buf)?; Ok(Block::new(opt, decoded)) } } } else { err(StatusCode::InvalidData, "invalid compression type") } } /// Verify checksum of block fn verify_table_block(data: &[u8], compression: u8, want: u32) -> bool { let mut digest = crc32::Digest::new(crc32::CASTAGNOLI); digest.write(data); digest.write(&[compression; 1]); digest.sum32() == want }