leveldb
corruption_test.cc
Go to the documentation of this file.
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "leveldb/db.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <sys/stat.h>
10 #include <sys/types.h>
11 #include "leveldb/cache.h"
12 #include "leveldb/env.h"
13 #include "leveldb/table.h"
14 #include "leveldb/write_batch.h"
15 #include "db/db_impl.h"
16 #include "db/filename.h"
17 #include "db/log_format.h"
18 #include "db/version_set.h"
19 #include "util/logging.h"
20 #include "util/testharness.h"
21 #include "util/testutil.h"
22 
23 namespace leveldb {
24 
// Length handed to test::RandomString() for every value the tests generate.
static const int kValueSize = 1000;
26 
28  public:
30  std::string dbname_;
33  DB* db_;
34 
36  tiny_cache_ = NewLRUCache(100);
37  options_.env = &env_;
38  options_.block_cache = tiny_cache_;
39  dbname_ = test::TmpDir() + "/corruption_test";
40  DestroyDB(dbname_, options_);
41 
42  db_ = NULL;
43  options_.create_if_missing = true;
44  Reopen();
45  options_.create_if_missing = false;
46  }
47 
49  delete db_;
50  DestroyDB(dbname_, Options());
51  delete tiny_cache_;
52  }
53 
55  delete db_;
56  db_ = NULL;
57  return DB::Open(options_, dbname_, &db_);
58  }
59 
60  void Reopen() {
62  }
63 
64  void RepairDB() {
65  delete db_;
66  db_ = NULL;
67  ASSERT_OK(::leveldb::RepairDB(dbname_, options_));
68  }
69 
70  void Build(int n) {
71  std::string key_space, value_space;
72  WriteBatch batch;
73  for (int i = 0; i < n; i++) {
74  //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
75  Slice key = Key(i, &key_space);
76  batch.Clear();
77  batch.Put(key, Value(i, &value_space));
78  WriteOptions options;
79  // Corrupt() doesn't work without this sync on windows; stat reports 0 for
80  // the file size.
81  if (i == n - 1) {
82  options.sync = true;
83  }
84  ASSERT_OK(db_->Write(options, &batch));
85  }
86  }
87 
88  void Check(int min_expected, int max_expected) {
89  int next_expected = 0;
90  int missed = 0;
91  int bad_keys = 0;
92  int bad_values = 0;
93  int correct = 0;
94  std::string value_space;
95  Iterator* iter = db_->NewIterator(ReadOptions());
96  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
97  uint64_t key;
98  Slice in(iter->key());
99  if (in == "" || in == "~") {
100  // Ignore boundary keys.
101  continue;
102  }
103  if (!ConsumeDecimalNumber(&in, &key) ||
104  !in.empty() ||
105  key < next_expected) {
106  bad_keys++;
107  continue;
108  }
109  missed += (key - next_expected);
110  next_expected = key + 1;
111  if (iter->value() != Value(key, &value_space)) {
112  bad_values++;
113  } else {
114  correct++;
115  }
116  }
117  delete iter;
118 
119  fprintf(stderr,
120  "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n",
121  min_expected, max_expected, correct, bad_keys, bad_values, missed);
122  ASSERT_LE(min_expected, correct);
123  ASSERT_GE(max_expected, correct);
124  }
125 
126  void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
127  // Pick file to corrupt
128  std::vector<std::string> filenames;
129  ASSERT_OK(env_.GetChildren(dbname_, &filenames));
130  uint64_t number;
131  FileType type;
132  std::string fname;
133  int picked_number = -1;
134  for (size_t i = 0; i < filenames.size(); i++) {
135  if (ParseFileName(filenames[i], &number, &type) &&
136  type == filetype &&
137  int(number) > picked_number) { // Pick latest file
138  fname = dbname_ + "/" + filenames[i];
139  picked_number = number;
140  }
141  }
142  ASSERT_TRUE(!fname.empty()) << filetype;
143 
144  struct stat sbuf;
145  if (stat(fname.c_str(), &sbuf) != 0) {
146  const char* msg = strerror(errno);
147  ASSERT_TRUE(false) << fname << ": " << msg;
148  }
149 
150  if (offset < 0) {
151  // Relative to end of file; make it absolute
152  if (-offset > sbuf.st_size) {
153  offset = 0;
154  } else {
155  offset = sbuf.st_size + offset;
156  }
157  }
158  if (offset > sbuf.st_size) {
159  offset = sbuf.st_size;
160  }
161  if (offset + bytes_to_corrupt > sbuf.st_size) {
162  bytes_to_corrupt = sbuf.st_size - offset;
163  }
164 
165  // Do it
166  std::string contents;
167  Status s = ReadFileToString(Env::Default(), fname, &contents);
168  ASSERT_TRUE(s.ok()) << s.ToString();
169  for (int i = 0; i < bytes_to_corrupt; i++) {
170  contents[i + offset] ^= 0x80;
171  }
172  s = WriteStringToFile(Env::Default(), contents, fname);
173  ASSERT_TRUE(s.ok()) << s.ToString();
174  }
175 
176  int Property(const std::string& name) {
177  std::string property;
178  int result;
179  if (db_->GetProperty(name, &property) &&
180  sscanf(property.c_str(), "%d", &result) == 1) {
181  return result;
182  } else {
183  return -1;
184  }
185  }
186 
187  // Return the ith key
188  Slice Key(int i, std::string* storage) {
189  char buf[100];
190  snprintf(buf, sizeof(buf), "%016d", i);
191  storage->assign(buf, strlen(buf));
192  return Slice(*storage);
193  }
194 
195  // Return the value to associate with the specified key
196  Slice Value(int k, std::string* storage) {
197  Random r(k);
198  return test::RandomString(&r, kValueSize, storage);
199  }
200 };
201 
202 TEST(CorruptionTest, Recovery) {
203  Build(100);
204  Check(100, 100);
205  Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record
206  Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block
207  Reopen();
208 
209  // The 64 records in the first two log blocks are completely lost.
210  Check(36, 36);
211 }
212 
213 TEST(CorruptionTest, RecoverWriteError) {
214  env_.writable_file_error_ = true;
215  Status s = TryReopen();
216  ASSERT_TRUE(!s.ok());
217 }
218 
219 TEST(CorruptionTest, NewFileErrorDuringWrite) {
220  // Do enough writing to force minor compaction
221  env_.writable_file_error_ = true;
222  const int num = 3 + (Options().write_buffer_size / kValueSize);
223  std::string value_storage;
224  Status s;
225  for (int i = 0; s.ok() && i < num; i++) {
226  WriteBatch batch;
227  batch.Put("a", Value(100, &value_storage));
228  s = db_->Write(WriteOptions(), &batch);
229  }
230  ASSERT_TRUE(!s.ok());
232  env_.writable_file_error_ = false;
233  Reopen();
234 }
235 
236 TEST(CorruptionTest, TableFile) {
237  Build(100);
238  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
239  dbi->TEST_CompactMemTable();
240  dbi->TEST_CompactRange(0, NULL, NULL);
241  dbi->TEST_CompactRange(1, NULL, NULL);
242 
243  Corrupt(kTableFile, 100, 1);
244  Check(90, 99);
245 }
246 
247 TEST(CorruptionTest, TableFileRepair) {
248  options_.block_size = 2 * kValueSize; // Limit scope of corruption
249  options_.paranoid_checks = true;
250  Reopen();
251  Build(100);
252  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
253  dbi->TEST_CompactMemTable();
254  dbi->TEST_CompactRange(0, NULL, NULL);
255  dbi->TEST_CompactRange(1, NULL, NULL);
256 
257  Corrupt(kTableFile, 100, 1);
258  RepairDB();
259  Reopen();
260  Check(95, 99);
261 }
262 
263 TEST(CorruptionTest, TableFileIndexData) {
264  Build(10000); // Enough to build multiple Tables
265  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
266  dbi->TEST_CompactMemTable();
267 
268  Corrupt(kTableFile, -2000, 500);
269  Reopen();
270  Check(5000, 9999);
271 }
272 
273 TEST(CorruptionTest, MissingDescriptor) {
274  Build(1000);
275  RepairDB();
276  Reopen();
277  Check(1000, 1000);
278 }
279 
280 TEST(CorruptionTest, SequenceNumberRecovery) {
281  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
282  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
283  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
284  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4"));
285  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5"));
286  RepairDB();
287  Reopen();
288  std::string v;
289  ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
290  ASSERT_EQ("v5", v);
291  // Write something. If sequence number was not recovered properly,
292  // it will be hidden by an earlier write.
293  ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
294  ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
295  ASSERT_EQ("v6", v);
296  Reopen();
297  ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
298  ASSERT_EQ("v6", v);
299 }
300 
301 TEST(CorruptionTest, CorruptedDescriptor) {
302  ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
303  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
304  dbi->TEST_CompactMemTable();
305  dbi->TEST_CompactRange(0, NULL, NULL);
306 
307  Corrupt(kDescriptorFile, 0, 1000);
308  Status s = TryReopen();
309  ASSERT_TRUE(!s.ok());
310 
311  RepairDB();
312  Reopen();
313  std::string v;
314  ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
315  ASSERT_EQ("hello", v);
316 }
317 
318 TEST(CorruptionTest, CompactionInputError) {
319  Build(10);
320  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
321  dbi->TEST_CompactMemTable();
322  const int last = config::kMaxMemCompactLevel;
323  ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
324 
325  Corrupt(kTableFile, 100, 1);
326  Check(5, 9);
327 
328  // Force compactions by writing lots of values
329  Build(10000);
330  Check(10000, 10000);
331 }
332 
333 TEST(CorruptionTest, CompactionInputErrorParanoid) {
334  options_.paranoid_checks = true;
335  options_.write_buffer_size = 512 << 10;
336  Reopen();
337  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
338 
339  // Make multiple inputs so we need to compact.
340  for (int i = 0; i < 2; i++) {
341  Build(10);
342  dbi->TEST_CompactMemTable();
343  Corrupt(kTableFile, 100, 1);
344  env_.SleepForMicroseconds(100000);
345  }
346  dbi->CompactRange(NULL, NULL);
347 
348  // Write must fail because of corrupted table
349  std::string tmp1, tmp2;
350  Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
351  ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
352 }
353 
354 TEST(CorruptionTest, UnrelatedKeys) {
355  Build(10);
356  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
357  dbi->TEST_CompactMemTable();
358  Corrupt(kTableFile, 100, 1);
359 
360  std::string tmp1, tmp2;
361  ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
362  std::string v;
363  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
364  ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
365  dbi->TEST_CompactMemTable();
366  ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
367  ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
368 }
369 
370 } // namespace leveldb
371 
372 int main(int argc, char** argv) {
374 }
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
Definition: db_impl.cc:584
Status WriteStringToFile(Env *env, const Slice &data, const std::string &fname)
Definition: env.cc:62
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
Definition: filename.cc:80
size_t write_buffer_size
Definition: options.h:83
std::string TmpDir()
Definition: testharness.cc:60
int main(int argc, char **argv)
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt)
virtual Slice key() const =0
#define ASSERT_LE(a, b)
Definition: testharness.h:111
virtual Slice value() const =0
#define ASSERT_OK(s)
Definition: testharness.h:106
void SleepForMicroseconds(int micros)
Definition: env.h:342
virtual void SeekToFirst()=0
int RunAllTests()
Definition: testharness.cc:36
static const int kValueSize
virtual void Next()=0
static const int kBlockSize
Definition: log_format.h:27
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Definition: db_impl.cc:1476
Status RepairDB(const std::string &dbname, const Options &options)
Definition: repair.cc:456
#define ASSERT_EQ(a, b)
Definition: testharness.h:107
Status DestroyDB(const std::string &dbname, const Options &options)
Definition: db_impl.cc:1537
Slice Key(int i, std::string *storage)
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
Definition: logging.cc:48
virtual bool GetProperty(const Slice &property, std::string *value)=0
std::string ToString() const
Definition: status.cc:36
int Property(const std::string &name)
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)=0
static Status Open(const Options &options, const std::string &name, DB **dbptr)
Definition: db_impl.cc:1490
virtual void CompactRange(const Slice *begin, const Slice *end)
Definition: db_impl.cc:567
static const int kMaxMemCompactLevel
Definition: dbformat.h:39
Status TEST_CompactMemTable()
Definition: db_impl.cc:621
#define ASSERT_TRUE(c)
Definition: testharness.h:105
TEST(AutoCompactTest, ReadAll)
Slice Value(int k, std::string *storage)
bool create_if_missing
Definition: options.h:45
#define ASSERT_GE(a, b)
Definition: testharness.h:109
Cache * block_cache
Definition: options.h:98
virtual Status Write(const WriteOptions &options, WriteBatch *updates)=0
Cache * NewLRUCache(size_t capacity)
Definition: cache.cc:401
bool paranoid_checks
Definition: options.h:57
bool ok() const
Definition: status.h:52
virtual Iterator * NewIterator(const ReadOptions &options)=0
size_t block_size
Definition: options.h:106
FileType
Definition: filename.h:20
Slice RandomString(Random *rnd, int len, std::string *dst)
Definition: testutil.cc:12
Status GetChildren(const std::string &dir, std::vector< std::string > *r)
Definition: env.h:311
static Env * Default()
Definition: env_posix.cc:613
Definition: db.h:44
virtual bool Valid() const =0
void Put(const Slice &key, const Slice &value)
Definition: write_batch.cc:98
std::string NumberToString(uint64_t num)
Definition: logging.cc:36
void Check(int min_expected, int max_expected)
Status ReadFileToString(Env *env, const std::string &fname, std::string *data)
Definition: env.cc:72