leveldb
db_impl.cc
Go to the documentation of this file.
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/db_impl.h"
6 
7 #include <algorithm>
8 #include <set>
9 #include <string>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <vector>
13 #include "db/builder.h"
14 #include "db/db_iter.h"
15 #include "db/dbformat.h"
16 #include "db/filename.h"
17 #include "db/log_reader.h"
18 #include "db/log_writer.h"
19 #include "db/memtable.h"
20 #include "db/table_cache.h"
21 #include "db/version_set.h"
23 #include "leveldb/db.h"
24 #include "leveldb/env.h"
25 #include "leveldb/status.h"
26 #include "leveldb/table.h"
27 #include "leveldb/table_builder.h"
28 #include "port/port.h"
29 #include "table/block.h"
30 #include "table/merger.h"
32 #include "util/coding.h"
33 #include "util/logging.h"
34 #include "util/mutexlock.h"
35 
36 namespace leveldb {
37 
38 const int kNumNonTableCacheFiles = 10;
39 
40 // Information kept for every waiting writer
44  bool sync;
45  bool done;
46  port::CondVar cv;
47 
48  explicit Writer(port::Mutex* mu) : cv(mu) { }
49 };
50 
53 
54  // Sequence numbers < smallest_snapshot are not significant since we
55  // will never have to service a snapshot below smallest_snapshot.
56  // Therefore if we have seen a sequence number S <= smallest_snapshot,
57  // we can drop all entries for the same key with sequence numbers < S.
59 
60  // Files produced by compaction
61  struct Output {
62  uint64_t number;
63  uint64_t file_size;
65  };
66  std::vector<Output> outputs;
67 
68  // State kept for output being generated
71 
72  uint64_t total_bytes;
73 
74  Output* current_output() { return &outputs[outputs.size()-1]; }
75 
77  : compaction(c),
78  outfile(NULL),
79  builder(NULL),
80  total_bytes(0) {
81  }
82 };
83 
// Fix user-supplied options to be reasonable.
//
// Clamps *ptr into [minvalue, maxvalue], comparing in V (the type of the
// bounds). A value of T that V cannot represent -- e.g. a size_t of 2GB
// checked against int bounds -- would be truncated by the narrowing cast and
// could land on the wrong side of the range (on two's-complement targets
// 2^31 wraps to INT_MIN, so a 2GB request used to be clamped to minvalue).
// Such out-of-range values are first snapped to the bound on the side they
// really lie on; the ordinary clamp then runs exactly as before.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  const V narrowed = static_cast<V>(*ptr);
  // Representable in V iff the value survives the round trip unchanged and
  // keeps its sign (the sign check catches same-width signed/unsigned pairs).
  const bool fits_in_v =
      static_cast<T>(narrowed) == *ptr && ((narrowed < V()) == (*ptr < T()));
  if (!fits_in_v) {
    // Negative values outside V's range are below any V minvalue; positive
    // ones are above any V maxvalue.
    *ptr = (*ptr < T()) ? minvalue : maxvalue;
  }
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
90 Options SanitizeOptions(const std::string& dbname,
91  const InternalKeyComparator* icmp,
92  const InternalFilterPolicy* ipolicy,
93  const Options& src) {
94  Options result = src;
95  result.comparator = icmp;
96  result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
97  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
98  ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
99  ClipToRange(&result.max_file_size, 1<<20, 1<<30);
100  ClipToRange(&result.block_size, 1<<10, 4<<20);
101  if (result.info_log == NULL) {
102  // Open a log file in the same directory as the db
103  src.env->CreateDir(dbname); // In case it does not exist
104  src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
105  Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
106  if (!s.ok()) {
107  // No place suitable for logging
108  result.info_log = NULL;
109  }
110  }
111  if (result.block_cache == NULL) {
112  result.block_cache = NewLRUCache(8 << 20);
113  }
114  return result;
115 }
116 
117 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
118  : env_(raw_options.env),
119  internal_comparator_(raw_options.comparator),
120  internal_filter_policy_(raw_options.filter_policy),
122  &internal_filter_policy_, raw_options)),
123  owns_info_log_(options_.info_log != raw_options.info_log),
124  owns_cache_(options_.block_cache != raw_options.block_cache),
125  dbname_(dbname),
126  db_lock_(NULL),
127  shutting_down_(NULL),
128  bg_cv_(&mutex_),
129  mem_(NULL),
130  imm_(NULL),
131  logfile_(NULL),
132  logfile_number_(0),
133  log_(NULL),
134  seed_(0),
135  tmp_batch_(new WriteBatch),
137  manual_compaction_(NULL) {
138  has_imm_.Release_Store(NULL);
139 
140  // Reserve ten files or so for other uses and give the rest to TableCache.
141  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
142  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
143 
146 }
147 
149  // Wait for background work to finish
150  mutex_.Lock();
151  shutting_down_.Release_Store(this); // Any non-NULL value is ok
152  while (bg_compaction_scheduled_) {
153  bg_cv_.Wait();
154  }
155  mutex_.Unlock();
156 
157  if (db_lock_ != NULL) {
159  }
160 
161  delete versions_;
162  if (mem_ != NULL) mem_->Unref();
163  if (imm_ != NULL) imm_->Unref();
164  delete tmp_batch_;
165  delete log_;
166  delete logfile_;
167  delete table_cache_;
168 
169  if (owns_info_log_) {
170  delete options_.info_log;
171  }
172  if (owns_cache_) {
173  delete options_.block_cache;
174  }
175 }
176 
178  VersionEdit new_db;
179  new_db.SetComparatorName(user_comparator()->Name());
180  new_db.SetLogNumber(0);
181  new_db.SetNextFile(2);
182  new_db.SetLastSequence(0);
183 
184  const std::string manifest = DescriptorFileName(dbname_, 1);
185  WritableFile* file;
186  Status s = env_->NewWritableFile(manifest, &file);
187  if (!s.ok()) {
188  return s;
189  }
190  {
191  log::Writer log(file);
192  std::string record;
193  new_db.EncodeTo(&record);
194  s = log.AddRecord(record);
195  if (s.ok()) {
196  s = file->Close();
197  }
198  }
199  delete file;
200  if (s.ok()) {
201  // Make "CURRENT" file that points to the new manifest file.
202  s = SetCurrentFile(env_, dbname_, 1);
203  } else {
204  env_->DeleteFile(manifest);
205  }
206  return s;
207 }
208 
210  if (s->ok() || options_.paranoid_checks) {
211  // No change needed
212  } else {
213  Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
214  *s = Status::OK();
215  }
216 }
217 
219  if (!bg_error_.ok()) {
220  // After a background error, we don't know whether a new version may
221  // or may not have been committed, so we cannot safely garbage collect.
222  return;
223  }
224 
225  // Make a set of all of the live files
226  std::set<uint64_t> live = pending_outputs_;
227  versions_->AddLiveFiles(&live);
228 
229  std::vector<std::string> filenames;
230  env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
231  uint64_t number;
232  FileType type;
233  for (size_t i = 0; i < filenames.size(); i++) {
234  if (ParseFileName(filenames[i], &number, &type)) {
235  bool keep = true;
236  switch (type) {
237  case kLogFile:
238  keep = ((number >= versions_->LogNumber()) ||
239  (number == versions_->PrevLogNumber()));
240  break;
241  case kDescriptorFile:
242  // Keep my manifest file, and any newer incarnations'
243  // (in case there is a race that allows other incarnations)
244  keep = (number >= versions_->ManifestFileNumber());
245  break;
246  case kTableFile:
247  keep = (live.find(number) != live.end());
248  break;
249  case kTempFile:
250  // Any temp files that are currently being written to must
251  // be recorded in pending_outputs_, which is inserted into "live"
252  keep = (live.find(number) != live.end());
253  break;
254  case kCurrentFile:
255  case kDBLockFile:
256  case kInfoLogFile:
257  keep = true;
258  break;
259  }
260 
261  if (!keep) {
262  if (type == kTableFile) {
263  table_cache_->Evict(number);
264  }
265  Log(options_.info_log, "Delete type=%d #%lld\n",
266  int(type),
267  static_cast<unsigned long long>(number));
268  env_->DeleteFile(dbname_ + "/" + filenames[i]);
269  }
270  }
271  }
272 }
273 
274 Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
275  mutex_.AssertHeld();
276 
277  // Ignore error from CreateDir since the creation of the DB is
278  // committed only when the descriptor is created, and this directory
279  // may already exist from a previous failed creation attempt.
281  assert(db_lock_ == NULL);
283  if (!s.ok()) {
284  return s;
285  }
286 
289  s = NewDB();
290  if (!s.ok()) {
291  return s;
292  }
293  } else {
295  dbname_, "does not exist (create_if_missing is false)");
296  }
297  } else {
300  dbname_, "exists (error_if_exists is true)");
301  }
302  }
303 
304  s = versions_->Recover(save_manifest);
305  if (!s.ok()) {
306  return s;
307  }
308  SequenceNumber max_sequence(0);
309 
310  // Recover from all newer log files than the ones named in the
311  // descriptor (new log files may have been added by the previous
312  // incarnation without registering them in the descriptor).
313  //
314  // Note that PrevLogNumber() is no longer used, but we pay
315  // attention to it in case we are recovering a database
316  // produced by an older version of leveldb.
317  const uint64_t min_log = versions_->LogNumber();
318  const uint64_t prev_log = versions_->PrevLogNumber();
319  std::vector<std::string> filenames;
320  s = env_->GetChildren(dbname_, &filenames);
321  if (!s.ok()) {
322  return s;
323  }
324  std::set<uint64_t> expected;
325  versions_->AddLiveFiles(&expected);
326  uint64_t number;
327  FileType type;
328  std::vector<uint64_t> logs;
329  for (size_t i = 0; i < filenames.size(); i++) {
330  if (ParseFileName(filenames[i], &number, &type)) {
331  expected.erase(number);
332  if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
333  logs.push_back(number);
334  }
335  }
336  if (!expected.empty()) {
337  char buf[50];
338  snprintf(buf, sizeof(buf), "%d missing files; e.g.",
339  static_cast<int>(expected.size()));
340  return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
341  }
342 
343  // Recover in the order in which the logs were generated
344  std::sort(logs.begin(), logs.end());
345  for (size_t i = 0; i < logs.size(); i++) {
346  s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
347  &max_sequence);
348  if (!s.ok()) {
349  return s;
350  }
351 
352  // The previous incarnation may not have written any MANIFEST
353  // records after allocating this log number. So we manually
354  // update the file number allocation counter in VersionSet.
355  versions_->MarkFileNumberUsed(logs[i]);
356  }
357 
358  if (versions_->LastSequence() < max_sequence) {
359  versions_->SetLastSequence(max_sequence);
360  }
361 
362  return Status::OK();
363 }
364 
365 Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
366  bool* save_manifest, VersionEdit* edit,
367  SequenceNumber* max_sequence) {
368  struct LogReporter : public log::Reader::Reporter {
369  Env* env;
370  Logger* info_log;
371  const char* fname;
372  Status* status; // NULL if options_.paranoid_checks==false
373  virtual void Corruption(size_t bytes, const Status& s) {
374  Log(info_log, "%s%s: dropping %d bytes; %s",
375  (this->status == NULL ? "(ignoring error) " : ""),
376  fname, static_cast<int>(bytes), s.ToString().c_str());
377  if (this->status != NULL && this->status->ok()) *this->status = s;
378  }
379  };
380 
381  mutex_.AssertHeld();
382 
383  // Open the log file
384  std::string fname = LogFileName(dbname_, log_number);
385  SequentialFile* file;
386  Status status = env_->NewSequentialFile(fname, &file);
387  if (!status.ok()) {
388  MaybeIgnoreError(&status);
389  return status;
390  }
391 
392  // Create the log reader.
393  LogReporter reporter;
394  reporter.env = env_;
395  reporter.info_log = options_.info_log;
396  reporter.fname = fname.c_str();
397  reporter.status = (options_.paranoid_checks ? &status : NULL);
398  // We intentionally make log::Reader do checksumming even if
399  // paranoid_checks==false so that corruptions cause entire commits
400  // to be skipped instead of propagating bad information (like overly
401  // large sequence numbers).
402  log::Reader reader(file, &reporter, true/*checksum*/,
403  0/*initial_offset*/);
404  Log(options_.info_log, "Recovering log #%llu",
405  (unsigned long long) log_number);
406 
407  // Read all the records and add to a memtable
408  std::string scratch;
409  Slice record;
410  WriteBatch batch;
411  int compactions = 0;
412  MemTable* mem = NULL;
413  while (reader.ReadRecord(&record, &scratch) &&
414  status.ok()) {
415  if (record.size() < 12) {
416  reporter.Corruption(
417  record.size(), Status::Corruption("log record too small"));
418  continue;
419  }
420  WriteBatchInternal::SetContents(&batch, record);
421 
422  if (mem == NULL) {
423  mem = new MemTable(internal_comparator_);
424  mem->Ref();
425  }
426  status = WriteBatchInternal::InsertInto(&batch, mem);
427  MaybeIgnoreError(&status);
428  if (!status.ok()) {
429  break;
430  }
431  const SequenceNumber last_seq =
433  WriteBatchInternal::Count(&batch) - 1;
434  if (last_seq > *max_sequence) {
435  *max_sequence = last_seq;
436  }
437 
439  compactions++;
440  *save_manifest = true;
441  status = WriteLevel0Table(mem, edit, NULL);
442  mem->Unref();
443  mem = NULL;
444  if (!status.ok()) {
445  // Reflect errors immediately so that conditions like full
446  // file-systems cause the DB::Open() to fail.
447  break;
448  }
449  }
450  }
451 
452  delete file;
453 
454  // See if we should keep reusing the last log file.
455  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
456  assert(logfile_ == NULL);
457  assert(log_ == NULL);
458  assert(mem_ == NULL);
459  uint64_t lfile_size;
460  if (env_->GetFileSize(fname, &lfile_size).ok() &&
461  env_->NewAppendableFile(fname, &logfile_).ok()) {
462  Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
463  log_ = new log::Writer(logfile_, lfile_size);
464  logfile_number_ = log_number;
465  if (mem != NULL) {
466  mem_ = mem;
467  mem = NULL;
468  } else {
469  // mem can be NULL if lognum exists but was empty.
471  mem_->Ref();
472  }
473  }
474  }
475 
476  if (mem != NULL) {
477  // mem did not get reused; compact it.
478  if (status.ok()) {
479  *save_manifest = true;
480  status = WriteLevel0Table(mem, edit, NULL);
481  }
482  mem->Unref();
483  }
484 
485  return status;
486 }
487 
489  Version* base) {
490  mutex_.AssertHeld();
491  const uint64_t start_micros = env_->NowMicros();
492  FileMetaData meta;
493  meta.number = versions_->NewFileNumber();
494  pending_outputs_.insert(meta.number);
495  Iterator* iter = mem->NewIterator();
496  Log(options_.info_log, "Level-0 table #%llu: started",
497  (unsigned long long) meta.number);
498 
499  Status s;
500  {
501  mutex_.Unlock();
502  s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
503  mutex_.Lock();
504  }
505 
506  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
507  (unsigned long long) meta.number,
508  (unsigned long long) meta.file_size,
509  s.ToString().c_str());
510  delete iter;
511  pending_outputs_.erase(meta.number);
512 
513 
514  // Note that if file_size is zero, the file has been deleted and
515  // should not be added to the manifest.
516  int level = 0;
517  if (s.ok() && meta.file_size > 0) {
518  const Slice min_user_key = meta.smallest.user_key();
519  const Slice max_user_key = meta.largest.user_key();
520  if (base != NULL) {
521  level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
522  }
523  edit->AddFile(level, meta.number, meta.file_size,
524  meta.smallest, meta.largest);
525  }
526 
527  CompactionStats stats;
528  stats.micros = env_->NowMicros() - start_micros;
529  stats.bytes_written = meta.file_size;
530  stats_[level].Add(stats);
531  return s;
532 }
533 
535  mutex_.AssertHeld();
536  assert(imm_ != NULL);
537 
538  // Save the contents of the memtable as a new Table
539  VersionEdit edit;
540  Version* base = versions_->current();
541  base->Ref();
542  Status s = WriteLevel0Table(imm_, &edit, base);
543  base->Unref();
544 
545  if (s.ok() && shutting_down_.Acquire_Load()) {
546  s = Status::IOError("Deleting DB during memtable compaction");
547  }
548 
549  // Replace immutable memtable with the generated Table
550  if (s.ok()) {
551  edit.SetPrevLogNumber(0);
552  edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
553  s = versions_->LogAndApply(&edit, &mutex_);
554  }
555 
556  if (s.ok()) {
557  // Commit to the new state
558  imm_->Unref();
559  imm_ = NULL;
560  has_imm_.Release_Store(NULL);
562  } else {
564  }
565 }
566 
567 void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
568  int max_level_with_files = 1;
569  {
570  MutexLock l(&mutex_);
571  Version* base = versions_->current();
572  for (int level = 1; level < config::kNumLevels; level++) {
573  if (base->OverlapInLevel(level, begin, end)) {
574  max_level_with_files = level;
575  }
576  }
577  }
578  TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
579  for (int level = 0; level < max_level_with_files; level++) {
580  TEST_CompactRange(level, begin, end);
581  }
582 }
583 
584 void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
585  assert(level >= 0);
586  assert(level + 1 < config::kNumLevels);
587 
588  InternalKey begin_storage, end_storage;
589 
590  ManualCompaction manual;
591  manual.level = level;
592  manual.done = false;
593  if (begin == NULL) {
594  manual.begin = NULL;
595  } else {
596  begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
597  manual.begin = &begin_storage;
598  }
599  if (end == NULL) {
600  manual.end = NULL;
601  } else {
602  end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
603  manual.end = &end_storage;
604  }
605 
606  MutexLock l(&mutex_);
607  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
608  if (manual_compaction_ == NULL) { // Idle
609  manual_compaction_ = &manual;
611  } else { // Running either my compaction or another compaction.
612  bg_cv_.Wait();
613  }
614  }
615  if (manual_compaction_ == &manual) {
616  // Cancel my manual compaction since we aborted early for some reason.
617  manual_compaction_ = NULL;
618  }
619 }
620 
622  // NULL batch means just wait for earlier writes to be done
623  Status s = Write(WriteOptions(), NULL);
624  if (s.ok()) {
625  // Wait until the compaction completes
626  MutexLock l(&mutex_);
627  while (imm_ != NULL && bg_error_.ok()) {
628  bg_cv_.Wait();
629  }
630  if (imm_ != NULL) {
631  s = bg_error_;
632  }
633  }
634  return s;
635 }
636 
638  mutex_.AssertHeld();
639  if (bg_error_.ok()) {
640  bg_error_ = s;
641  bg_cv_.SignalAll();
642  }
643 }
644 
646  mutex_.AssertHeld();
648  // Already scheduled
649  } else if (shutting_down_.Acquire_Load()) {
650  // DB is being deleted; no more background compactions
651  } else if (!bg_error_.ok()) {
652  // Already got an error; no more changes
653  } else if (imm_ == NULL &&
654  manual_compaction_ == NULL &&
656  // No work to be done
657  } else {
659  env_->Schedule(&DBImpl::BGWork, this);
660  }
661 }
662 
663 void DBImpl::BGWork(void* db) {
664  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
665 }
666 
668  MutexLock l(&mutex_);
669  assert(bg_compaction_scheduled_);
670  if (shutting_down_.Acquire_Load()) {
671  // No more background work when shutting down.
672  } else if (!bg_error_.ok()) {
673  // No more background work after a background error.
674  } else {
676  }
677 
678  bg_compaction_scheduled_ = false;
679 
680  // Previous compaction may have produced too many files in a level,
681  // so reschedule another compaction if needed.
683  bg_cv_.SignalAll();
684 }
685 
687  mutex_.AssertHeld();
688 
689  if (imm_ != NULL) {
690  CompactMemTable();
691  return;
692  }
693 
694  Compaction* c;
695  bool is_manual = (manual_compaction_ != NULL);
696  InternalKey manual_end;
697  if (is_manual) {
699  c = versions_->CompactRange(m->level, m->begin, m->end);
700  m->done = (c == NULL);
701  if (c != NULL) {
702  manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
703  }
705  "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
706  m->level,
707  (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
708  (m->end ? m->end->DebugString().c_str() : "(end)"),
709  (m->done ? "(end)" : manual_end.DebugString().c_str()));
710  } else {
711  c = versions_->PickCompaction();
712  }
713 
714  Status status;
715  if (c == NULL) {
716  // Nothing to do
717  } else if (!is_manual && c->IsTrivialMove()) {
718  // Move file to next level
719  assert(c->num_input_files(0) == 1);
720  FileMetaData* f = c->input(0, 0);
721  c->edit()->DeleteFile(c->level(), f->number);
722  c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
723  f->smallest, f->largest);
724  status = versions_->LogAndApply(c->edit(), &mutex_);
725  if (!status.ok()) {
726  RecordBackgroundError(status);
727  }
729  Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
730  static_cast<unsigned long long>(f->number),
731  c->level() + 1,
732  static_cast<unsigned long long>(f->file_size),
733  status.ToString().c_str(),
734  versions_->LevelSummary(&tmp));
735  } else {
736  CompactionState* compact = new CompactionState(c);
737  status = DoCompactionWork(compact);
738  if (!status.ok()) {
739  RecordBackgroundError(status);
740  }
741  CleanupCompaction(compact);
742  c->ReleaseInputs();
744  }
745  delete c;
746 
747  if (status.ok()) {
748  // Done
749  } else if (shutting_down_.Acquire_Load()) {
750  // Ignore compaction errors found during shutting down
751  } else {
753  "Compaction error: %s", status.ToString().c_str());
754  }
755 
756  if (is_manual) {
758  if (!status.ok()) {
759  m->done = true;
760  }
761  if (!m->done) {
762  // We only compacted part of the requested range. Update *m
763  // to the range that is left to be compacted.
764  m->tmp_storage = manual_end;
765  m->begin = &m->tmp_storage;
766  }
767  manual_compaction_ = NULL;
768  }
769 }
770 
772  mutex_.AssertHeld();
773  if (compact->builder != NULL) {
774  // May happen if we get a shutdown call in the middle of compaction
775  compact->builder->Abandon();
776  delete compact->builder;
777  } else {
778  assert(compact->outfile == NULL);
779  }
780  delete compact->outfile;
781  for (size_t i = 0; i < compact->outputs.size(); i++) {
782  const CompactionState::Output& out = compact->outputs[i];
783  pending_outputs_.erase(out.number);
784  }
785  delete compact;
786 }
787 
789  assert(compact != NULL);
790  assert(compact->builder == NULL);
791  uint64_t file_number;
792  {
793  mutex_.Lock();
794  file_number = versions_->NewFileNumber();
795  pending_outputs_.insert(file_number);
797  out.number = file_number;
798  out.smallest.Clear();
799  out.largest.Clear();
800  compact->outputs.push_back(out);
801  mutex_.Unlock();
802  }
803 
804  // Make the output file
805  std::string fname = TableFileName(dbname_, file_number);
806  Status s = env_->NewWritableFile(fname, &compact->outfile);
807  if (s.ok()) {
808  compact->builder = new TableBuilder(options_, compact->outfile);
809  }
810  return s;
811 }
812 
814  Iterator* input) {
815  assert(compact != NULL);
816  assert(compact->outfile != NULL);
817  assert(compact->builder != NULL);
818 
819  const uint64_t output_number = compact->current_output()->number;
820  assert(output_number != 0);
821 
822  // Check for iterator errors
823  Status s = input->status();
824  const uint64_t current_entries = compact->builder->NumEntries();
825  if (s.ok()) {
826  s = compact->builder->Finish();
827  } else {
828  compact->builder->Abandon();
829  }
830  const uint64_t current_bytes = compact->builder->FileSize();
831  compact->current_output()->file_size = current_bytes;
832  compact->total_bytes += current_bytes;
833  delete compact->builder;
834  compact->builder = NULL;
835 
836  // Finish and check for file errors
837  if (s.ok()) {
838  s = compact->outfile->Sync();
839  }
840  if (s.ok()) {
841  s = compact->outfile->Close();
842  }
843  delete compact->outfile;
844  compact->outfile = NULL;
845 
846  if (s.ok() && current_entries > 0) {
847  // Verify that the table is usable
849  output_number,
850  current_bytes);
851  s = iter->status();
852  delete iter;
853  if (s.ok()) {
855  "Generated table #%llu@%d: %lld keys, %lld bytes",
856  (unsigned long long) output_number,
857  compact->compaction->level(),
858  (unsigned long long) current_entries,
859  (unsigned long long) current_bytes);
860  }
861  }
862  return s;
863 }
864 
865 
867  mutex_.AssertHeld();
868  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
869  compact->compaction->num_input_files(0),
870  compact->compaction->level(),
871  compact->compaction->num_input_files(1),
872  compact->compaction->level() + 1,
873  static_cast<long long>(compact->total_bytes));
874 
875  // Add compaction outputs
876  compact->compaction->AddInputDeletions(compact->compaction->edit());
877  const int level = compact->compaction->level();
878  for (size_t i = 0; i < compact->outputs.size(); i++) {
879  const CompactionState::Output& out = compact->outputs[i];
880  compact->compaction->edit()->AddFile(
881  level + 1,
882  out.number, out.file_size, out.smallest, out.largest);
883  }
884  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
885 }
886 
888  const uint64_t start_micros = env_->NowMicros();
889  int64_t imm_micros = 0; // Micros spent doing imm_ compactions
890 
891  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
892  compact->compaction->num_input_files(0),
893  compact->compaction->level(),
894  compact->compaction->num_input_files(1),
895  compact->compaction->level() + 1);
896 
897  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
898  assert(compact->builder == NULL);
899  assert(compact->outfile == NULL);
900  if (snapshots_.empty()) {
902  } else {
904  }
905 
906  // Release mutex while we're actually doing the compaction work
907  mutex_.Unlock();
908 
909  Iterator* input = versions_->MakeInputIterator(compact->compaction);
910  input->SeekToFirst();
911  Status status;
912  ParsedInternalKey ikey;
913  std::string current_user_key;
914  bool has_current_user_key = false;
915  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
916  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
917  // Prioritize immutable compaction work
918  if (has_imm_.NoBarrier_Load() != NULL) {
919  const uint64_t imm_start = env_->NowMicros();
920  mutex_.Lock();
921  if (imm_ != NULL) {
922  CompactMemTable();
923  bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
924  }
925  mutex_.Unlock();
926  imm_micros += (env_->NowMicros() - imm_start);
927  }
928 
929  Slice key = input->key();
930  if (compact->compaction->ShouldStopBefore(key) &&
931  compact->builder != NULL) {
932  status = FinishCompactionOutputFile(compact, input);
933  if (!status.ok()) {
934  break;
935  }
936  }
937 
938  // Handle key/value, add to state, etc.
939  bool drop = false;
940  if (!ParseInternalKey(key, &ikey)) {
941  // Do not hide error keys
942  current_user_key.clear();
943  has_current_user_key = false;
944  last_sequence_for_key = kMaxSequenceNumber;
945  } else {
946  if (!has_current_user_key ||
947  user_comparator()->Compare(ikey.user_key,
948  Slice(current_user_key)) != 0) {
949  // First occurrence of this user key
950  current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
951  has_current_user_key = true;
952  last_sequence_for_key = kMaxSequenceNumber;
953  }
954 
955  if (last_sequence_for_key <= compact->smallest_snapshot) {
956  // Hidden by an newer entry for same user key
957  drop = true; // (A)
958  } else if (ikey.type == kTypeDeletion &&
959  ikey.sequence <= compact->smallest_snapshot &&
960  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
961  // For this user key:
962  // (1) there is no data in higher levels
963  // (2) data in lower levels will have larger sequence numbers
964  // (3) data in layers that are being compacted here and have
965  // smaller sequence numbers will be dropped in the next
966  // few iterations of this loop (by rule (A) above).
967  // Therefore this deletion marker is obsolete and can be dropped.
968  drop = true;
969  }
970 
971  last_sequence_for_key = ikey.sequence;
972  }
973 #if 0
975  " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
976  "%d smallest_snapshot: %d",
977  ikey.user_key.ToString().c_str(),
978  (int)ikey.sequence, ikey.type, kTypeValue, drop,
979  compact->compaction->IsBaseLevelForKey(ikey.user_key),
980  (int)last_sequence_for_key, (int)compact->smallest_snapshot);
981 #endif
982 
983  if (!drop) {
984  // Open output file if necessary
985  if (compact->builder == NULL) {
986  status = OpenCompactionOutputFile(compact);
987  if (!status.ok()) {
988  break;
989  }
990  }
991  if (compact->builder->NumEntries() == 0) {
992  compact->current_output()->smallest.DecodeFrom(key);
993  }
994  compact->current_output()->largest.DecodeFrom(key);
995  compact->builder->Add(key, input->value());
996 
997  // Close output file if it is big enough
998  if (compact->builder->FileSize() >=
999  compact->compaction->MaxOutputFileSize()) {
1000  status = FinishCompactionOutputFile(compact, input);
1001  if (!status.ok()) {
1002  break;
1003  }
1004  }
1005  }
1006 
1007  input->Next();
1008  }
1009 
1010  if (status.ok() && shutting_down_.Acquire_Load()) {
1011  status = Status::IOError("Deleting DB during compaction");
1012  }
1013  if (status.ok() && compact->builder != NULL) {
1014  status = FinishCompactionOutputFile(compact, input);
1015  }
1016  if (status.ok()) {
1017  status = input->status();
1018  }
1019  delete input;
1020  input = NULL;
1021 
1022  CompactionStats stats;
1023  stats.micros = env_->NowMicros() - start_micros - imm_micros;
1024  for (int which = 0; which < 2; which++) {
1025  for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
1026  stats.bytes_read += compact->compaction->input(which, i)->file_size;
1027  }
1028  }
1029  for (size_t i = 0; i < compact->outputs.size(); i++) {
1030  stats.bytes_written += compact->outputs[i].file_size;
1031  }
1032 
1033  mutex_.Lock();
1034  stats_[compact->compaction->level() + 1].Add(stats);
1035 
1036  if (status.ok()) {
1037  status = InstallCompactionResults(compact);
1038  }
1039  if (!status.ok()) {
1040  RecordBackgroundError(status);
1041  }
1044  "compacted to: %s", versions_->LevelSummary(&tmp));
1045  return status;
1046 }
1047 
1048 namespace {
1049 struct IterState {
1050  port::Mutex* mu;
1054 };
1055 
1056 static void CleanupIteratorState(void* arg1, void* arg2) {
1057  IterState* state = reinterpret_cast<IterState*>(arg1);
1058  state->mu->Lock();
1059  state->mem->Unref();
1060  if (state->imm != NULL) state->imm->Unref();
1061  state->version->Unref();
1062  state->mu->Unlock();
1063  delete state;
1064 }
1065 } // namespace
1066 
1068  SequenceNumber* latest_snapshot,
1069  uint32_t* seed) {
1070  IterState* cleanup = new IterState;
1071  mutex_.Lock();
1072  *latest_snapshot = versions_->LastSequence();
1073 
1074  // Collect together all needed child iterators
1075  std::vector<Iterator*> list;
1076  list.push_back(mem_->NewIterator());
1077  mem_->Ref();
1078  if (imm_ != NULL) {
1079  list.push_back(imm_->NewIterator());
1080  imm_->Ref();
1081  }
1082  versions_->current()->AddIterators(options, &list);
1083  Iterator* internal_iter =
1084  NewMergingIterator(&internal_comparator_, &list[0], list.size());
1085  versions_->current()->Ref();
1086 
1087  cleanup->mu = &mutex_;
1088  cleanup->mem = mem_;
1089  cleanup->imm = imm_;
1090  cleanup->version = versions_->current();
1091  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
1092 
1093  *seed = ++seed_;
1094  mutex_.Unlock();
1095  return internal_iter;
1096 }
1097 
1099  SequenceNumber ignored;
1100  uint32_t ignored_seed;
1101  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1102 }
1103 
1105  MutexLock l(&mutex_);
1107 }
1108 
1110  const Slice& key,
1111  std::string* value) {
1112  Status s;
1113  MutexLock l(&mutex_);
1114  SequenceNumber snapshot;
1115  if (options.snapshot != NULL) {
1116  snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
1117  } else {
1118  snapshot = versions_->LastSequence();
1119  }
1120 
1121  MemTable* mem = mem_;
1122  MemTable* imm = imm_;
1123  Version* current = versions_->current();
1124  mem->Ref();
1125  if (imm != NULL) imm->Ref();
1126  current->Ref();
1127 
1128  bool have_stat_update = false;
1129  Version::GetStats stats;
1130 
1131  // Unlock while reading from files and memtables
1132  {
1133  mutex_.Unlock();
1134  // First look in the memtable, then in the immutable memtable (if any).
1135  LookupKey lkey(key, snapshot);
1136  if (mem->Get(lkey, value, &s)) {
1137  // Done
1138  } else if (imm != NULL && imm->Get(lkey, value, &s)) {
1139  // Done
1140  } else {
1141  s = current->Get(options, lkey, value, &stats);
1142  have_stat_update = true;
1143  }
1144  mutex_.Lock();
1145  }
1146 
1147  if (have_stat_update && current->UpdateStats(stats)) {
1149  }
1150  mem->Unref();
1151  if (imm != NULL) imm->Unref();
1152  current->Unref();
1153  return s;
1154 }
1155 
1157  SequenceNumber latest_snapshot;
1158  uint32_t seed;
1159  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1160  return NewDBIterator(
1161  this, user_comparator(), iter,
1162  (options.snapshot != NULL
1163  ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
1164  : latest_snapshot),
1165  seed);
1166 }
1167 
1169  MutexLock l(&mutex_);
1170  if (versions_->current()->RecordReadSample(key)) {
1172  }
1173 }
1174 
1176  MutexLock l(&mutex_);
1177  return snapshots_.New(versions_->LastSequence());
1178 }
1179 
1181  MutexLock l(&mutex_);
1182  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
1183 }
1184 
1185 // Convenience methods
1186 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1187  return DB::Put(o, key, val);
1188 }
1189 
1190 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1191  return DB::Delete(options, key);
1192 }
1193 
1194 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1195  Writer w(&mutex_);
1196  w.batch = my_batch;
1197  w.sync = options.sync;
1198  w.done = false;
1199 
1200  MutexLock l(&mutex_);
1201  writers_.push_back(&w);
1202  while (!w.done && &w != writers_.front()) {
1203  w.cv.Wait();
1204  }
1205  if (w.done) {
1206  return w.status;
1207  }
1208 
1209  // May temporarily unlock and wait.
1210  Status status = MakeRoomForWrite(my_batch == NULL);
1211  uint64_t last_sequence = versions_->LastSequence();
1212  Writer* last_writer = &w;
1213  if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
1214  WriteBatch* updates = BuildBatchGroup(&last_writer);
1215  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1216  last_sequence += WriteBatchInternal::Count(updates);
1217 
1218  // Add to log and apply to memtable. We can release the lock
1219  // during this phase since &w is currently responsible for logging
1220  // and protects against concurrent loggers and concurrent writes
1221  // into mem_.
1222  {
1223  mutex_.Unlock();
1224  status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1225  bool sync_error = false;
1226  if (status.ok() && options.sync) {
1227  status = logfile_->Sync();
1228  if (!status.ok()) {
1229  sync_error = true;
1230  }
1231  }
1232  if (status.ok()) {
1233  status = WriteBatchInternal::InsertInto(updates, mem_);
1234  }
1235  mutex_.Lock();
1236  if (sync_error) {
1237  // The state of the log file is indeterminate: the log record we
1238  // just added may or may not show up when the DB is re-opened.
1239  // So we force the DB into a mode where all future writes fail.
1240  RecordBackgroundError(status);
1241  }
1242  }
1243  if (updates == tmp_batch_) tmp_batch_->Clear();
1244 
1245  versions_->SetLastSequence(last_sequence);
1246  }
1247 
1248  while (true) {
1249  Writer* ready = writers_.front();
1250  writers_.pop_front();
1251  if (ready != &w) {
1252  ready->status = status;
1253  ready->done = true;
1254  ready->cv.Signal();
1255  }
1256  if (ready == last_writer) break;
1257  }
1258 
1259  // Notify new head of write queue
1260  if (!writers_.empty()) {
1261  writers_.front()->cv.Signal();
1262  }
1263 
1264  return status;
1265 }
1266 
1267 // REQUIRES: Writer list must be non-empty
1268 // REQUIRES: First writer must have a non-NULL batch
1270  assert(!writers_.empty());
1271  Writer* first = writers_.front();
1272  WriteBatch* result = first->batch;
1273  assert(result != NULL);
1274 
1275  size_t size = WriteBatchInternal::ByteSize(first->batch);
1276 
1277  // Allow the group to grow up to a maximum size, but if the
1278  // original write is small, limit the growth so we do not slow
1279  // down the small write too much.
1280  size_t max_size = 1 << 20;
1281  if (size <= (128<<10)) {
1282  max_size = size + (128<<10);
1283  }
1284 
1285  *last_writer = first;
1286  std::deque<Writer*>::iterator iter = writers_.begin();
1287  ++iter; // Advance past "first"
1288  for (; iter != writers_.end(); ++iter) {
1289  Writer* w = *iter;
1290  if (w->sync && !first->sync) {
1291  // Do not include a sync write into a batch handled by a non-sync write.
1292  break;
1293  }
1294 
1295  if (w->batch != NULL) {
1296  size += WriteBatchInternal::ByteSize(w->batch);
1297  if (size > max_size) {
1298  // Do not make batch too big
1299  break;
1300  }
1301 
1302  // Append to *result
1303  if (result == first->batch) {
1304  // Switch to temporary batch instead of disturbing caller's batch
1305  result = tmp_batch_;
1306  assert(WriteBatchInternal::Count(result) == 0);
1307  WriteBatchInternal::Append(result, first->batch);
1308  }
1309  WriteBatchInternal::Append(result, w->batch);
1310  }
1311  *last_writer = w;
1312  }
1313  return result;
1314 }
1315 
1316 // REQUIRES: mutex_ is held
1317 // REQUIRES: this thread is currently at the front of the writer queue
1319  mutex_.AssertHeld();
1320  assert(!writers_.empty());
1321  bool allow_delay = !force;
1322  Status s;
1323  while (true) {
1324  if (!bg_error_.ok()) {
1325  // Yield previous error
1326  s = bg_error_;
1327  break;
1328  } else if (
1329  allow_delay &&
1331  // We are getting close to hitting a hard limit on the number of
1332  // L0 files. Rather than delaying a single write by several
1333  // seconds when we hit the hard limit, start delaying each
1334  // individual write by 1ms to reduce latency variance. Also,
1335  // this delay hands over some CPU to the compaction thread in
1336  // case it is sharing the same core as the writer.
1337  mutex_.Unlock();
1338  env_->SleepForMicroseconds(1000);
1339  allow_delay = false; // Do not delay a single write more than once
1340  mutex_.Lock();
1341  } else if (!force &&
1343  // There is room in current memtable
1344  break;
1345  } else if (imm_ != NULL) {
1346  // We have filled up the current memtable, but the previous
1347  // one is still being compacted, so we wait.
1348  Log(options_.info_log, "Current memtable full; waiting...\n");
1349  bg_cv_.Wait();
1351  // There are too many level-0 files.
1352  Log(options_.info_log, "Too many L0 files; waiting...\n");
1353  bg_cv_.Wait();
1354  } else {
1355  // Attempt to switch to a new memtable and trigger compaction of old
1356  assert(versions_->PrevLogNumber() == 0);
1357  uint64_t new_log_number = versions_->NewFileNumber();
1358  WritableFile* lfile = NULL;
1359  s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1360  if (!s.ok()) {
1361  // Avoid chewing through file number space in a tight loop.
1362  versions_->ReuseFileNumber(new_log_number);
1363  break;
1364  }
1365  delete log_;
1366  delete logfile_;
1367  logfile_ = lfile;
1368  logfile_number_ = new_log_number;
1369  log_ = new log::Writer(lfile);
1370  imm_ = mem_;
1371  has_imm_.Release_Store(imm_);
1373  mem_->Ref();
1374  force = false; // Do not force another compaction if have room
1376  }
1377  }
1378  return s;
1379 }
1380 
1381 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1382  value->clear();
1383 
1384  MutexLock l(&mutex_);
1385  Slice in = property;
1386  Slice prefix("leveldb.");
1387  if (!in.starts_with(prefix)) return false;
1388  in.remove_prefix(prefix.size());
1389 
1390  if (in.starts_with("num-files-at-level")) {
1391  in.remove_prefix(strlen("num-files-at-level"));
1392  uint64_t level;
1393  bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1394  if (!ok || level >= config::kNumLevels) {
1395  return false;
1396  } else {
1397  char buf[100];
1398  snprintf(buf, sizeof(buf), "%d",
1399  versions_->NumLevelFiles(static_cast<int>(level)));
1400  *value = buf;
1401  return true;
1402  }
1403  } else if (in == "stats") {
1404  char buf[200];
1405  snprintf(buf, sizeof(buf),
1406  " Compactions\n"
1407  "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1408  "--------------------------------------------------\n"
1409  );
1410  value->append(buf);
1411  for (int level = 0; level < config::kNumLevels; level++) {
1412  int files = versions_->NumLevelFiles(level);
1413  if (stats_[level].micros > 0 || files > 0) {
1414  snprintf(
1415  buf, sizeof(buf),
1416  "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1417  level,
1418  files,
1419  versions_->NumLevelBytes(level) / 1048576.0,
1420  stats_[level].micros / 1e6,
1421  stats_[level].bytes_read / 1048576.0,
1422  stats_[level].bytes_written / 1048576.0);
1423  value->append(buf);
1424  }
1425  }
1426  return true;
1427  } else if (in == "sstables") {
1428  *value = versions_->current()->DebugString();
1429  return true;
1430  } else if (in == "approximate-memory-usage") {
1431  size_t total_usage = options_.block_cache->TotalCharge();
1432  if (mem_) {
1433  total_usage += mem_->ApproximateMemoryUsage();
1434  }
1435  if (imm_) {
1436  total_usage += imm_->ApproximateMemoryUsage();
1437  }
1438  char buf[50];
1439  snprintf(buf, sizeof(buf), "%llu",
1440  static_cast<unsigned long long>(total_usage));
1441  value->append(buf);
1442  return true;
1443  }
1444 
1445  return false;
1446 }
1447 
1449  const Range* range, int n,
1450  uint64_t* sizes) {
1451  // TODO(opt): better implementation
1452  Version* v;
1453  {
1454  MutexLock l(&mutex_);
1455  versions_->current()->Ref();
1456  v = versions_->current();
1457  }
1458 
1459  for (int i = 0; i < n; i++) {
1460  // Convert user_key into a corresponding internal key.
1461  InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1462  InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1463  uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1464  uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1465  sizes[i] = (limit >= start ? limit - start : 0);
1466  }
1467 
1468  {
1469  MutexLock l(&mutex_);
1470  v->Unref();
1471  }
1472 }
1473 
1474 // Default implementations of convenience methods that subclasses of DB
1475 // can call if they wish
1476 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1477  WriteBatch batch;
1478  batch.Put(key, value);
1479  return Write(opt, &batch);
1480 }
1481 
1482 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1483  WriteBatch batch;
1484  batch.Delete(key);
1485  return Write(opt, &batch);
1486 }
1487 
1488 DB::~DB() { }
1489 
1490 Status DB::Open(const Options& options, const std::string& dbname,
1491  DB** dbptr) {
1492  *dbptr = NULL;
1493 
1494  DBImpl* impl = new DBImpl(options, dbname);
1495  impl->mutex_.Lock();
1496  VersionEdit edit;
1497  // Recover handles create_if_missing, error_if_exists
1498  bool save_manifest = false;
1499  Status s = impl->Recover(&edit, &save_manifest);
1500  if (s.ok() && impl->mem_ == NULL) {
1501  // Create new log and a corresponding memtable.
1502  uint64_t new_log_number = impl->versions_->NewFileNumber();
1503  WritableFile* lfile;
1504  s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1505  &lfile);
1506  if (s.ok()) {
1507  edit.SetLogNumber(new_log_number);
1508  impl->logfile_ = lfile;
1509  impl->logfile_number_ = new_log_number;
1510  impl->log_ = new log::Writer(lfile);
1511  impl->mem_ = new MemTable(impl->internal_comparator_);
1512  impl->mem_->Ref();
1513  }
1514  }
1515  if (s.ok() && save_manifest) {
1516  edit.SetPrevLogNumber(0); // No older logs needed after recovery.
1517  edit.SetLogNumber(impl->logfile_number_);
1518  s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1519  }
1520  if (s.ok()) {
1521  impl->DeleteObsoleteFiles();
1522  impl->MaybeScheduleCompaction();
1523  }
1524  impl->mutex_.Unlock();
1525  if (s.ok()) {
1526  assert(impl->mem_ != NULL);
1527  *dbptr = impl;
1528  } else {
1529  delete impl;
1530  }
1531  return s;
1532 }
1533 
1535 }
1536 
1537 Status DestroyDB(const std::string& dbname, const Options& options) {
1538  Env* env = options.env;
1539  std::vector<std::string> filenames;
1540  // Ignore error in case directory does not exist
1541  env->GetChildren(dbname, &filenames);
1542  if (filenames.empty()) {
1543  return Status::OK();
1544  }
1545 
1546  FileLock* lock;
1547  const std::string lockname = LockFileName(dbname);
1548  Status result = env->LockFile(lockname, &lock);
1549  if (result.ok()) {
1550  uint64_t number;
1551  FileType type;
1552  for (size_t i = 0; i < filenames.size(); i++) {
1553  if (ParseFileName(filenames[i], &number, &type) &&
1554  type != kDBLockFile) { // Lock file will be deleted at end
1555  Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1556  if (result.ok() && !del.ok()) {
1557  result = del;
1558  }
1559  }
1560  }
1561  env->UnlockFile(lock); // Ignore error since state is already gone
1562  env->DeleteFile(lockname);
1563  env->DeleteDir(dbname); // Ignore error in case dir contains other files
1564  }
1565  return result;
1566 }
1567 
1568 } // namespace leveldb
FileMetaData * input(int which, int i) const
Definition: version_set.h:340
CompactionStats stats_[config::kNumLevels]
Definition: db_impl.h:191
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key)
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
Definition: db_impl.cc:584
virtual Status status() const =0
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
const int kNumNonTableCacheFiles
Definition: db_impl.cc:38
Status RecoverLogFile(uint64_t log_number, bool last_log, bool *save_manifest, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:365
static const SequenceNumber kMaxSequenceNumber
Definition: dbformat.h:67
virtual const Snapshot * GetSnapshot()
Definition: db_impl.cc:1175
Status Recover(VersionEdit *edit, bool *save_manifest) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:274
virtual Iterator * NewIterator(const ReadOptions &)
Definition: db_impl.cc:1156
void RecordBackgroundError(const Status &s)
Definition: db_impl.cc:637
void BackgroundCall()
Definition: db_impl.cc:667
SequenceNumber sequence
Definition: dbformat.h:72
uint64_t PrevLogNumber() const
Definition: version_set.h:225
SequenceNumber number_
Definition: snapshot.h:19
Version * current() const
Definition: version_set.h:185
void SetPrevLogNumber(uint64_t num)
Definition: version_edit.h:43
bool ShouldStopBefore(const Slice &internal_key)
static void ClipToRange(T *ptr, V minvalue, V maxvalue)
Definition: db_impl.cc:86
virtual Status UnlockFile(FileLock *lock)=0
int PickLevelForMemTableOutput(const Slice &smallest_user_key, const Slice &largest_user_key)
Definition: version_set.cc:502
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:534
int level() const
Definition: version_set.h:330
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
Definition: filename.cc:80
size_t write_buffer_size
Definition: options.h:83
void SetLastSequence(SequenceNumber seq)
Definition: version_edit.h:51
static const int kL0_StopWritesTrigger
Definition: dbformat.h:31
virtual Status Put(const WriteOptions &, const Slice &key, const Slice &value)
Definition: db_impl.cc:1186
static const int kNumLevels
Definition: dbformat.h:22
std::string OldInfoLogFileName(const std::string &dbname)
Definition: filename.cc:68
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input)
Definition: db_impl.cc:813
void SetLogNumber(uint64_t num)
Definition: version_edit.h:39
port::CondVar bg_cv_
Definition: db_impl.h:139
void SetComparatorName(const Slice &name)
Definition: version_edit.h:35
static size_t ByteSize(const WriteBatch *batch)
bool owns_cache_
Definition: db_impl.h:127
virtual Slice key() const =0
static const int kL0_SlowdownWritesTrigger
Definition: dbformat.h:28
void Add(const CompactionStats &c)
Definition: db_impl.h:185
virtual Slice value() const =0
InternalKey smallest
Definition: version_edit.h:22
virtual ~DB()
Definition: db_impl.cc:1488
Compaction *const compaction
Definition: db_impl.cc:52
void DeleteFile(int level, uint64_t file)
Definition: version_edit.h:75
bool UpdateStats(const GetStats &stats)
Definition: version_set.cc:431
TableCache * table_cache_
Definition: db_impl.h:131
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
Writer(port::Mutex *mu)
Definition: db_impl.cc:48
void SetNextFile(uint64_t num)
Definition: version_edit.h:47
static void CleanupIteratorState(void *arg1, void *arg2)
Definition: db_impl.cc:1056
Iterator * MakeInputIterator(Compaction *c)
WritableFile * logfile_
Definition: db_impl.h:143
static void SetSequence(WriteBatch *batch, SequenceNumber seq)
Definition: write_batch.cc:94
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: version_set.cc:820
virtual void SleepForMicroseconds(int micros)=0
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
Definition: write_batch.cc:128
MemTable * imm_
Definition: db_impl.h:141
virtual void SeekToFirst()=0
void MaybeIgnoreError(Status *s) const
Definition: db_impl.cc:209
void RecordReadSample(Slice key)
Definition: db_impl.cc:1168
FileLock * db_lock_
Definition: db_impl.h:134
bool empty() const
Definition: slice.h:46
void DeleteObsoleteFiles()
Definition: db_impl.cc:218
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
std::string InfoLogFileName(const std::string &dbname)
Definition: filename.cc:63
uint64_t MaxOutputFileSize() const
Definition: version_set.h:343
std::string ToString() const
Definition: slice.h:66
Slice user_key() const
Definition: dbformat.h:159
port::Mutex mutex_
Definition: db_impl.h:137
std::string DebugString() const
Definition: dbformat.cc:34
virtual bool GetProperty(const Slice &property, std::string *value)
Definition: db_impl.cc:1381
virtual void Next()=0
CompactionState(Compaction *c)
Definition: db_impl.cc:76
virtual Status GetFileSize(const std::string &fname, uint64_t *file_size)=0
virtual ~Snapshot()
Definition: db_impl.cc:1534
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
Iterator * NewDBIterator(DBImpl *db, const Comparator *user_key_comparator, Iterator *internal_iter, SequenceNumber sequence, uint32_t seed)
Definition: db_iter.cc:308
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
Definition: dbformat.h:176
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Definition: db_impl.cc:1476
size_t max_file_size
Definition: options.h:125
Status DestroyDB(const std::string &dbname, const Options &options)
Definition: db_impl.cc:1537
uint64_t SequenceNumber
Definition: dbformat.h:63
uint64_t ManifestFileNumber() const
Definition: version_set.h:188
WriteBatch * batch
Definition: db_impl.cc:43
int64_t TEST_MaxNextLevelOverlappingBytes()
Definition: db_impl.cc:1104
virtual void GetApproximateSizes(const Range *range, int n, uint64_t *sizes)
Definition: db_impl.cc:1448
std::string TableFileName(const std::string &name, uint64_t number)
Definition: filename.cc:32
virtual void ReleaseSnapshot(const Snapshot *snapshot)
Definition: db_impl.cc:1180
Status NewDB()
Definition: db_impl.cc:177
virtual Status Close()=0
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:38
int64_t NumLevelBytes(int level) const
void Evict(uint64_t file_number)
Definition: table_cache.cc:121
bool OverlapInLevel(int level, const Slice *smallest_user_key, const Slice *largest_user_key)
Definition: version_set.cc:495
uint64_t NewFileNumber()
Definition: version_set.h:191
virtual Status CreateDir(const std::string &dirname)=0
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
Definition: logging.cc:48
std::string ToString() const
Definition: status.cc:36
static Status OK()
Definition: status.h:32
static const ValueType kValueTypeForSeek
Definition: dbformat.h:61
WriteBatch * tmp_batch_
Definition: db_impl.h:150
SnapshotImpl * oldest() const
Definition: snapshot.h:39
bool ReadRecord(Slice *record, std::string *scratch)
Definition: log_reader.cc:60
std::string DebugString() const
Definition: version_set.cc:574
MemTable * mem_
Definition: db_impl.h:140
bool NeedsCompaction() const
Definition: version_set.h:251
void AddIterators(const ReadOptions &, std::vector< Iterator *> *iters)
Definition: version_set.cc:234
Iterator * TEST_NewInternalIterator()
Definition: db_impl.cc:1098
void Delete(const Slice &key)
Definition: write_batch.cc:105
virtual Status Delete(const WriteOptions &options, const Slice &key)=0
Definition: db_impl.cc:1482
WriteBatch * BuildBatchGroup(Writer **last_writer)
Definition: db_impl.cc:1269
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
Definition: filename.cc:42
bool empty() const
Definition: snapshot.h:38
int NumLevelFiles(int level) const
static Status Open(const Options &options, const std::string &name, DB **dbptr)
Definition: db_impl.cc:1490
uint64_t FileSize() const
virtual void CompactRange(const Slice *begin, const Slice *end)
Definition: db_impl.cc:567
void MarkFileNumberUsed(uint64_t number)
static char dbname[200]
Definition: c_test.c:15
static void SetContents(WriteBatch *batch, const Slice &contents)
Definition: write_batch.cc:136
virtual size_t TotalCharge() const =0
ManualCompaction * manual_compaction_
Definition: db_impl.h:169
void DecodeFrom(const Slice &s)
Definition: dbformat.h:153
port::AtomicPointer has_imm_
Definition: db_impl.h:142
Status TEST_CompactMemTable()
Definition: db_impl.cc:621
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
Definition: filename.cc:126
std::string CurrentFileName(const std::string &dbname)
Definition: filename.cc:50
virtual Status Delete(const WriteOptions &, const Slice &key)
Definition: db_impl.cc:1190
bool IsTrivialMove() const
void EncodeTo(std::string *dst) const
Definition: version_edit.cc:41
void ReuseFileNumber(uint64_t file_number)
Definition: version_set.h:196
static void BGWork(void *db)
Definition: db_impl.cc:663
virtual Status LockFile(const std::string &fname, FileLock **lock)=0
uint64_t LogNumber() const
Definition: version_set.h:221
virtual Status RenameFile(const std::string &src, const std::string &target)=0
uint64_t NumEntries() const
bool create_if_missing
Definition: options.h:45
const std::string dbname_
Definition: db_impl.h:128
SequenceNumber smallest_snapshot
Definition: db_impl.cc:58
virtual bool FileExists(const std::string &fname)=0
uint32_t seed_
Definition: db_impl.h:146
Status Recover(bool *save_manifest)
Definition: version_set.cc:905
bool IsBaseLevelForKey(const Slice &user_key)
log::Writer * log_
Definition: db_impl.h:145
Logger * info_log
Definition: options.h:68
uint64_t LastSequence() const
Definition: version_set.h:209
void Delete(const SnapshotImpl *s)
Definition: snapshot.h:53
Status DoCompactionWork(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:887
port::CondVar cv
Definition: db_impl.cc:46
void remove_prefix(size_t n)
Definition: slice.h:59
virtual Status NewAppendableFile(const std::string &fname, WritableFile **result)
Definition: env.cc:12
Cache * block_cache
Definition: options.h:98
Iterator * NewMergingIterator(const Comparator *cmp, Iterator **list, int n)
Definition: merger.cc:186
Iterator * NewIterator(const ReadOptions &options, uint64_t file_number, uint64_t file_size, Table **tableptr=NULL)
Definition: table_cache.cc:82
bool RecordReadSample(Slice key)
Definition: version_set.cc:444
const char * LevelSummary(LevelSummaryStorage *scratch) const
virtual void Schedule(void(*function)(void *arg), void *arg)=0
int max_open_files
Definition: options.h:90
static int Count(const WriteBatch *batch)
Definition: write_batch.cc:82
Cache * NewLRUCache(size_t capacity)
Definition: cache.cc:401
Status MakeRoomForWrite(bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:1318
const InternalFilterPolicy internal_filter_policy_
Definition: db_impl.h:124
SnapshotList snapshots_
Definition: db_impl.h:152
Status OpenCompactionOutputFile(CompactionState *compact)
Definition: db_impl.cc:788
uint64_t logfile_number_
Definition: db_impl.h:144
std::vector< Output > outputs
Definition: db_impl.cc:66
const Comparator * user_comparator() const
Definition: db_impl.h:197
bool paranoid_checks
Definition: options.h:57
const FilterPolicy * filter_policy
Definition: options.h:154
Compaction * CompactRange(int level, const InternalKey *begin, const InternalKey *end)
const Options options_
Definition: db_impl.h:125
DBImpl(const Options &options, const std::string &dbname)
Definition: db_impl.cc:117
bool ok() const
Definition: status.h:52
VersionEdit * edit()
Definition: version_set.h:334
const Comparator * comparator
Definition: options.h:41
static Status InvalidArgument(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:44
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)=0
Compaction * PickCompaction()
bool owns_info_log_
Definition: db_impl.h:126
std::string LogFileName(const std::string &name, uint64_t number)
Definition: filename.cc:27
virtual Status Write(const WriteOptions &options, WriteBatch *updates)
Definition: db_impl.cc:1194
Status AddRecord(const Slice &slice)
Definition: log_writer.cc:36
static Slice Contents(const WriteBatch *batch)
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
Definition: db_impl.cc:90
size_t ApproximateMemoryUsage()
Definition: memtable.cc:31
virtual uint64_t NowMicros()=0
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
Definition: builder.cc:17
void RegisterCleanup(CleanupFunction function, void *arg1, void *arg2)
Definition: iterator.cc:26
size_t block_size
Definition: options.h:106
bool starts_with(const Slice &x) const
Definition: slice.h:75
void CleanupCompaction(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:771
bool Get(const LookupKey &key, std::string *value, Status *s)
Definition: memtable.cc:108
void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey &smallest, const InternalKey &largest)
Definition: version_edit.h:62
Iterator * NewIterator()
Definition: memtable.cc:78
const InternalKey * end
Definition: db_impl.h:166
const Snapshot * snapshot
Definition: options.h:177
bool error_if_exists
Definition: options.h:49
void SetLastSequence(uint64_t s)
Definition: version_set.h:212
FileType
Definition: filename.h:20
virtual Status Sync()=0
Status InstallCompactionResults(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:866
virtual ~DBImpl()
Definition: db_impl.cc:148
Status Get(const ReadOptions &, const LookupKey &key, std::string *val, GetStats *stats)
Definition: version_set.cc:332
const InternalKey * begin
Definition: db_impl.h:165
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:156
const char * data() const
Definition: slice.h:40
void AddLiveFiles(std::set< uint64_t > *live)
Definition: db.h:44
bool bg_compaction_scheduled_
Definition: db_impl.h:159
virtual Status DeleteFile(const std::string &fname)=0
virtual Status DeleteDir(const std::string &dirname)=0
static void Append(WriteBatch *dst, const WriteBatch *src)
Definition: write_batch.cc:141
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:47
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
size_t size() const
Definition: slice.h:43
void AddInputDeletions(VersionEdit *edit)
virtual bool Valid() const =0
Status bg_error_
Definition: db_impl.h:174
virtual Status NewLogger(const std::string &fname, Logger **result)=0
void Put(const Slice &key, const Slice &value)
Definition: write_batch.cc:98
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
const SnapshotImpl * New(SequenceNumber seq)
Definition: snapshot.h:42
static SequenceNumber Sequence(const WriteBatch *batch)
Definition: write_batch.cc:90
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:488
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
void Add(const Slice &key, const Slice &value)
int64_t MaxNextLevelOverlappingBytes()
std::deque< Writer * > writers_
Definition: db_impl.h:149
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
Definition: db_impl.cc:1067
int num_input_files(int which) const
Definition: version_set.h:337
std::string LockFileName(const std::string &dbname)
Definition: filename.cc:54
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:686
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)
Definition: db_impl.cc:1109