leveldb
log_test.cc
Go to the documentation of this file.
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12 
13 namespace leveldb {
14 namespace log {
15 
// Build a string of exactly n bytes by repeating partial_string and
// truncating the last repetition as needed.
//
// An empty partial_string can never make the result grow, so the repeat
// loop is skipped for it (it would otherwise never terminate).  In that case
// the result is n NUL bytes, which is what resizing an empty string yields.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  // Trim any overshoot from the final append (or pad in the empty case).
  result.resize(n);
  return result;
}
26 
// Format n in decimal followed by a single '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  const int written = snprintf(text, sizeof(text), "%d.", n);
  return std::string(text, static_cast<size_t>(written));
}
33 
34 // Return a skewed potentially long string
35 static std::string RandomSkewedString(int i, Random* rnd) {
36  return BigString(NumberString(i), rnd->Skewed(17));
37 }
38 
39 class LogTest {
40  private:
41  class StringDest : public WritableFile {
42  public:
43  std::string contents_;
44 
45  virtual Status Close() { return Status::OK(); }
46  virtual Status Flush() { return Status::OK(); }
47  virtual Status Sync() { return Status::OK(); }
48  virtual Status Append(const Slice& slice) {
49  contents_.append(slice.data(), slice.size());
50  return Status::OK();
51  }
52  };
53 
54  class StringSource : public SequentialFile {
55  public:
59  StringSource() : force_error_(false), returned_partial_(false) { }
60 
61  virtual Status Read(size_t n, Slice* result, char* scratch) {
62  ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63 
64  if (force_error_) {
65  force_error_ = false;
66  returned_partial_ = true;
67  return Status::Corruption("read error");
68  }
69 
70  if (contents_.size() < n) {
71  n = contents_.size();
72  returned_partial_ = true;
73  }
74  *result = Slice(contents_.data(), n);
75  contents_.remove_prefix(n);
76  return Status::OK();
77  }
78 
79  virtual Status Skip(uint64_t n) {
80  if (n > contents_.size()) {
81  contents_.clear();
82  return Status::NotFound("in-memory file skipped past end");
83  }
84 
85  contents_.remove_prefix(n);
86 
87  return Status::OK();
88  }
89  };
90 
92  public:
94  std::string message_;
95 
96  ReportCollector() : dropped_bytes_(0) { }
97  virtual void Corruption(size_t bytes, const Status& status) {
98  dropped_bytes_ += bytes;
99  message_.append(status.ToString());
100  }
101  };
102 
106  bool reading_;
109 
110  // Record metadata for testing initial offset functionality
114 
115  public:
116  LogTest() : reading_(false),
117  writer_(new Writer(&dest_)),
118  reader_(new Reader(&source_, &report_, true/*checksum*/,
119  0/*initial_offset*/)) {
120  }
121 
123  delete writer_;
124  delete reader_;
125  }
126 
128  delete writer_;
129  writer_ = new Writer(&dest_, dest_.contents_.size());
130  }
131 
132  void Write(const std::string& msg) {
133  ASSERT_TRUE(!reading_) << "Write() after starting to read";
134  writer_->AddRecord(Slice(msg));
135  }
136 
137  size_t WrittenBytes() const {
138  return dest_.contents_.size();
139  }
140 
141  std::string Read() {
142  if (!reading_) {
143  reading_ = true;
144  source_.contents_ = Slice(dest_.contents_);
145  }
146  std::string scratch;
147  Slice record;
148  if (reader_->ReadRecord(&record, &scratch)) {
149  return record.ToString();
150  } else {
151  return "EOF";
152  }
153  }
154 
155  void IncrementByte(int offset, int delta) {
156  dest_.contents_[offset] += delta;
157  }
158 
159  void SetByte(int offset, char new_byte) {
160  dest_.contents_[offset] = new_byte;
161  }
162 
163  void ShrinkSize(int bytes) {
164  dest_.contents_.resize(dest_.contents_.size() - bytes);
165  }
166 
167  void FixChecksum(int header_offset, int len) {
168  // Compute crc of type/len/data
169  uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
170  crc = crc32c::Mask(crc);
171  EncodeFixed32(&dest_.contents_[header_offset], crc);
172  }
173 
174  void ForceError() {
175  source_.force_error_ = true;
176  }
177 
178  size_t DroppedBytes() const {
179  return report_.dropped_bytes_;
180  }
181 
182  std::string ReportMessage() const {
183  return report_.message_;
184  }
185 
186  // Returns OK iff recorded error message contains "msg"
187  std::string MatchError(const std::string& msg) const {
188  if (report_.message_.find(msg) == std::string::npos) {
189  return report_.message_;
190  } else {
191  return "OK";
192  }
193  }
194 
196  for (int i = 0; i < num_initial_offset_records_; i++) {
197  std::string record(initial_offset_record_sizes_[i],
198  static_cast<char>('a' + i));
199  Write(record);
200  }
201  }
202 
203  void StartReadingAt(uint64_t initial_offset) {
204  delete reader_;
205  reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
206  }
207 
// Asserts that a reader positioned offset_past_end bytes beyond the written
// data returns no record.
// NOTE(review): the embedded listing numbers jump from 208 to 210, so one
// statement appears to be missing from this copy at the top of the body --
// confirm against the upstream file.
208  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
210  reading_ = true;
211  source_.contents_ = Slice(dest_.contents_);
// A dedicated reader is used so the fixture's reader_ is left untouched.
212  Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
213  WrittenBytes() + offset_past_end);
214  Slice record;
215  std::string scratch;
216  ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
217  delete offset_reader;
218  }
219 
// Starts a fresh reader at initial_offset and asserts that it returns every
// record from index expected_record_offset through the last one, checking
// each record's size, its LastRecordOffset(), and its first byte.
// NOTE(review): the embedded listing numbers jump from 221 to 223, so one
// statement appears to be missing from this copy at the top of the body --
// confirm against the upstream file.
220  void CheckInitialOffsetRecord(uint64_t initial_offset,
221  int expected_record_offset) {
223  reading_ = true;
224  source_.contents_ = Slice(dest_.contents_);
225  Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
226  initial_offset);
227 
228  // Read all records from expected_record_offset through the last one.
229  ASSERT_LT(expected_record_offset, num_initial_offset_records_);
230  for (; expected_record_offset < num_initial_offset_records_;
231  ++expected_record_offset) {
232  Slice record;
233  std::string scratch;
234  ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
235  ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
236  record.size());
237  ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
238  offset_reader->LastRecordOffset());
// Record i is expected to start with the character 'a' + i.
239  ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
240  }
241  delete offset_reader;
242  }
243 };
244 
246  {10000, // Two sizable records in first block
247  10000,
248  2 * log::kBlockSize - 1000, // Span three blocks
249  1,
250  13716, // Consume all but two bytes of block 3.
251  log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
252  };
253 
255  {0,
256  kHeaderSize + 10000,
257  2 * (kHeaderSize + 10000),
258  2 * (kHeaderSize + 10000) +
259  (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
260  2 * (kHeaderSize + 10000) +
261  (2 * log::kBlockSize - 1000) + 3 * kHeaderSize
262  + kHeaderSize + 1,
263  3 * log::kBlockSize,
264  };
265 
266 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
268  sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);
269 
270 TEST(LogTest, Empty) {
271  ASSERT_EQ("EOF", Read());
272 }
273 
274 TEST(LogTest, ReadWrite) {
275  Write("foo");
276  Write("bar");
277  Write("");
278  Write("xxxx");
279  ASSERT_EQ("foo", Read());
280  ASSERT_EQ("bar", Read());
281  ASSERT_EQ("", Read());
282  ASSERT_EQ("xxxx", Read());
283  ASSERT_EQ("EOF", Read());
284  ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
285 }
286 
287 TEST(LogTest, ManyBlocks) {
288  for (int i = 0; i < 100000; i++) {
289  Write(NumberString(i));
290  }
291  for (int i = 0; i < 100000; i++) {
292  ASSERT_EQ(NumberString(i), Read());
293  }
294  ASSERT_EQ("EOF", Read());
295 }
296 
297 TEST(LogTest, Fragmentation) {
298  Write("small");
299  Write(BigString("medium", 50000));
300  Write(BigString("large", 100000));
301  ASSERT_EQ("small", Read());
302  ASSERT_EQ(BigString("medium", 50000), Read());
303  ASSERT_EQ(BigString("large", 100000), Read());
304  ASSERT_EQ("EOF", Read());
305 }
306 
307 TEST(LogTest, MarginalTrailer) {
308  // Make a trailer that is exactly the same length as an empty record.
309  const int n = kBlockSize - 2*kHeaderSize;
310  Write(BigString("foo", n));
311  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
312  Write("");
313  Write("bar");
314  ASSERT_EQ(BigString("foo", n), Read());
315  ASSERT_EQ("", Read());
316  ASSERT_EQ("bar", Read());
317  ASSERT_EQ("EOF", Read());
318 }
319 
320 TEST(LogTest, MarginalTrailer2) {
321  // Make a trailer that is exactly the same length as an empty record.
322  const int n = kBlockSize - 2*kHeaderSize;
323  Write(BigString("foo", n));
324  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
325  Write("bar");
326  ASSERT_EQ(BigString("foo", n), Read());
327  ASSERT_EQ("bar", Read());
328  ASSERT_EQ("EOF", Read());
329  ASSERT_EQ(0, DroppedBytes());
330  ASSERT_EQ("", ReportMessage());
331 }
332 
333 TEST(LogTest, ShortTrailer) {
334  const int n = kBlockSize - 2*kHeaderSize + 4;
335  Write(BigString("foo", n));
336  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
337  Write("");
338  Write("bar");
339  ASSERT_EQ(BigString("foo", n), Read());
340  ASSERT_EQ("", Read());
341  ASSERT_EQ("bar", Read());
342  ASSERT_EQ("EOF", Read());
343 }
344 
345 TEST(LogTest, AlignedEof) {
346  const int n = kBlockSize - 2*kHeaderSize + 4;
347  Write(BigString("foo", n));
348  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
349  ASSERT_EQ(BigString("foo", n), Read());
350  ASSERT_EQ("EOF", Read());
351 }
352 
353 TEST(LogTest, OpenForAppend) {
354  Write("hello");
355  ReopenForAppend();
356  Write("world");
357  ASSERT_EQ("hello", Read());
358  ASSERT_EQ("world", Read());
359  ASSERT_EQ("EOF", Read());
360 }
361 
362 TEST(LogTest, RandomRead) {
363  const int N = 500;
364  Random write_rnd(301);
365  for (int i = 0; i < N; i++) {
366  Write(RandomSkewedString(i, &write_rnd));
367  }
368  Random read_rnd(301);
369  for (int i = 0; i < N; i++) {
370  ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
371  }
372  ASSERT_EQ("EOF", Read());
373 }
374 
375 // Tests of all the error paths in log_reader.cc follow:
376 
// A forced read error makes Read() return "EOF" and the report mention
// "read error".
// NOTE(review): the embedded listing numbers jump from 380 to 382, so one
// statement appears to be missing from this copy -- confirm against the
// upstream file.
377 TEST(LogTest, ReadError) {
378  Write("foo");
379  ForceError();
380  ASSERT_EQ("EOF", Read());
382  ASSERT_EQ("OK", MatchError("read error"));
383 }
384 
385 TEST(LogTest, BadRecordType) {
386  Write("foo");
387  // Type is stored in header[6]
388  IncrementByte(6, 100);
389  FixChecksum(0, 3);
390  ASSERT_EQ("EOF", Read());
391  ASSERT_EQ(3, DroppedBytes());
392  ASSERT_EQ("OK", MatchError("unknown record type"));
393 }
394 
395 TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
396  Write("foo");
397  ShrinkSize(4); // Drop all payload as well as a header byte
398  ASSERT_EQ("EOF", Read());
399  // Truncated last record is ignored, not treated as an error.
400  ASSERT_EQ(0, DroppedBytes());
401  ASSERT_EQ("", ReportMessage());
402 }
403 
// Corrupts the length field of a record that fills the first block and
// expects the following record "foo" to be read and "bad record length" to
// be reported.
// NOTE(review): the embedded listing numbers jump from 410 to 412, so one
// statement appears to be missing from this copy -- confirm against the
// upstream file.
404 TEST(LogTest, BadLength) {
405  const int kPayloadSize = kBlockSize - kHeaderSize;
406  Write(BigString("bar", kPayloadSize));
407  Write("foo");
408  // Least significant size byte is stored in header[4].
409  IncrementByte(4, 1);
410  ASSERT_EQ("foo", Read());
412  ASSERT_EQ("OK", MatchError("bad record length"));
413 }
414 
415 TEST(LogTest, BadLengthAtEndIsIgnored) {
416  Write("foo");
417  ShrinkSize(1);
418  ASSERT_EQ("EOF", Read());
419  ASSERT_EQ(0, DroppedBytes());
420  ASSERT_EQ("", ReportMessage());
421 }
422 
423 TEST(LogTest, ChecksumMismatch) {
424  Write("foo");
425  IncrementByte(0, 10);
426  ASSERT_EQ("EOF", Read());
427  ASSERT_EQ(10, DroppedBytes());
428  ASSERT_EQ("OK", MatchError("checksum mismatch"));
429 }
430 
431 TEST(LogTest, UnexpectedMiddleType) {
432  Write("foo");
433  SetByte(6, kMiddleType);
434  FixChecksum(0, 3);
435  ASSERT_EQ("EOF", Read());
436  ASSERT_EQ(3, DroppedBytes());
437  ASSERT_EQ("OK", MatchError("missing start"));
438 }
439 
440 TEST(LogTest, UnexpectedLastType) {
441  Write("foo");
442  SetByte(6, kLastType);
443  FixChecksum(0, 3);
444  ASSERT_EQ("EOF", Read());
445  ASSERT_EQ(3, DroppedBytes());
446  ASSERT_EQ("OK", MatchError("missing start"));
447 }
448 
449 TEST(LogTest, UnexpectedFullType) {
450  Write("foo");
451  Write("bar");
452  SetByte(6, kFirstType);
453  FixChecksum(0, 3);
454  ASSERT_EQ("bar", Read());
455  ASSERT_EQ("EOF", Read());
456  ASSERT_EQ(3, DroppedBytes());
457  ASSERT_EQ("OK", MatchError("partial record without end"));
458 }
459 
460 TEST(LogTest, UnexpectedFirstType) {
461  Write("foo");
462  Write(BigString("bar", 100000));
463  SetByte(6, kFirstType);
464  FixChecksum(0, 3);
465  ASSERT_EQ(BigString("bar", 100000), Read());
466  ASSERT_EQ("EOF", Read());
467  ASSERT_EQ(3, DroppedBytes());
468  ASSERT_EQ("OK", MatchError("partial record without end"));
469 }
470 
471 TEST(LogTest, MissingLastIsIgnored) {
472  Write(BigString("bar", kBlockSize));
473  // Remove the LAST block, including header.
474  ShrinkSize(14);
475  ASSERT_EQ("EOF", Read());
476  ASSERT_EQ("", ReportMessage());
477  ASSERT_EQ(0, DroppedBytes());
478 }
479 
480 TEST(LogTest, PartialLastIsIgnored) {
481  Write(BigString("bar", kBlockSize));
482  // Cause a bad record length in the LAST block.
483  ShrinkSize(1);
484  ASSERT_EQ("EOF", Read());
485  ASSERT_EQ("", ReportMessage());
486  ASSERT_EQ(0, DroppedBytes());
487 }
488 
// NOTE(review): the embedded listing numbers jump from 496 to 498, so one
// statement (presumably the one that repositions the reader inside the
// first record, as the comment below describes) appears to be missing from
// this copy -- confirm against the upstream file.
489 TEST(LogTest, SkipIntoMultiRecord) {
490  // Consider a fragmented record:
491  // first(R1), middle(R1), last(R1), first(R2)
492  // If initial_offset points to a record after first(R1) but before first(R2)
493  // incomplete fragment errors are not actual errors, and must be suppressed
494  // until a new first or full record is encountered.
495  Write(BigString("foo", 3*kBlockSize));
496  Write("correct");
498 
499  ASSERT_EQ("correct", Read());
500  ASSERT_EQ("", ReportMessage());
501  ASSERT_EQ(0, DroppedBytes());
502  ASSERT_EQ("EOF", Read());
503 }
504 
505 TEST(LogTest, ErrorJoinsRecords) {
506  // Consider two fragmented records:
507  // first(R1) last(R1) first(R2) last(R2)
508  // where the middle two fragments disappear. We do not want
509  // first(R1),last(R2) to get joined and returned as a valid record.
510 
511  // Write records that span two blocks
512  Write(BigString("foo", kBlockSize));
513  Write(BigString("bar", kBlockSize));
514  Write("correct");
515 
516  // Wipe the middle block
517  for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
518  SetByte(offset, 'x');
519  }
520 
521  ASSERT_EQ("correct", Read());
522  ASSERT_EQ("EOF", Read());
523  const size_t dropped = DroppedBytes();
524  ASSERT_LE(dropped, 2*kBlockSize + 100);
525  ASSERT_GE(dropped, 2*kBlockSize);
526 }
527 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 528 to 530, so its statement appears to have been lost
// -- restore it from the upstream file.
528 TEST(LogTest, ReadStart) {
530 }
531 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 532 to 534, so its statement appears to have been lost
// -- restore it from the upstream file.
532 TEST(LogTest, ReadSecondOneOff) {
534 }
535 
536 TEST(LogTest, ReadSecondTenThousand) {
537  CheckInitialOffsetRecord(10000, 1);
538 }
539 
540 TEST(LogTest, ReadSecondStart) {
541  CheckInitialOffsetRecord(10007, 1);
542 }
543 
544 TEST(LogTest, ReadThirdOneOff) {
545  CheckInitialOffsetRecord(10008, 2);
546 }
547 
548 TEST(LogTest, ReadThirdStart) {
549  CheckInitialOffsetRecord(20014, 2);
550 }
551 
552 TEST(LogTest, ReadFourthOneOff) {
553  CheckInitialOffsetRecord(20015, 3);
554 }
555 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 556 to 558, so its statement appears to have been lost
// -- restore it from the upstream file.
556 TEST(LogTest, ReadFourthFirstBlockTrailer) {
558 }
559 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 560 to 562, so its statement appears to have been lost
// -- restore it from the upstream file.
560 TEST(LogTest, ReadFourthMiddleBlock) {
562 }
563 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 564 to 566, so its statement appears to have been lost
// -- restore it from the upstream file.
564 TEST(LogTest, ReadFourthLastBlock) {
566 }
567 
// NOTE(review): the embedded listing numbers jump from 568 to 570, so the
// line that opens the call taking the two arguments below is missing from
// this copy; as shown the body is not a complete statement -- restore it
// from the upstream file.
568 TEST(LogTest, ReadFourthStart) {
570  2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
571  3);
572 }
573 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 574 to 576, so its statement appears to have been lost
// -- restore it from the upstream file.
574 TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
576 }
577 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 578 to 580, so its statement appears to have been lost
// -- restore it from the upstream file.
578 TEST(LogTest, ReadEnd) {
580 }
581 
// NOTE(review): this test body is empty in this copy; the embedded listing
// numbers jump from 582 to 584, so its statement appears to have been lost
// -- restore it from the upstream file.
582 TEST(LogTest, ReadPastEnd) {
584 }
585 
586 } // namespace log
587 } // namespace leveldb
588 
// Program entry point for the test binary.
// NOTE(review): the body is empty in this copy; the embedded listing numbers
// jump from 589 to 591, so the statement that runs the tests and returns
// their result appears to have been lost -- restore it from the upstream
// file.
589 int main(int argc, char** argv) {
591 }
static Status NotFound(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:35
static uint64_t initial_offset_last_record_offsets_[]
Definition: log_test.cc:112
void IncrementByte(int offset, int delta)
Definition: log_test.cc:155
#define ASSERT_LE(a, b)
Definition: testharness.h:111
static const int kHeaderSize
Definition: log_format.h:30
virtual Status Append(const Slice &slice)
Definition: log_test.cc:48
int main(int argc, char **argv)
Definition: log_test.cc:589
void EncodeFixed32(char *buf, uint32_t value)
Definition: coding.cc:9
static std::string BigString(const std::string &partial_string, size_t n)
Definition: log_test.cc:18
int RunAllTests()
Definition: testharness.cc:36
size_t DroppedBytes() const
Definition: log_test.cc:178
std::string ToString() const
Definition: slice.h:66
static const int kBlockSize
Definition: log_format.h:27
uint32_t Mask(uint32_t crc)
Definition: crc32c.h:31
TEST(LogTest, Empty)
Definition: log_test.cc:270
void WriteInitialOffsetLog()
Definition: log_test.cc:195
#define ASSERT_EQ(a, b)
Definition: testharness.h:107
static size_t initial_offset_record_sizes_[]
Definition: log_test.cc:111
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:38
static int num_initial_offset_records_
Definition: log_test.cc:113
void FixChecksum(int header_offset, int len)
Definition: log_test.cc:167
std::string ToString() const
Definition: status.cc:36
static Status OK()
Definition: status.h:32
bool ReadRecord(Slice *record, std::string *scratch)
Definition: log_reader.cc:60
virtual void Corruption(size_t bytes, const Status &status)
Definition: log_test.cc:97
virtual Status Skip(uint64_t n)
Definition: log_test.cc:79
#define ASSERT_LT(a, b)
Definition: testharness.h:112
#define ASSERT_TRUE(c)
Definition: testharness.h:105
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)
Definition: log_test.cc:208
void Write(const std::string &msg)
Definition: log_test.cc:132
#define ASSERT_GE(a, b)
Definition: testharness.h:109
uint32_t Skewed(int max_log)
Definition: random.h:57
void remove_prefix(size_t n)
Definition: slice.h:59
void StartReadingAt(uint64_t initial_offset)
Definition: log_test.cc:203
uint32_t Value(const char *data, size_t n)
Definition: crc32c.h:20
static std::string NumberString(int n)
Definition: log_test.cc:28
Status AddRecord(const Slice &slice)
Definition: log_writer.cc:36
void clear()
Definition: slice.h:56
StringSource source_
Definition: log_test.cc:104
static std::string RandomSkewedString(int i, Random *rnd)
Definition: log_test.cc:35
ReportCollector report_
Definition: log_test.cc:105
std::string ReportMessage() const
Definition: log_test.cc:182
const char * data() const
Definition: slice.h:40
virtual Status Read(size_t n, Slice *result, char *scratch)
Definition: log_test.cc:61
void SetByte(int offset, char new_byte)
Definition: log_test.cc:159
size_t WrittenBytes() const
Definition: log_test.cc:137
size_t size() const
Definition: slice.h:43
uint64_t LastRecordOffset()
Definition: log_reader.cc:184
void CheckInitialOffsetRecord(uint64_t initial_offset, int expected_record_offset)
Definition: log_test.cc:220
std::string MatchError(const std::string &msg) const
Definition: log_test.cc:187
std::string Read()
Definition: log_test.cc:141
void ShrinkSize(int bytes)
Definition: log_test.cc:163