changeset 291:d2a96aac4edf

db_impl: Handle empty tables better; add recovery tests.
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 26 Sep 2017 19:01:29 +0200
parents 4c403f9e8391
children fac1ed1b3f7d
files src/db_impl.rs
diffstat 1 files changed, 50 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/src/db_impl.rs	Tue Sep 26 19:00:23 2017 +0200
+++ b/src/db_impl.rs	Tue Sep 26 19:01:29 2017 +0200
@@ -91,7 +91,7 @@
         let mut ve = VersionEdit::new();
         let save_manifest = db.recover(&mut ve)?;
 
-        // Create log file if not existing.
+        // Create log file if an old one is not being reused.
         if db.log.is_none() {
             let lognum = db.vset.new_file_number();
             let logfile =
@@ -227,6 +227,7 @@
 
             batch.set_contents(&scratch);
             batch.insert_into_memtable(batch.sequence(), &mut mem);
+            save_manifest = true;
 
             let last_seq = batch.sequence() + batch.count() as u64 - 1;
             if last_seq > max_seq {
@@ -234,7 +235,6 @@
             }
             if mem.approx_mem_usage() > self.opt.write_buffer_size {
                 compactions += 1;
-                save_manifest = true;
                 self.write_l0_table(&mem, ve, None)?;
                 mem = MemTable::new(cmp.clone());
             }
@@ -244,13 +244,14 @@
         // Check if we can reuse the last log file.
         if self.opt.reuse_logs && is_last && compactions == 0 {
             assert!(self.log.is_none());
+            log!(self.opt.log, "reusing log file {}", filename);
             let oldsize = self.opt.env.size_of(Path::new(&filename))?;
             let oldfile = self.opt.env.open_appendable_file(Path::new(&filename))?;
             let lw = LogWriter::new_with_off(oldfile, oldsize);
             self.log = Some(lw);
             self.log_num = Some(log_num);
             self.mem = mem;
-        } else {
+        } else if mem.len() > 0 {
             // Log is not reused, so write out the accumulated memtable.
             self.write_l0_table(&mem, ve, None)?;
         }
@@ -390,6 +391,12 @@
         let fmd = build_table(&self.name, &self.opt, memt.iter(), num)?;
         log!(self.opt.log, "L0 table {:06} has {} bytes", num, fmd.size);
 
+        // Wrote empty table.
+        if fmd.size == 0 {
+            self.vset.reuse_file_number(num);
+            return Ok(());
+        }
+
         let cache_result = self.cache.borrow_mut().get_table(num);
         if let Err(e) = cache_result {
             log!(self.opt.log,
@@ -572,7 +579,6 @@
     fn delete_obsolete_files(&mut self) -> Result<()> {
         let files = self.vset.live_files();
         let filenames = self.opt.env.children(Path::new(&self.name))?;
-        log!(self.opt.log, "{:?}", filenames);
         for name in filenames {
             if let Ok((num, typ)) = parse_file_name(&name) {
                 log!(self.opt.log, "{} {:?}", num, typ);
@@ -673,7 +679,6 @@
                                    -> Result<FileMetaData> {
     from.reset();
     let filename = table_file_name(dbname, num);
-    let mut md = FileMetaData::default();
 
     let (mut kbuf, mut vbuf) = (vec![], vec![]);
     let mut firstkey = None;
@@ -701,10 +706,15 @@
         return Err(e);
     }
 
-    md.num = num;
-    md.size = opt.env.size_of(Path::new(&filename))?;
-    md.smallest = firstkey.unwrap();
-    md.largest = kbuf;
+    let mut md = FileMetaData::default();
+    if firstkey.is_none() {
+        opt.env.delete(Path::new(&filename)).is_ok();
+    } else {
+        md.num = num;
+        md.size = opt.env.size_of(Path::new(&filename))?;
+        md.smallest = firstkey.unwrap();
+        md.largest = kbuf;
+    }
     Ok(md)
 }
 
@@ -790,21 +800,49 @@
         {
             let db = DB::open("db", opt.clone()).unwrap();
 
+            println!("children after: {:?}",
+                     env.children(Path::new("db/")).unwrap());
             assert!(env.exists(Path::new("db/CURRENT")).unwrap());
+            // Database is initialized.
             assert!(env.exists(Path::new("db/MANIFEST-000001")).unwrap());
             assert!(env.exists(Path::new("db/LOCK")).unwrap());
             assert!(env.exists(Path::new("db/000003.log")).unwrap());
         }
 
         {
-            let mut opt = opt;
+            println!("children before: {:?}",
+                     env.children(Path::new("db/")).unwrap());
+            let mut opt = opt.clone();
             opt.reuse_manifest = false;
             let mut db = DB::open("db", opt).unwrap();
 
+            println!("children after: {:?}",
+                     env.children(Path::new("db/")).unwrap());
             // Obsolete manifest is deleted.
             assert!(!env.exists(Path::new("db/MANIFEST-000001")).unwrap());
-            assert!(env.exists(Path::new("db/000005.log")).unwrap());
-            println!("{:?}", env.children(Path::new("db/")).unwrap());
+            // New manifest is created.
+            assert!(env.exists(Path::new("db/MANIFEST-000002")).unwrap());
+            // Obsolete log file is deleted.
+            assert!(!env.exists(Path::new("db/000003.log")).unwrap());
+            assert!(env.exists(Path::new("db/000004.log")).unwrap());
+        }
+
+        {
+            println!("children before: {:?}",
+                     env.children(Path::new("db/")).unwrap());
+            // reuse_manifest above causes the old manifest to be deleted as obsolete, but no new
+            // manifest is written. CURRENT becomes stale.
+            let mut opt = opt.clone();
+            opt.reuse_logs = true;
+            let mut db = DB::open("db", opt).unwrap();
+
+            println!("children after: {:?}",
+                     env.children(Path::new("db/")).unwrap());
+            assert!(!env.exists(Path::new("db/MANIFEST-000001")).unwrap());
+            assert!(env.exists(Path::new("db/MANIFEST-000002")).unwrap());
+            assert!(env.exists(Path::new("db/000004.log")).unwrap());
+            // 000004 should be reused, no new log file should be created.
+            assert!(!env.exists(Path::new("db/000006.log")).unwrap());
         }
     }