leveldb
Classes | Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Friends | List of all members
leveldb::DBImpl Class Reference

#include <db_impl.h>

Inheritance diagram for leveldb::DBImpl:
Inheritance graph
[legend]
Collaboration diagram for leveldb::DBImpl:
Collaboration graph
[legend]

Classes

struct  CompactionState
 
struct  CompactionStats
 
struct  ManualCompaction
 
struct  Writer
 

Public Member Functions

 DBImpl (const Options &options, const std::string &dbname)
 
virtual ~DBImpl ()
 
virtual Status Put (const WriteOptions &, const Slice &key, const Slice &value)
 
virtual Status Delete (const WriteOptions &, const Slice &key)
 
virtual Status Write (const WriteOptions &options, WriteBatch *updates)
 
virtual Status Get (const ReadOptions &options, const Slice &key, std::string *value)
 
virtual Iterator * NewIterator (const ReadOptions &)
 
virtual const Snapshot * GetSnapshot ()
 
virtual void ReleaseSnapshot (const Snapshot *snapshot)
 
virtual bool GetProperty (const Slice &property, std::string *value)
 
virtual void GetApproximateSizes (const Range *range, int n, uint64_t *sizes)
 
virtual void CompactRange (const Slice *begin, const Slice *end)
 
void TEST_CompactRange (int level, const Slice *begin, const Slice *end)
 
Status TEST_CompactMemTable ()
 
Iterator * TEST_NewInternalIterator ()
 
int64_t TEST_MaxNextLevelOverlappingBytes ()
 
void RecordReadSample (Slice key)
 
- Public Member Functions inherited from leveldb::DB
 DB ()
 
virtual ~DB ()
 

Private Member Functions

Iterator * NewInternalIterator (const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
 
Status NewDB ()
 
Status Recover (VersionEdit *edit, bool *save_manifest) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
void MaybeIgnoreError (Status *s) const
 
void DeleteObsoleteFiles ()
 
void CompactMemTable () EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
Status RecoverLogFile (uint64_t log_number, bool last_log, bool *save_manifest, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
Status WriteLevel0Table (MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
Status MakeRoomForWrite (bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
WriteBatch * BuildBatchGroup (Writer **last_writer)
 
void RecordBackgroundError (const Status &s)
 
void MaybeScheduleCompaction () EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
void BackgroundCall ()
 
void BackgroundCompaction () EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
void CleanupCompaction (CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
Status DoCompactionWork (CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
Status OpenCompactionOutputFile (CompactionState *compact)
 
Status FinishCompactionOutputFile (CompactionState *compact, Iterator *input)
 
Status InstallCompactionResults (CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
 
 DBImpl (const DBImpl &)
 
void operator= (const DBImpl &)
 
const Comparator * user_comparator () const
 

Static Private Member Functions

static void BGWork (void *db)
 

Private Attributes

Env *const env_
 
const InternalKeyComparator internal_comparator_
 
const InternalFilterPolicy internal_filter_policy_
 
const Options options_
 
bool owns_info_log_
 
bool owns_cache_
 
const std::string dbname_
 
TableCache * table_cache_
 
FileLock * db_lock_
 
port::Mutex mutex_
 
port::AtomicPointer shutting_down_
 
port::CondVar bg_cv_
 
MemTable * mem_
 
MemTable * imm_
 
port::AtomicPointer has_imm_
 
WritableFile * logfile_
 
uint64_t logfile_number_
 
log::Writer * log_
 
uint32_t seed_
 
std::deque< Writer * > writers_
 
WriteBatch * tmp_batch_
 
SnapshotList snapshots_
 
std::set< uint64_t > pending_outputs_
 
bool bg_compaction_scheduled_
 
ManualCompaction * manual_compaction_
 
VersionSet * versions_
 
Status bg_error_
 
CompactionStats stats_ [config::kNumLevels]
 

Friends

class DB
 

Additional Inherited Members

- Static Public Member Functions inherited from leveldb::DB
static Status Open (const Options &options, const std::string &name, DB **dbptr)
 

Detailed Description

Definition at line 26 of file db_impl.h.


Class Documentation

§ leveldb::DBImpl::ManualCompaction

struct leveldb::DBImpl::ManualCompaction

Definition at line 162 of file db_impl.h.

Collaboration diagram for leveldb::DBImpl::ManualCompaction:
Class Members
const InternalKey * begin
bool done
const InternalKey * end
int level
InternalKey tmp_storage

Constructor & Destructor Documentation

§ DBImpl() [1/2]

leveldb::DBImpl::DBImpl ( const Options &  options,
const std::string &  dbname 
)

Definition at line 117 of file db_impl.cc.

118  : env_(raw_options.env),
119  internal_comparator_(raw_options.comparator),
120  internal_filter_policy_(raw_options.filter_policy),
121  options_(SanitizeOptions(dbname, &internal_comparator_,
122  &internal_filter_policy_, raw_options)),
123  owns_info_log_(options_.info_log != raw_options.info_log),
124  owns_cache_(options_.block_cache != raw_options.block_cache),
125  dbname_(dbname),
126  db_lock_(NULL),
127  shutting_down_(NULL),
128  bg_cv_(&mutex_),
129  mem_(NULL),
130  imm_(NULL),
131  logfile_(NULL),
132  logfile_number_(0),
133  log_(NULL),
134  seed_(0),
135  tmp_batch_(new WriteBatch),
136  bg_compaction_scheduled_(false),
137  manual_compaction_(NULL) {
138  has_imm_.Release_Store(NULL);
139 
140  // Reserve ten files or so for other uses and give the rest to TableCache.
141  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
142  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
143 
144  versions_ = new VersionSet(dbname_, &options_, table_cache_,
145  &internal_comparator_);
146 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
const int kNumNonTableCacheFiles
Definition: db_impl.cc:38
port::CondVar bg_cv_
Definition: db_impl.h:139
bool owns_cache_
Definition: db_impl.h:127
TableCache * table_cache_
Definition: db_impl.h:131
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
WritableFile * logfile_
Definition: db_impl.h:143
MemTable * imm_
Definition: db_impl.h:141
FileLock * db_lock_
Definition: db_impl.h:134
port::Mutex mutex_
Definition: db_impl.h:137
WriteBatch * tmp_batch_
Definition: db_impl.h:150
MemTable * mem_
Definition: db_impl.h:140
static char dbname[200]
Definition: c_test.c:15
ManualCompaction * manual_compaction_
Definition: db_impl.h:169
port::AtomicPointer has_imm_
Definition: db_impl.h:142
const std::string dbname_
Definition: db_impl.h:128
uint32_t seed_
Definition: db_impl.h:146
log::Writer * log_
Definition: db_impl.h:145
Logger * info_log
Definition: options.h:68
Cache * block_cache
Definition: options.h:98
int max_open_files
Definition: options.h:90
const InternalFilterPolicy internal_filter_policy_
Definition: db_impl.h:124
uint64_t logfile_number_
Definition: db_impl.h:144
const Options options_
Definition: db_impl.h:125
bool owns_info_log_
Definition: db_impl.h:126
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
Definition: db_impl.cc:90
bool bg_compaction_scheduled_
Definition: db_impl.h:159
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the caller graph for this function:

§ ~DBImpl()

leveldb::DBImpl::~DBImpl ( )
virtual

Definition at line 148 of file db_impl.cc.

148  {
149  // Wait for background work to finish
150  mutex_.Lock();
151  shutting_down_.Release_Store(this); // Any non-NULL value is ok
152  while (bg_compaction_scheduled_) {
153  bg_cv_.Wait();
154  }
155  mutex_.Unlock();
156 
157  if (db_lock_ != NULL) {
158  env_->UnlockFile(db_lock_);
159  }
160 
161  delete versions_;
162  if (mem_ != NULL) mem_->Unref();
163  if (imm_ != NULL) imm_->Unref();
164  delete tmp_batch_;
165  delete log_;
166  delete logfile_;
167  delete table_cache_;
168 
169  if (owns_info_log_) {
170  delete options_.info_log;
171  }
172  if (owns_cache_) {
173  delete options_.block_cache;
174  }
175 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
virtual Status UnlockFile(FileLock *lock)=0
port::CondVar bg_cv_
Definition: db_impl.h:139
bool owns_cache_
Definition: db_impl.h:127
TableCache * table_cache_
Definition: db_impl.h:131
WritableFile * logfile_
Definition: db_impl.h:143
MemTable * imm_
Definition: db_impl.h:141
FileLock * db_lock_
Definition: db_impl.h:134
port::Mutex mutex_
Definition: db_impl.h:137
WriteBatch * tmp_batch_
Definition: db_impl.h:150
MemTable * mem_
Definition: db_impl.h:140
log::Writer * log_
Definition: db_impl.h:145
Logger * info_log
Definition: options.h:68
Cache * block_cache
Definition: options.h:98
const Options options_
Definition: db_impl.h:125
bool owns_info_log_
Definition: db_impl.h:126
bool bg_compaction_scheduled_
Definition: db_impl.h:159
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the call graph for this function:

§ DBImpl() [2/2]

leveldb::DBImpl::DBImpl ( const DBImpl &  )
private

Member Function Documentation

§ BackgroundCall()

void leveldb::DBImpl::BackgroundCall ( )
private

Definition at line 667 of file db_impl.cc.

667  {
668  MutexLock l(&mutex_);
669  assert(bg_compaction_scheduled_);
670  if (shutting_down_.Acquire_Load()) {
671  // No more background work when shutting down.
672  } else if (!bg_error_.ok()) {
673  // No more background work after a background error.
674  } else {
675  BackgroundCompaction();
676  }
677 
678  bg_compaction_scheduled_ = false;
679 
680  // Previous compaction may have produced too many files in a level,
681  // so reschedule another compaction if needed.
682  MaybeScheduleCompaction();
683  bg_cv_.SignalAll();
684 }
port::CondVar bg_cv_
Definition: db_impl.h:139
port::Mutex mutex_
Definition: db_impl.h:137
bool ok() const
Definition: status.h:52
bool bg_compaction_scheduled_
Definition: db_impl.h:159
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
Status bg_error_
Definition: db_impl.h:174
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:686
Here is the call graph for this function:
Here is the caller graph for this function:

§ BackgroundCompaction()

void leveldb::DBImpl::BackgroundCompaction ( )
private

Definition at line 686 of file db_impl.cc.

686  {
687  mutex_.AssertHeld();
688 
689  if (imm_ != NULL) {
690  CompactMemTable();
691  return;
692  }
693 
694  Compaction* c;
695  bool is_manual = (manual_compaction_ != NULL);
696  InternalKey manual_end;
697  if (is_manual) {
698  ManualCompaction* m = manual_compaction_;
699  c = versions_->CompactRange(m->level, m->begin, m->end);
700  m->done = (c == NULL);
701  if (c != NULL) {
702  manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
703  }
704  Log(options_.info_log,
705  "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
706  m->level,
707  (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
708  (m->end ? m->end->DebugString().c_str() : "(end)"),
709  (m->done ? "(end)" : manual_end.DebugString().c_str()));
710  } else {
711  c = versions_->PickCompaction();
712  }
713 
714  Status status;
715  if (c == NULL) {
716  // Nothing to do
717  } else if (!is_manual && c->IsTrivialMove()) {
718  // Move file to next level
719  assert(c->num_input_files(0) == 1);
720  FileMetaData* f = c->input(0, 0);
721  c->edit()->DeleteFile(c->level(), f->number);
722  c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
723  f->smallest, f->largest);
724  status = versions_->LogAndApply(c->edit(), &mutex_);
725  if (!status.ok()) {
726  RecordBackgroundError(status);
727  }
728  VersionSet::LevelSummaryStorage tmp;
729  Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
730  static_cast<unsigned long long>(f->number),
731  c->level() + 1,
732  static_cast<unsigned long long>(f->file_size),
733  status.ToString().c_str(),
734  versions_->LevelSummary(&tmp));
735  } else {
736  CompactionState* compact = new CompactionState(c);
737  status = DoCompactionWork(compact);
738  if (!status.ok()) {
739  RecordBackgroundError(status);
740  }
741  CleanupCompaction(compact);
742  c->ReleaseInputs();
743  DeleteObsoleteFiles();
744  }
745  delete c;
746 
747  if (status.ok()) {
748  // Done
749  } else if (shutting_down_.Acquire_Load()) {
750  // Ignore compaction errors found during shutting down
751  } else {
752  Log(options_.info_log,
753  "Compaction error: %s", status.ToString().c_str());
754  }
755 
756  if (is_manual) {
757  ManualCompaction* m = manual_compaction_;
758  if (!status.ok()) {
759  m->done = true;
760  }
761  if (!m->done) {
762  // We only compacted part of the requested range. Update *m
763  // to the range that is left to be compacted.
764  m->tmp_storage = manual_end;
765  m->begin = &m->tmp_storage;
766  }
767  manual_compaction_ = NULL;
768  }
769 }
FileMetaData * input(int which, int i) const
Definition: version_set.h:340
VersionSet * versions_
Definition: db_impl.h:171
void RecordBackgroundError(const Status &s)
Definition: db_impl.cc:637
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:534
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: version_set.cc:820
MemTable * imm_
Definition: db_impl.h:141
void DeleteObsoleteFiles()
Definition: db_impl.cc:218
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
ManualCompaction * manual_compaction_
Definition: db_impl.h:169
Logger * info_log
Definition: options.h:68
Status DoCompactionWork(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:887
const char * LevelSummary(LevelSummaryStorage *scratch) const
Compaction * CompactRange(int level, const InternalKey *begin, const InternalKey *end)
const Options options_
Definition: db_impl.h:125
Compaction * PickCompaction()
void CleanupCompaction(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:771
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the call graph for this function:
Here is the caller graph for this function:

§ BGWork()

void leveldb::DBImpl::BGWork ( void *  db)
staticprivate

Definition at line 663 of file db_impl.cc.

663  {
664  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
665 }
void BackgroundCall()
Definition: db_impl.cc:667
DBImpl(const Options &options, const std::string &dbname)
Definition: db_impl.cc:117
Here is the call graph for this function:
Here is the caller graph for this function:

§ BuildBatchGroup()

WriteBatch * leveldb::DBImpl::BuildBatchGroup ( Writer **  last_writer)
private

Definition at line 1269 of file db_impl.cc.

1269  {
1270  assert(!writers_.empty());
1271  Writer* first = writers_.front();
1272  WriteBatch* result = first->batch;
1273  assert(result != NULL);
1274 
1275  size_t size = WriteBatchInternal::ByteSize(first->batch);
1276 
1277  // Allow the group to grow up to a maximum size, but if the
1278  // original write is small, limit the growth so we do not slow
1279  // down the small write too much.
1280  size_t max_size = 1 << 20;
1281  if (size <= (128<<10)) {
1282  max_size = size + (128<<10);
1283  }
1284 
1285  *last_writer = first;
1286  std::deque<Writer*>::iterator iter = writers_.begin();
1287  ++iter; // Advance past "first"
1288  for (; iter != writers_.end(); ++iter) {
1289  Writer* w = *iter;
1290  if (w->sync && !first->sync) {
1291  // Do not include a sync write into a batch handled by a non-sync write.
1292  break;
1293  }
1294 
1295  if (w->batch != NULL) {
1296  size += WriteBatchInternal::ByteSize(w->batch);
1297  if (size > max_size) {
1298  // Do not make batch too big
1299  break;
1300  }
1301 
1302  // Append to *result
1303  if (result == first->batch) {
1304  // Switch to temporary batch instead of disturbing caller's batch
1305  result = tmp_batch_;
1306  assert(WriteBatchInternal::Count(result) == 0);
1307  WriteBatchInternal::Append(result, first->batch);
1308  }
1309  WriteBatchInternal::Append(result, w->batch);
1310  }
1311  *last_writer = w;
1312  }
1313  return result;
1314 }
static size_t ByteSize(const WriteBatch *batch)
WriteBatch * tmp_batch_
Definition: db_impl.h:150
static int Count(const WriteBatch *batch)
Definition: write_batch.cc:82
static void Append(WriteBatch *dst, const WriteBatch *src)
Definition: write_batch.cc:141
std::deque< Writer * > writers_
Definition: db_impl.h:149
Here is the call graph for this function:
Here is the caller graph for this function:

§ CleanupCompaction()

void leveldb::DBImpl::CleanupCompaction ( CompactionState *  compact)
private

Definition at line 771 of file db_impl.cc.

771  {
772  mutex_.AssertHeld();
773  if (compact->builder != NULL) {
774  // May happen if we get a shutdown call in the middle of compaction
775  compact->builder->Abandon();
776  delete compact->builder;
777  } else {
778  assert(compact->outfile == NULL);
779  }
780  delete compact->outfile;
781  for (size_t i = 0; i < compact->outputs.size(); i++) {
782  const CompactionState::Output& out = compact->outputs[i];
783  pending_outputs_.erase(out.number);
784  }
785  delete compact;
786 }
port::Mutex mutex_
Definition: db_impl.h:137
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:156
Here is the call graph for this function:
Here is the caller graph for this function:

§ CompactMemTable()

void leveldb::DBImpl::CompactMemTable ( )
private

Definition at line 534 of file db_impl.cc.

534  {
535  mutex_.AssertHeld();
536  assert(imm_ != NULL);
537 
538  // Save the contents of the memtable as a new Table
539  VersionEdit edit;
540  Version* base = versions_->current();
541  base->Ref();
542  Status s = WriteLevel0Table(imm_, &edit, base);
543  base->Unref();
544 
545  if (s.ok() && shutting_down_.Acquire_Load()) {
546  s = Status::IOError("Deleting DB during memtable compaction");
547  }
548 
549  // Replace immutable memtable with the generated Table
550  if (s.ok()) {
551  edit.SetPrevLogNumber(0);
552  edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
553  s = versions_->LogAndApply(&edit, &mutex_);
554  }
555 
556  if (s.ok()) {
557  // Commit to the new state
558  imm_->Unref();
559  imm_ = NULL;
560  has_imm_.Release_Store(NULL);
561  DeleteObsoleteFiles();
562  } else {
563  RecordBackgroundError(s);
564  }
565 }
VersionSet * versions_
Definition: db_impl.h:171
void RecordBackgroundError(const Status &s)
Definition: db_impl.cc:637
Version * current() const
Definition: version_set.h:185
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: version_set.cc:820
MemTable * imm_
Definition: db_impl.h:141
void DeleteObsoleteFiles()
Definition: db_impl.cc:218
port::Mutex mutex_
Definition: db_impl.h:137
port::AtomicPointer has_imm_
Definition: db_impl.h:142
uint64_t logfile_number_
Definition: db_impl.h:144
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:47
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:488
Here is the call graph for this function:
Here is the caller graph for this function:

§ CompactRange()

void leveldb::DBImpl::CompactRange ( const Slice *  begin,
const Slice *  end
)
virtual

Implements leveldb::DB.

Definition at line 567 of file db_impl.cc.

567  {
568  int max_level_with_files = 1;
569  {
570  MutexLock l(&mutex_);
571  Version* base = versions_->current();
572  for (int level = 1; level < config::kNumLevels; level++) {
573  if (base->OverlapInLevel(level, begin, end)) {
574  max_level_with_files = level;
575  }
576  }
577  }
578  TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
579  for (int level = 0; level < max_level_with_files; level++) {
580  TEST_CompactRange(level, begin, end);
581  }
582 }
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
Definition: db_impl.cc:584
VersionSet * versions_
Definition: db_impl.h:171
Version * current() const
Definition: version_set.h:185
static const int kNumLevels
Definition: dbformat.h:22
port::Mutex mutex_
Definition: db_impl.h:137
Status TEST_CompactMemTable()
Definition: db_impl.cc:621
Here is the call graph for this function:
Here is the caller graph for this function:

§ Delete()

Status leveldb::DBImpl::Delete ( const WriteOptions &  options,
const Slice &  key
)
virtual

Implements leveldb::DB.

Definition at line 1190 of file db_impl.cc.

1190  {
1191  return DB::Delete(options, key);
1192 }
virtual Status Delete(const WriteOptions &options, const Slice &key)=0
Definition: db_impl.cc:1482
Here is the call graph for this function:

§ DeleteObsoleteFiles()

void leveldb::DBImpl::DeleteObsoleteFiles ( )
private

Definition at line 218 of file db_impl.cc.

218  {
219  if (!bg_error_.ok()) {
220  // After a background error, we don't know whether a new version may
221  // or may not have been committed, so we cannot safely garbage collect.
222  return;
223  }
224 
225  // Make a set of all of the live files
226  std::set<uint64_t> live = pending_outputs_;
227  versions_->AddLiveFiles(&live);
228 
229  std::vector<std::string> filenames;
230  env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
231  uint64_t number;
232  FileType type;
233  for (size_t i = 0; i < filenames.size(); i++) {
234  if (ParseFileName(filenames[i], &number, &type)) {
235  bool keep = true;
236  switch (type) {
237  case kLogFile:
238  keep = ((number >= versions_->LogNumber()) ||
239  (number == versions_->PrevLogNumber()));
240  break;
241  case kDescriptorFile:
242  // Keep my manifest file, and any newer incarnations'
243  // (in case there is a race that allows other incarnations)
244  keep = (number >= versions_->ManifestFileNumber());
245  break;
246  case kTableFile:
247  keep = (live.find(number) != live.end());
248  break;
249  case kTempFile:
250  // Any temp files that are currently being written to must
251  // be recorded in pending_outputs_, which is inserted into "live"
252  keep = (live.find(number) != live.end());
253  break;
254  case kCurrentFile:
255  case kDBLockFile:
256  case kInfoLogFile:
257  keep = true;
258  break;
259  }
260 
261  if (!keep) {
262  if (type == kTableFile) {
263  table_cache_->Evict(number);
264  }
265  Log(options_.info_log, "Delete type=%d #%lld\n",
266  int(type),
267  static_cast<unsigned long long>(number));
268  env_->DeleteFile(dbname_ + "/" + filenames[i]);
269  }
270  }
271  }
272 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
uint64_t PrevLogNumber() const
Definition: version_set.h:225
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
Definition: filename.cc:80
TableCache * table_cache_
Definition: db_impl.h:131
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
uint64_t ManifestFileNumber() const
Definition: version_set.h:188
void Evict(uint64_t file_number)
Definition: table_cache.cc:121
uint64_t LogNumber() const
Definition: version_set.h:221
const std::string dbname_
Definition: db_impl.h:128
Logger * info_log
Definition: options.h:68
const Options options_
Definition: db_impl.h:125
bool ok() const
Definition: status.h:52
FileType
Definition: filename.h:20
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:156
void AddLiveFiles(std::set< uint64_t > *live)
virtual Status DeleteFile(const std::string &fname)=0
Status bg_error_
Definition: db_impl.h:174
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
Here is the call graph for this function:
Here is the caller graph for this function:

§ DoCompactionWork()

Status leveldb::DBImpl::DoCompactionWork ( CompactionState *  compact)
private

Definition at line 887 of file db_impl.cc.

887  {
888  const uint64_t start_micros = env_->NowMicros();
889  int64_t imm_micros = 0; // Micros spent doing imm_ compactions
890 
891  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
892  compact->compaction->num_input_files(0),
893  compact->compaction->level(),
894  compact->compaction->num_input_files(1),
895  compact->compaction->level() + 1);
896 
897  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
898  assert(compact->builder == NULL);
899  assert(compact->outfile == NULL);
900  if (snapshots_.empty()) {
901  compact->smallest_snapshot = versions_->LastSequence();
902  } else {
903  compact->smallest_snapshot = snapshots_.oldest()->number_;
904  }
905 
906  // Release mutex while we're actually doing the compaction work
907  mutex_.Unlock();
908 
909  Iterator* input = versions_->MakeInputIterator(compact->compaction);
910  input->SeekToFirst();
911  Status status;
912  ParsedInternalKey ikey;
913  std::string current_user_key;
914  bool has_current_user_key = false;
915  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
916  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
917  // Prioritize immutable compaction work
918  if (has_imm_.NoBarrier_Load() != NULL) {
919  const uint64_t imm_start = env_->NowMicros();
920  mutex_.Lock();
921  if (imm_ != NULL) {
922  CompactMemTable();
923  bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
924  }
925  mutex_.Unlock();
926  imm_micros += (env_->NowMicros() - imm_start);
927  }
928 
929  Slice key = input->key();
930  if (compact->compaction->ShouldStopBefore(key) &&
931  compact->builder != NULL) {
932  status = FinishCompactionOutputFile(compact, input);
933  if (!status.ok()) {
934  break;
935  }
936  }
937 
938  // Handle key/value, add to state, etc.
939  bool drop = false;
940  if (!ParseInternalKey(key, &ikey)) {
941  // Do not hide error keys
942  current_user_key.clear();
943  has_current_user_key = false;
944  last_sequence_for_key = kMaxSequenceNumber;
945  } else {
946  if (!has_current_user_key ||
947  user_comparator()->Compare(ikey.user_key,
948  Slice(current_user_key)) != 0) {
949  // First occurrence of this user key
950  current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
951  has_current_user_key = true;
952  last_sequence_for_key = kMaxSequenceNumber;
953  }
954 
955  if (last_sequence_for_key <= compact->smallest_snapshot) {
956  // Hidden by an newer entry for same user key
957  drop = true; // (A)
958  } else if (ikey.type == kTypeDeletion &&
959  ikey.sequence <= compact->smallest_snapshot &&
960  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
961  // For this user key:
962  // (1) there is no data in higher levels
963  // (2) data in lower levels will have larger sequence numbers
964  // (3) data in layers that are being compacted here and have
965  // smaller sequence numbers will be dropped in the next
966  // few iterations of this loop (by rule (A) above).
967  // Therefore this deletion marker is obsolete and can be dropped.
968  drop = true;
969  }
970 
971  last_sequence_for_key = ikey.sequence;
972  }
973 #if 0
974  Log(options_.info_log,
975  " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
976  "%d smallest_snapshot: %d",
977  ikey.user_key.ToString().c_str(),
978  (int)ikey.sequence, ikey.type, kTypeValue, drop,
979  compact->compaction->IsBaseLevelForKey(ikey.user_key),
980  (int)last_sequence_for_key, (int)compact->smallest_snapshot);
981 #endif
982 
983  if (!drop) {
984  // Open output file if necessary
985  if (compact->builder == NULL) {
986  status = OpenCompactionOutputFile(compact);
987  if (!status.ok()) {
988  break;
989  }
990  }
991  if (compact->builder->NumEntries() == 0) {
992  compact->current_output()->smallest.DecodeFrom(key);
993  }
994  compact->current_output()->largest.DecodeFrom(key);
995  compact->builder->Add(key, input->value());
996 
997  // Close output file if it is big enough
998  if (compact->builder->FileSize() >=
999  compact->compaction->MaxOutputFileSize()) {
1000  status = FinishCompactionOutputFile(compact, input);
1001  if (!status.ok()) {
1002  break;
1003  }
1004  }
1005  }
1006 
1007  input->Next();
1008  }
1009 
1010  if (status.ok() && shutting_down_.Acquire_Load()) {
1011  status = Status::IOError("Deleting DB during compaction");
1012  }
1013  if (status.ok() && compact->builder != NULL) {
1014  status = FinishCompactionOutputFile(compact, input);
1015  }
1016  if (status.ok()) {
1017  status = input->status();
1018  }
1019  delete input;
1020  input = NULL;
1021 
1022  CompactionStats stats;
1023  stats.micros = env_->NowMicros() - start_micros - imm_micros;
1024  for (int which = 0; which < 2; which++) {
1025  for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
1026  stats.bytes_read += compact->compaction->input(which, i)->file_size;
1027  }
1028  }
1029  for (size_t i = 0; i < compact->outputs.size(); i++) {
1030  stats.bytes_written += compact->outputs[i].file_size;
1031  }
1032 
1033  mutex_.Lock();
1034  stats_[compact->compaction->level() + 1].Add(stats);
1035 
1036  if (status.ok()) {
1037  status = InstallCompactionResults(compact);
1038  }
1039  if (!status.ok()) {
1040  RecordBackgroundError(status);
1041  }
1042  VersionSet::LevelSummaryStorage tmp;
1043  Log(options_.info_log,
1044  "compacted to: %s", versions_->LevelSummary(&tmp));
1045  return status;
1046 }
CompactionStats stats_[config::kNumLevels]
Definition: db_impl.h:191
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
static const SequenceNumber kMaxSequenceNumber
Definition: dbformat.h:67
void RecordBackgroundError(const Status &s)
Definition: db_impl.cc:637
SequenceNumber number_
Definition: snapshot.h:19
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:534
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input)
Definition: db_impl.cc:813
port::CondVar bg_cv_
Definition: db_impl.h:139
void Add(const CompactionStats &c)
Definition: db_impl.h:185
Iterator * MakeInputIterator(Compaction *c)
MemTable * imm_
Definition: db_impl.h:141
virtual void SeekToFirst()=0
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
Definition: dbformat.h:176
uint64_t SequenceNumber
Definition: dbformat.h:63
SnapshotImpl * oldest() const
Definition: snapshot.h:39
bool empty() const
Definition: snapshot.h:38
int NumLevelFiles(int level) const
port::AtomicPointer has_imm_
Definition: db_impl.h:142
Logger * info_log
Definition: options.h:68
uint64_t LastSequence() const
Definition: version_set.h:209
const char * LevelSummary(LevelSummaryStorage *scratch) const
SnapshotList snapshots_
Definition: db_impl.h:152
Status OpenCompactionOutputFile(CompactionState *compact)
Definition: db_impl.cc:788
const Comparator * user_comparator() const
Definition: db_impl.h:197
const Options options_
Definition: db_impl.h:125
virtual uint64_t NowMicros()=0
Status InstallCompactionResults(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:866
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:47
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the call graph for this function:
Here is the caller graph for this function:

§ FinishCompactionOutputFile()

Status leveldb::DBImpl::FinishCompactionOutputFile ( CompactionState *  compact,
Iterator *  input
)
private

Definition at line 813 of file db_impl.cc.

814  {
815  assert(compact != NULL);
816  assert(compact->outfile != NULL);
817  assert(compact->builder != NULL);
818 
819  const uint64_t output_number = compact->current_output()->number;
820  assert(output_number != 0);
821 
822  // Check for iterator errors
823  Status s = input->status();
824  const uint64_t current_entries = compact->builder->NumEntries();
825  if (s.ok()) {
826  s = compact->builder->Finish();
827  } else {
828  compact->builder->Abandon();
829  }
830  const uint64_t current_bytes = compact->builder->FileSize();
831  compact->current_output()->file_size = current_bytes;
832  compact->total_bytes += current_bytes;
833  delete compact->builder;
834  compact->builder = NULL;
835 
836  // Finish and check for file errors
837  if (s.ok()) {
838  s = compact->outfile->Sync();
839  }
840  if (s.ok()) {
841  s = compact->outfile->Close();
842  }
843  delete compact->outfile;
844  compact->outfile = NULL;
845 
846  if (s.ok() && current_entries > 0) {
847  // Verify that the table is usable
848  Iterator* iter = table_cache_->NewIterator(ReadOptions(),
849  output_number,
850  current_bytes);
851  s = iter->status();
852  delete iter;
853  if (s.ok()) {
854  Log(options_.info_log,
855  "Generated table #%llu@%d: %lld keys, %lld bytes",
856  (unsigned long long) output_number,
857  compact->compaction->level(),
858  (unsigned long long) current_entries,
859  (unsigned long long) current_bytes);
860  }
861  }
862  return s;
863 }
virtual Status status() const =0
TableCache * table_cache_
Definition: db_impl.h:131
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
Logger * info_log
Definition: options.h:68
Iterator * NewIterator(const ReadOptions &options, uint64_t file_number, uint64_t file_size, Table **tableptr=NULL)
Definition: table_cache.cc:82
const Options options_
Definition: db_impl.h:125
Here is the call graph for this function:
Here is the caller graph for this function:

§ Get()

Status leveldb::DBImpl::Get ( const ReadOptions &  options,
const Slice &  key,
std::string *  value 
)
virtual

Implements leveldb::DB.

Definition at line 1109 of file db_impl.cc.

1111  {
1112  Status s;
1113  MutexLock l(&mutex_);
1114  SequenceNumber snapshot;
1115  if (options.snapshot != NULL) {
1116  snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
1117  } else {
1118  snapshot = versions_->LastSequence();
1119  }
1120 
1121  MemTable* mem = mem_;
1122  MemTable* imm = imm_;
1123  Version* current = versions_->current();
1124  mem->Ref();
1125  if (imm != NULL) imm->Ref();
1126  current->Ref();
1127 
1128  bool have_stat_update = false;
1129  Version::GetStats stats;
1130 
1131  // Unlock while reading from files and memtables
1132  {
1133  mutex_.Unlock();
1134  // First look in the memtable, then in the immutable memtable (if any).
1135  LookupKey lkey(key, snapshot);
1136  if (mem->Get(lkey, value, &s)) {
1137  // Done
1138  } else if (imm != NULL && imm->Get(lkey, value, &s)) {
1139  // Done
1140  } else {
1141  s = current->Get(options, lkey, value, &stats);
1142  have_stat_update = true;
1143  }
1144  mutex_.Lock();
1145  }
1146 
1147  if (have_stat_update && current->UpdateStats(stats)) {
1148  MaybeScheduleCompaction();
1149  }
1150  mem->Unref();
1151  if (imm != NULL) imm->Unref();
1152  current->Unref();
1153  return s;
1154 }
VersionSet * versions_
Definition: db_impl.h:171
Version * current() const
Definition: version_set.h:185
MemTable * imm_
Definition: db_impl.h:141
port::Mutex mutex_
Definition: db_impl.h:137
uint64_t SequenceNumber
Definition: dbformat.h:63
MemTable * mem_
Definition: db_impl.h:140
uint64_t LastSequence() const
Definition: version_set.h:209
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
Here is the call graph for this function:

§ GetApproximateSizes()

void leveldb::DBImpl::GetApproximateSizes ( const Range *  range,
int  n,
uint64_t *  sizes 
)
virtual

Implements leveldb::DB.

Definition at line 1448 of file db_impl.cc.

1450  {
1451  // TODO(opt): better implementation
1452  Version* v;
1453  {
1454  MutexLock l(&mutex_);
1455  versions_->current()->Ref();
1456  v = versions_->current();
1457  }
1458 
1459  for (int i = 0; i < n; i++) {
1460  // Convert user_key into a corresponding internal key.
1461  InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1462  InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1463  uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1464  uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1465  sizes[i] = (limit >= start ? limit - start : 0);
1466  }
1467 
1468  {
1469  MutexLock l(&mutex_);
1470  v->Unref();
1471  }
1472 }
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key)
VersionSet * versions_
Definition: db_impl.h:171
static const SequenceNumber kMaxSequenceNumber
Definition: dbformat.h:67
Version * current() const
Definition: version_set.h:185
port::Mutex mutex_
Definition: db_impl.h:137
static const ValueType kValueTypeForSeek
Definition: dbformat.h:61
Here is the call graph for this function:

§ GetProperty()

bool leveldb::DBImpl::GetProperty ( const Slice &  property,
std::string *  value 
)
virtual

Implements leveldb::DB.

Definition at line 1381 of file db_impl.cc.

1381  {
1382  value->clear();
1383 
1384  MutexLock l(&mutex_);
1385  Slice in = property;
1386  Slice prefix("leveldb.");
1387  if (!in.starts_with(prefix)) return false;
1388  in.remove_prefix(prefix.size());
1389 
1390  if (in.starts_with("num-files-at-level")) {
1391  in.remove_prefix(strlen("num-files-at-level"));
1392  uint64_t level;
1393  bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1394  if (!ok || level >= config::kNumLevels) {
1395  return false;
1396  } else {
1397  char buf[100];
1398  snprintf(buf, sizeof(buf), "%d",
1399  versions_->NumLevelFiles(static_cast<int>(level)));
1400  *value = buf;
1401  return true;
1402  }
1403  } else if (in == "stats") {
1404  char buf[200];
1405  snprintf(buf, sizeof(buf),
1406  " Compactions\n"
1407  "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1408  "--------------------------------------------------\n"
1409  );
1410  value->append(buf);
1411  for (int level = 0; level < config::kNumLevels; level++) {
1412  int files = versions_->NumLevelFiles(level);
1413  if (stats_[level].micros > 0 || files > 0) {
1414  snprintf(
1415  buf, sizeof(buf),
1416  "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1417  level,
1418  files,
1419  versions_->NumLevelBytes(level) / 1048576.0,
1420  stats_[level].micros / 1e6,
1421  stats_[level].bytes_read / 1048576.0,
1422  stats_[level].bytes_written / 1048576.0);
1423  value->append(buf);
1424  }
1425  }
1426  return true;
1427  } else if (in == "sstables") {
1428  *value = versions_->current()->DebugString();
1429  return true;
1430  } else if (in == "approximate-memory-usage") {
1431  size_t total_usage = options_.block_cache->TotalCharge();
1432  if (mem_) {
1433  total_usage += mem_->ApproximateMemoryUsage();
1434  }
1435  if (imm_) {
1436  total_usage += imm_->ApproximateMemoryUsage();
1437  }
1438  char buf[50];
1439  snprintf(buf, sizeof(buf), "%llu",
1440  static_cast<unsigned long long>(total_usage));
1441  value->append(buf);
1442  return true;
1443  }
1444 
1445  return false;
1446 }
CompactionStats stats_[config::kNumLevels]
Definition: db_impl.h:191
VersionSet * versions_
Definition: db_impl.h:171
Version * current() const
Definition: version_set.h:185
static const int kNumLevels
Definition: dbformat.h:22
MemTable * imm_
Definition: db_impl.h:141
port::Mutex mutex_
Definition: db_impl.h:137
int64_t NumLevelBytes(int level) const
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
Definition: logging.cc:48
std::string DebugString() const
Definition: version_set.cc:574
MemTable * mem_
Definition: db_impl.h:140
int NumLevelFiles(int level) const
virtual size_t TotalCharge() const =0
Cache * block_cache
Definition: options.h:98
const Options options_
Definition: db_impl.h:125
size_t ApproximateMemoryUsage()
Definition: memtable.cc:31
Here is the call graph for this function:

§ GetSnapshot()

const Snapshot * leveldb::DBImpl::GetSnapshot ( )
virtual

Implements leveldb::DB.

Definition at line 1175 of file db_impl.cc.

1175  {
1176  MutexLock l(&mutex_);
1177  return snapshots_.New(versions_->LastSequence());
1178 }
VersionSet * versions_
Definition: db_impl.h:171
port::Mutex mutex_
Definition: db_impl.h:137
uint64_t LastSequence() const
Definition: version_set.h:209
SnapshotList snapshots_
Definition: db_impl.h:152
const SnapshotImpl * New(SequenceNumber seq)
Definition: snapshot.h:42
Here is the call graph for this function:

§ InstallCompactionResults()

Status leveldb::DBImpl::InstallCompactionResults ( CompactionState *  compact)
private

Definition at line 866 of file db_impl.cc.

866  {
867  mutex_.AssertHeld();
868  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
869  compact->compaction->num_input_files(0),
870  compact->compaction->level(),
871  compact->compaction->num_input_files(1),
872  compact->compaction->level() + 1,
873  static_cast<long long>(compact->total_bytes));
874 
875  // Add compaction outputs
876  compact->compaction->AddInputDeletions(compact->compaction->edit());
877  const int level = compact->compaction->level();
878  for (size_t i = 0; i < compact->outputs.size(); i++) {
879  const CompactionState::Output& out = compact->outputs[i];
880  compact->compaction->edit()->AddFile(
881  level + 1,
882  out.number, out.file_size, out.smallest, out.largest);
883  }
884  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
885 }
VersionSet * versions_
Definition: db_impl.h:171
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
Definition: version_set.cc:820
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
Logger * info_log
Definition: options.h:68
const Options options_
Definition: db_impl.h:125
Here is the call graph for this function:
Here is the caller graph for this function:

§ MakeRoomForWrite()

Status leveldb::DBImpl::MakeRoomForWrite ( bool  force)
private

Definition at line 1318 of file db_impl.cc.

1318  {
1319  mutex_.AssertHeld();
1320  assert(!writers_.empty());
1321  bool allow_delay = !force;
1322  Status s;
1323  while (true) {
1324  if (!bg_error_.ok()) {
1325  // Yield previous error
1326  s = bg_error_;
1327  break;
1328  } else if (
1329  allow_delay &&
1330  versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
1331  // We are getting close to hitting a hard limit on the number of
1332  // L0 files. Rather than delaying a single write by several
1333  // seconds when we hit the hard limit, start delaying each
1334  // individual write by 1ms to reduce latency variance. Also,
1335  // this delay hands over some CPU to the compaction thread in
1336  // case it is sharing the same core as the writer.
1337  mutex_.Unlock();
1338  env_->SleepForMicroseconds(1000);
1339  allow_delay = false; // Do not delay a single write more than once
1340  mutex_.Lock();
1341  } else if (!force &&
1342  (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1343  // There is room in current memtable
1344  break;
1345  } else if (imm_ != NULL) {
1346  // We have filled up the current memtable, but the previous
1347  // one is still being compacted, so we wait.
1348  Log(options_.info_log, "Current memtable full; waiting...\n");
1349  bg_cv_.Wait();
1350  } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1351  // There are too many level-0 files.
1352  Log(options_.info_log, "Too many L0 files; waiting...\n");
1353  bg_cv_.Wait();
1354  } else {
1355  // Attempt to switch to a new memtable and trigger compaction of old
1356  assert(versions_->PrevLogNumber() == 0);
1357  uint64_t new_log_number = versions_->NewFileNumber();
1358  WritableFile* lfile = NULL;
1359  s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1360  if (!s.ok()) {
1361  // Avoid chewing through file number space in a tight loop.
1362  versions_->ReuseFileNumber(new_log_number);
1363  break;
1364  }
1365  delete log_;
1366  delete logfile_;
1367  logfile_ = lfile;
1368  logfile_number_ = new_log_number;
1369  log_ = new log::Writer(lfile);
1370  imm_ = mem_;
1371  has_imm_.Release_Store(imm_);
1372  mem_ = new MemTable(internal_comparator_);
1373  mem_->Ref();
1374  force = false; // Do not force another compaction if have room
1375  MaybeScheduleCompaction();
1376  }
1377  }
1378  return s;
1379 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
uint64_t PrevLogNumber() const
Definition: version_set.h:225
size_t write_buffer_size
Definition: options.h:83
static const int kL0_StopWritesTrigger
Definition: dbformat.h:31
port::CondVar bg_cv_
Definition: db_impl.h:139
static const int kL0_SlowdownWritesTrigger
Definition: dbformat.h:28
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
WritableFile * logfile_
Definition: db_impl.h:143
virtual void SleepForMicroseconds(int micros)=0
MemTable * imm_
Definition: db_impl.h:141
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
uint64_t NewFileNumber()
Definition: version_set.h:191
MemTable * mem_
Definition: db_impl.h:140
int NumLevelFiles(int level) const
port::AtomicPointer has_imm_
Definition: db_impl.h:142
void ReuseFileNumber(uint64_t file_number)
Definition: version_set.h:196
const std::string dbname_
Definition: db_impl.h:128
log::Writer * log_
Definition: db_impl.h:145
Logger * info_log
Definition: options.h:68
uint64_t logfile_number_
Definition: db_impl.h:144
const Options options_
Definition: db_impl.h:125
bool ok() const
Definition: status.h:52
std::string LogFileName(const std::string &name, uint64_t number)
Definition: filename.cc:27
size_t ApproximateMemoryUsage()
Definition: memtable.cc:31
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
Status bg_error_
Definition: db_impl.h:174
std::deque< Writer * > writers_
Definition: db_impl.h:149
Here is the call graph for this function:
Here is the caller graph for this function:

§ MaybeIgnoreError()

void leveldb::DBImpl::MaybeIgnoreError ( Status *  s) const
private

Definition at line 209 of file db_impl.cc.

209  {
210  if (s->ok() || options_.paranoid_checks) {
211  // No change needed
212  } else {
213  Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
214  *s = Status::OK();
215  }
216 }
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
static Status OK()
Definition: status.h:32
Logger * info_log
Definition: options.h:68
bool paranoid_checks
Definition: options.h:57
const Options options_
Definition: db_impl.h:125
Here is the call graph for this function:
Here is the caller graph for this function:

§ MaybeScheduleCompaction()

void leveldb::DBImpl::MaybeScheduleCompaction ( )
private

Definition at line 645 of file db_impl.cc.

645  {
646  mutex_.AssertHeld();
647  if (bg_compaction_scheduled_) {
648  // Already scheduled
649  } else if (shutting_down_.Acquire_Load()) {
650  // DB is being deleted; no more background compactions
651  } else if (!bg_error_.ok()) {
652  // Already got an error; no more changes
653  } else if (imm_ == NULL &&
654  manual_compaction_ == NULL &&
655  !versions_->NeedsCompaction()) {
656  // No work to be done
657  } else {
658  bg_compaction_scheduled_ = true;
659  env_->Schedule(&DBImpl::BGWork, this);
660  }
661 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
MemTable * imm_
Definition: db_impl.h:141
port::Mutex mutex_
Definition: db_impl.h:137
bool NeedsCompaction() const
Definition: version_set.h:251
ManualCompaction * manual_compaction_
Definition: db_impl.h:169
static void BGWork(void *db)
Definition: db_impl.cc:663
virtual void Schedule(void(*function)(void *arg), void *arg)=0
bool ok() const
Definition: status.h:52
bool bg_compaction_scheduled_
Definition: db_impl.h:159
Status bg_error_
Definition: db_impl.h:174
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the call graph for this function:
Here is the caller graph for this function:

§ NewDB()

Status leveldb::DBImpl::NewDB ( )
private

Definition at line 177 of file db_impl.cc.

177  {
178  VersionEdit new_db;
179  new_db.SetComparatorName(user_comparator()->Name());
180  new_db.SetLogNumber(0);
181  new_db.SetNextFile(2);
182  new_db.SetLastSequence(0);
183 
184  const std::string manifest = DescriptorFileName(dbname_, 1);
185  WritableFile* file;
186  Status s = env_->NewWritableFile(manifest, &file);
187  if (!s.ok()) {
188  return s;
189  }
190  {
191  log::Writer log(file);
192  std::string record;
193  new_db.EncodeTo(&record);
194  s = log.AddRecord(record);
195  if (s.ok()) {
196  s = file->Close();
197  }
198  }
199  delete file;
200  if (s.ok()) {
201  // Make "CURRENT" file that points to the new manifest file.
202  s = SetCurrentFile(env_, dbname_, 1);
203  } else {
204  env_->DeleteFile(manifest);
205  }
206  return s;
207 }
Env *const env_
Definition: db_impl.h:122
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
Definition: filename.cc:42
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
Definition: filename.cc:126
const std::string dbname_
Definition: db_impl.h:128
const Comparator * user_comparator() const
Definition: db_impl.h:197
virtual Status DeleteFile(const std::string &fname)=0
Here is the call graph for this function:
Here is the caller graph for this function:

§ NewInternalIterator()

Iterator * leveldb::DBImpl::NewInternalIterator ( const ReadOptions &  options,
SequenceNumber *  latest_snapshot,
uint32_t *  seed 
)
private

Definition at line 1067 of file db_impl.cc.

1069  {
1070  IterState* cleanup = new IterState;
1071  mutex_.Lock();
1072  *latest_snapshot = versions_->LastSequence();
1073 
1074  // Collect together all needed child iterators
1075  std::vector<Iterator*> list;
1076  list.push_back(mem_->NewIterator());
1077  mem_->Ref();
1078  if (imm_ != NULL) {
1079  list.push_back(imm_->NewIterator());
1080  imm_->Ref();
1081  }
1082  versions_->current()->AddIterators(options, &list);
1083  Iterator* internal_iter =
1084  NewMergingIterator(&internal_comparator_, &list[0], list.size());
1085  versions_->current()->Ref();
1086 
1087  cleanup->mu = &mutex_;
1088  cleanup->mem = mem_;
1089  cleanup->imm = imm_;
1090  cleanup->version = versions_->current();
1091  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
1092 
1093  *seed = ++seed_;
1094  mutex_.Unlock();
1095  return internal_iter;
1096 }
VersionSet * versions_
Definition: db_impl.h:171
Version * current() const
Definition: version_set.h:185
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
static void CleanupIteratorState(void *arg1, void *arg2)
Definition: db_impl.cc:1056
MemTable * imm_
Definition: db_impl.h:141
port::Mutex mutex_
Definition: db_impl.h:137
MemTable * mem_
Definition: db_impl.h:140
void AddIterators(const ReadOptions &, std::vector< Iterator *> *iters)
Definition: version_set.cc:234
uint32_t seed_
Definition: db_impl.h:146
uint64_t LastSequence() const
Definition: version_set.h:209
Iterator * NewMergingIterator(const Comparator *cmp, Iterator **list, int n)
Definition: merger.cc:186
Iterator * NewIterator()
Definition: memtable.cc:78
Here is the call graph for this function:
Here is the caller graph for this function:

§ NewIterator()

Iterator * leveldb::DBImpl::NewIterator ( const ReadOptions &  options)
virtual

Implements leveldb::DB.

Definition at line 1156 of file db_impl.cc.

1156  {
1157  SequenceNumber latest_snapshot;
1158  uint32_t seed;
1159  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1160  return NewDBIterator(
1161  this, user_comparator(), iter,
1162  (options.snapshot != NULL
1163  ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
1164  : latest_snapshot),
1165  seed);
1166 }
Iterator * NewDBIterator(DBImpl *db, const Comparator *user_key_comparator, Iterator *internal_iter, SequenceNumber sequence, uint32_t seed)
Definition: db_iter.cc:308
uint64_t SequenceNumber
Definition: dbformat.h:63
const Comparator * user_comparator() const
Definition: db_impl.h:197
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
Definition: db_impl.cc:1067
Here is the call graph for this function:

§ OpenCompactionOutputFile()

Status leveldb::DBImpl::OpenCompactionOutputFile ( CompactionState *  compact)
private

Definition at line 788 of file db_impl.cc.

788  {
789  assert(compact != NULL);
790  assert(compact->builder == NULL);
791  uint64_t file_number;
792  {
793  mutex_.Lock();
794  file_number = versions_->NewFileNumber();
795  pending_outputs_.insert(file_number);
796  CompactionState::Output out;
797  out.number = file_number;
798  out.smallest.Clear();
799  out.largest.Clear();
800  compact->outputs.push_back(out);
801  mutex_.Unlock();
802  }
803 
804  // Make the output file
805  std::string fname = TableFileName(dbname_, file_number);
806  Status s = env_->NewWritableFile(fname, &compact->outfile);
807  if (s.ok()) {
808  compact->builder = new TableBuilder(options_, compact->outfile);
809  }
810  return s;
811 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
port::Mutex mutex_
Definition: db_impl.h:137
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
std::string TableFileName(const std::string &name, uint64_t number)
Definition: filename.cc:32
uint64_t NewFileNumber()
Definition: version_set.h:191
const std::string dbname_
Definition: db_impl.h:128
const Options options_
Definition: db_impl.h:125
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:156
Here is the call graph for this function:
Here is the caller graph for this function:

§ operator=()

void leveldb::DBImpl::operator= ( const DBImpl & )
private

§ Put()

Status leveldb::DBImpl::Put ( const WriteOptions &  o,
const Slice &  key,
const Slice &  value 
)
virtual

Implements leveldb::DB.

Definition at line 1186 of file db_impl.cc.

1186  {
1187  return DB::Put(o, key, val);
1188 }
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Definition: db_impl.cc:1476
Here is the call graph for this function:

§ RecordBackgroundError()

void leveldb::DBImpl::RecordBackgroundError ( const Status &  s)
private

Definition at line 637 of file db_impl.cc.

637  {
638  mutex_.AssertHeld();
639  if (bg_error_.ok()) {
640  bg_error_ = s;
641  bg_cv_.SignalAll();
642  }
643 }
port::CondVar bg_cv_
Definition: db_impl.h:139
port::Mutex mutex_
Definition: db_impl.h:137
bool ok() const
Definition: status.h:52
Status bg_error_
Definition: db_impl.h:174
Here is the call graph for this function:
Here is the caller graph for this function:

§ RecordReadSample()

void leveldb::DBImpl::RecordReadSample ( Slice  key)

Definition at line 1168 of file db_impl.cc.

1168  {
1169  MutexLock l(&mutex_);
1170  if (versions_->current()->RecordReadSample(key)) {
1171  MaybeScheduleCompaction();
1172  }
1173 }
VersionSet * versions_
Definition: db_impl.h:171
Version * current() const
Definition: version_set.h:185
port::Mutex mutex_
Definition: db_impl.h:137
bool RecordReadSample(Slice key)
Definition: version_set.cc:444
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
Here is the call graph for this function:

§ Recover()

Status leveldb::DBImpl::Recover ( VersionEdit *  edit,
bool *  save_manifest 
)
private

Definition at line 274 of file db_impl.cc.

274  {
275  mutex_.AssertHeld();
276 
277  // Ignore error from CreateDir since the creation of the DB is
278  // committed only when the descriptor is created, and this directory
279  // may already exist from a previous failed creation attempt.
280  env_->CreateDir(dbname_);
281  assert(db_lock_ == NULL);
282  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
283  if (!s.ok()) {
284  return s;
285  }
286 
287  if (!env_->FileExists(CurrentFileName(dbname_))) {
288  if (options_.create_if_missing) {
289  s = NewDB();
290  if (!s.ok()) {
291  return s;
292  }
293  } else {
294  return Status::InvalidArgument(
295  dbname_, "does not exist (create_if_missing is false)");
296  }
297  } else {
298  if (options_.error_if_exists) {
299  return Status::InvalidArgument(
300  dbname_, "exists (error_if_exists is true)");
301  }
302  }
303 
304  s = versions_->Recover(save_manifest);
305  if (!s.ok()) {
306  return s;
307  }
308  SequenceNumber max_sequence(0);
309 
310  // Recover from all newer log files than the ones named in the
311  // descriptor (new log files may have been added by the previous
312  // incarnation without registering them in the descriptor).
313  //
314  // Note that PrevLogNumber() is no longer used, but we pay
315  // attention to it in case we are recovering a database
316  // produced by an older version of leveldb.
317  const uint64_t min_log = versions_->LogNumber();
318  const uint64_t prev_log = versions_->PrevLogNumber();
319  std::vector<std::string> filenames;
320  s = env_->GetChildren(dbname_, &filenames);
321  if (!s.ok()) {
322  return s;
323  }
324  std::set<uint64_t> expected;
325  versions_->AddLiveFiles(&expected);
326  uint64_t number;
327  FileType type;
328  std::vector<uint64_t> logs;
329  for (size_t i = 0; i < filenames.size(); i++) {
330  if (ParseFileName(filenames[i], &number, &type)) {
331  expected.erase(number);
332  if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
333  logs.push_back(number);
334  }
335  }
336  if (!expected.empty()) {
337  char buf[50];
338  snprintf(buf, sizeof(buf), "%d missing files; e.g.",
339  static_cast<int>(expected.size()));
340  return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
341  }
342 
343  // Recover in the order in which the logs were generated
344  std::sort(logs.begin(), logs.end());
345  for (size_t i = 0; i < logs.size(); i++) {
346  s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
347  &max_sequence);
348  if (!s.ok()) {
349  return s;
350  }
351 
352  // The previous incarnation may not have written any MANIFEST
353  // records after allocating this log number. So we manually
354  // update the file number allocation counter in VersionSet.
355  versions_->MarkFileNumberUsed(logs[i]);
356  }
357 
358  if (versions_->LastSequence() < max_sequence) {
359  versions_->SetLastSequence(max_sequence);
360  }
361 
362  return Status::OK();
363 }
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
Status RecoverLogFile(uint64_t log_number, bool last_log, bool *save_manifest, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:365
uint64_t PrevLogNumber() const
Definition: version_set.h:225
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
Definition: filename.cc:80
FileLock * db_lock_
Definition: db_impl.h:134
port::Mutex mutex_
Definition: db_impl.h:137
uint64_t SequenceNumber
Definition: dbformat.h:63
std::string TableFileName(const std::string &name, uint64_t number)
Definition: filename.cc:32
Status NewDB()
Definition: db_impl.cc:177
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:38
virtual Status CreateDir(const std::string &dirname)=0
static Status OK()
Definition: status.h:32
void MarkFileNumberUsed(uint64_t number)
std::string CurrentFileName(const std::string &dbname)
Definition: filename.cc:50
virtual Status LockFile(const std::string &fname, FileLock **lock)=0
uint64_t LogNumber() const
Definition: version_set.h:221
bool create_if_missing
Definition: options.h:45
const std::string dbname_
Definition: db_impl.h:128
virtual bool FileExists(const std::string &fname)=0
Status Recover(bool *save_manifest)
Definition: version_set.cc:905
uint64_t LastSequence() const
Definition: version_set.h:209
const Options options_
Definition: db_impl.h:125
static Status InvalidArgument(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:44
bool error_if_exists
Definition: options.h:49
void SetLastSequence(uint64_t s)
Definition: version_set.h:212
FileType
Definition: filename.h:20
void AddLiveFiles(std::set< uint64_t > *live)
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
std::string LockFileName(const std::string &dbname)
Definition: filename.cc:54
Here is the call graph for this function:
Here is the caller graph for this function:

§ RecoverLogFile()

Status leveldb::DBImpl::RecoverLogFile ( uint64_t  log_number,
bool  last_log,
bool *  save_manifest,
VersionEdit *  edit,
SequenceNumber *  max_sequence 
)
private

Definition at line 365 of file db_impl.cc.

367  {
368  struct LogReporter : public log::Reader::Reporter {
369  Env* env;
370  Logger* info_log;
371  const char* fname;
372  Status* status; // NULL if options_.paranoid_checks==false
373  virtual void Corruption(size_t bytes, const Status& s) {
374  Log(info_log, "%s%s: dropping %d bytes; %s",
375  (this->status == NULL ? "(ignoring error) " : ""),
376  fname, static_cast<int>(bytes), s.ToString().c_str());
377  if (this->status != NULL && this->status->ok()) *this->status = s;
378  }
379  };
380 
381  mutex_.AssertHeld();
382 
383  // Open the log file
384  std::string fname = LogFileName(dbname_, log_number);
385  SequentialFile* file;
386  Status status = env_->NewSequentialFile(fname, &file);
387  if (!status.ok()) {
388  MaybeIgnoreError(&status);
389  return status;
390  }
391 
392  // Create the log reader.
393  LogReporter reporter;
394  reporter.env = env_;
395  reporter.info_log = options_.info_log;
396  reporter.fname = fname.c_str();
397  reporter.status = (options_.paranoid_checks ? &status : NULL);
398  // We intentionally make log::Reader do checksumming even if
399  // paranoid_checks==false so that corruptions cause entire commits
400  // to be skipped instead of propagating bad information (like overly
401  // large sequence numbers).
402  log::Reader reader(file, &reporter, true/*checksum*/,
403  0/*initial_offset*/);
404  Log(options_.info_log, "Recovering log #%llu",
405  (unsigned long long) log_number);
406 
407  // Read all the records and add to a memtable
408  std::string scratch;
409  Slice record;
410  WriteBatch batch;
411  int compactions = 0;
412  MemTable* mem = NULL;
413  while (reader.ReadRecord(&record, &scratch) &&
414  status.ok()) {
415  if (record.size() < 12) {
416  reporter.Corruption(
417  record.size(), Status::Corruption("log record too small"));
418  continue;
419  }
420  WriteBatchInternal::SetContents(&batch, record);
421 
422  if (mem == NULL) {
423  mem = new MemTable(internal_comparator_);
424  mem->Ref();
425  }
426  status = WriteBatchInternal::InsertInto(&batch, mem);
427  MaybeIgnoreError(&status);
428  if (!status.ok()) {
429  break;
430  }
431  const SequenceNumber last_seq =
432  WriteBatchInternal::Sequence(&batch) +
433  WriteBatchInternal::Count(&batch) - 1;
434  if (last_seq > *max_sequence) {
435  *max_sequence = last_seq;
436  }
437 
438  if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
439  compactions++;
440  *save_manifest = true;
441  status = WriteLevel0Table(mem, edit, NULL);
442  mem->Unref();
443  mem = NULL;
444  if (!status.ok()) {
445  // Reflect errors immediately so that conditions like full
446  // file-systems cause the DB::Open() to fail.
447  break;
448  }
449  }
450  }
451 
452  delete file;
453 
454  // See if we should keep reusing the last log file.
455  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
456  assert(logfile_ == NULL);
457  assert(log_ == NULL);
458  assert(mem_ == NULL);
459  uint64_t lfile_size;
460  if (env_->GetFileSize(fname, &lfile_size).ok() &&
461  env_->NewAppendableFile(fname, &logfile_).ok()) {
462  Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
463  log_ = new log::Writer(logfile_, lfile_size);
464  logfile_number_ = log_number;
465  if (mem != NULL) {
466  mem_ = mem;
467  mem = NULL;
468  } else {
469  // mem can be NULL if lognum exists but was empty.
470  mem_ = new MemTable(internal_comparator_);
471  mem_->Ref();
472  }
473  }
474  }
475 
476  if (mem != NULL) {
477  // mem did not get reused; compact it.
478  if (status.ok()) {
479  *save_manifest = true;
480  status = WriteLevel0Table(mem, edit, NULL);
481  }
482  mem->Unref();
483  }
484 
485  return status;
486 }
Env *const env_
Definition: db_impl.h:122
size_t write_buffer_size
Definition: options.h:83
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
WritableFile * logfile_
Definition: db_impl.h:143
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
Definition: write_batch.cc:128
void MaybeIgnoreError(Status *s) const
Definition: db_impl.cc:209
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
virtual Status GetFileSize(const std::string &fname, uint64_t *file_size)=0
uint64_t SequenceNumber
Definition: dbformat.h:63
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
Definition: status.h:38
MemTable * mem_
Definition: db_impl.h:140
static void SetContents(WriteBatch *batch, const Slice &contents)
Definition: write_batch.cc:136
const std::string dbname_
Definition: db_impl.h:128
log::Writer * log_
Definition: db_impl.h:145
Logger * info_log
Definition: options.h:68
virtual Status NewAppendableFile(const std::string &fname, WritableFile **result)
Definition: env.cc:12
static int Count(const WriteBatch *batch)
Definition: write_batch.cc:82
uint64_t logfile_number_
Definition: db_impl.h:144
bool paranoid_checks
Definition: options.h:57
const Options options_
Definition: db_impl.h:125
bool ok() const
Definition: status.h:52
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)=0
std::string LogFileName(const std::string &name, uint64_t number)
Definition: filename.cc:27
static SequenceNumber Sequence(const WriteBatch *batch)
Definition: write_batch.cc:90
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:488
Here is the call graph for this function:
Here is the caller graph for this function:

§ ReleaseSnapshot()

void leveldb::DBImpl::ReleaseSnapshot ( const Snapshot * snapshot)
virtual

Implements leveldb::DB.

Definition at line 1180 of file db_impl.cc. The definition below names the parameter `s` rather than `snapshot`.

1180  {
1181  MutexLock l(&mutex_);
1182  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
1183 }
port::Mutex mutex_
Definition: db_impl.h:137
void Delete(const SnapshotImpl *s)
Definition: snapshot.h:53
SnapshotList snapshots_
Definition: db_impl.h:152
Here is the call graph for this function:

§ TEST_CompactMemTable()

Status leveldb::DBImpl::TEST_CompactMemTable ( )

Definition at line 621 of file db_impl.cc.

621  {
622  // NULL batch means just wait for earlier writes to be done
623  Status s = Write(WriteOptions(), NULL);
624  if (s.ok()) {
625  // Wait until the compaction completes
626  MutexLock l(&mutex_);
627  while (imm_ != NULL && bg_error_.ok()) {
628  bg_cv_.Wait();
629  }
630  if (imm_ != NULL) {
631  s = bg_error_;
632  }
633  }
634  return s;
635 }
port::CondVar bg_cv_
Definition: db_impl.h:139
MemTable * imm_
Definition: db_impl.h:141
port::Mutex mutex_
Definition: db_impl.h:137
bool ok() const
Definition: status.h:52
virtual Status Write(const WriteOptions &options, WriteBatch *updates)
Definition: db_impl.cc:1194
Status bg_error_
Definition: db_impl.h:174
Here is the call graph for this function:
Here is the caller graph for this function:

§ TEST_CompactRange()

void leveldb::DBImpl::TEST_CompactRange ( int  level,
const Slice * begin,
const Slice * end 
)

Definition at line 584 of file db_impl.cc.

584  {
585  assert(level >= 0);
586  assert(level + 1 < config::kNumLevels);
587 
588  InternalKey begin_storage, end_storage;
589 
590  ManualCompaction manual;
591  manual.level = level;
592  manual.done = false;
593  if (begin == NULL) {
594  manual.begin = NULL;
595  } else {
596  begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
597  manual.begin = &begin_storage;
598  }
599  if (end == NULL) {
600  manual.end = NULL;
601  } else {
602  end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
603  manual.end = &end_storage;
604  }
605 
606  MutexLock l(&mutex_);
607  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
608  if (manual_compaction_ == NULL) { // Idle
609  manual_compaction_ = &manual;
610  MaybeScheduleCompaction();
611  } else { // Running either my compaction or another compaction.
612  bg_cv_.Wait();
613  }
614  }
615  if (manual_compaction_ == &manual) {
616  // Cancel my manual compaction since we aborted early for some reason.
617  manual_compaction_ = NULL;
618  }
619 }
static const SequenceNumber kMaxSequenceNumber
Definition: dbformat.h:67
static const int kNumLevels
Definition: dbformat.h:22
port::CondVar bg_cv_
Definition: db_impl.h:139
port::Mutex mutex_
Definition: db_impl.h:137
static const ValueType kValueTypeForSeek
Definition: dbformat.h:61
ManualCompaction * manual_compaction_
Definition: db_impl.h:169
bool ok() const
Definition: status.h:52
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:645
Status bg_error_
Definition: db_impl.h:174
port::AtomicPointer shutting_down_
Definition: db_impl.h:138
Here is the call graph for this function:
Here is the caller graph for this function:

§ TEST_MaxNextLevelOverlappingBytes()

int64_t leveldb::DBImpl::TEST_MaxNextLevelOverlappingBytes ( )

Definition at line 1104 of file db_impl.cc.

1104  {
1105  MutexLock l(&mutex_);
1106  return versions_->MaxNextLevelOverlappingBytes();
1107 }
VersionSet * versions_
Definition: db_impl.h:171
port::Mutex mutex_
Definition: db_impl.h:137
int64_t MaxNextLevelOverlappingBytes()
Here is the call graph for this function:

§ TEST_NewInternalIterator()

Iterator * leveldb::DBImpl::TEST_NewInternalIterator ( )

Definition at line 1098 of file db_impl.cc.

1098  {
1099  SequenceNumber ignored;
1100  uint32_t ignored_seed;
1101  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1102 }
uint64_t SequenceNumber
Definition: dbformat.h:63
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
Definition: db_impl.cc:1067
Here is the call graph for this function:

§ user_comparator()

const Comparator* leveldb::DBImpl::user_comparator ( ) const
inline private

Definition at line 197 of file db_impl.h.

197  {
198  return internal_comparator_.user_comparator();
199  }
const InternalKeyComparator internal_comparator_
Definition: db_impl.h:123
const Comparator * user_comparator() const
Definition: dbformat.h:125
Here is the call graph for this function:
Here is the caller graph for this function:

§ Write()

Status leveldb::DBImpl::Write ( const WriteOptions & options,
WriteBatch * updates 
)
virtual

Implements leveldb::DB.

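Definition at line 1194 of file db_impl.cc. The definition below names the second parameter `my_batch` rather than `updates`; a NULL batch only waits for earlier writes to finish (see the comment at line 1213).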

1194  {
1195  Writer w(&mutex_);
1196  w.batch = my_batch;
1197  w.sync = options.sync;
1198  w.done = false;
1199 
1200  MutexLock l(&mutex_);
1201  writers_.push_back(&w);
1202  while (!w.done && &w != writers_.front()) {
1203  w.cv.Wait();
1204  }
1205  if (w.done) {
1206  return w.status;
1207  }
1208 
1209  // May temporarily unlock and wait.
1210  Status status = MakeRoomForWrite(my_batch == NULL);
1211  uint64_t last_sequence = versions_->LastSequence();
1212  Writer* last_writer = &w;
1213  if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
1214  WriteBatch* updates = BuildBatchGroup(&last_writer);
1215  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
1216  last_sequence += WriteBatchInternal::Count(updates);
1217 
1218  // Add to log and apply to memtable. We can release the lock
1219  // during this phase since &w is currently responsible for logging
1220  // and protects against concurrent loggers and concurrent writes
1221  // into mem_.
1222  {
1223  mutex_.Unlock();
1224  status = log_->AddRecord(WriteBatchInternal::Contents(updates));
1225  bool sync_error = false;
1226  if (status.ok() && options.sync) {
1227  status = logfile_->Sync();
1228  if (!status.ok()) {
1229  sync_error = true;
1230  }
1231  }
1232  if (status.ok()) {
1233  status = WriteBatchInternal::InsertInto(updates, mem_);
1234  }
1235  mutex_.Lock();
1236  if (sync_error) {
1237  // The state of the log file is indeterminate: the log record we
1238  // just added may or may not show up when the DB is re-opened.
1239  // So we force the DB into a mode where all future writes fail.
1240  RecordBackgroundError(status);
1241  }
1242  }
1243  if (updates == tmp_batch_) tmp_batch_->Clear();
1244 
1245  versions_->SetLastSequence(last_sequence);
1246  }
1247 
1248  while (true) {
1249  Writer* ready = writers_.front();
1250  writers_.pop_front();
1251  if (ready != &w) {
1252  ready->status = status;
1253  ready->done = true;
1254  ready->cv.Signal();
1255  }
1256  if (ready == last_writer) break;
1257  }
1258 
1259  // Notify new head of write queue
1260  if (!writers_.empty()) {
1261  writers_.front()->cv.Signal();
1262  }
1263 
1264  return status;
1265 }
VersionSet * versions_
Definition: db_impl.h:171
void RecordBackgroundError(const Status &s)
Definition: db_impl.cc:637
WritableFile * logfile_
Definition: db_impl.h:143
static void SetSequence(WriteBatch *batch, SequenceNumber seq)
Definition: write_batch.cc:94
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
Definition: write_batch.cc:128
port::Mutex mutex_
Definition: db_impl.h:137
WriteBatch * tmp_batch_
Definition: db_impl.h:150
MemTable * mem_
Definition: db_impl.h:140
WriteBatch * BuildBatchGroup(Writer **last_writer)
Definition: db_impl.cc:1269
log::Writer * log_
Definition: db_impl.h:145
uint64_t LastSequence() const
Definition: version_set.h:209
static int Count(const WriteBatch *batch)
Definition: write_batch.cc:82
Status MakeRoomForWrite(bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Definition: db_impl.cc:1318
Status AddRecord(const Slice &slice)
Definition: log_writer.cc:36
static Slice Contents(const WriteBatch *batch)
void SetLastSequence(uint64_t s)
Definition: version_set.h:212
virtual Status Sync()=0
std::deque< Writer * > writers_
Definition: db_impl.h:149
Here is the call graph for this function:
Here is the caller graph for this function:

§ WriteLevel0Table()

Status leveldb::DBImpl::WriteLevel0Table ( MemTable * mem,
VersionEdit * edit,
Version * base 
)
private

Definition at line 488 of file db_impl.cc.

489  {
490  mutex_.AssertHeld();
491  const uint64_t start_micros = env_->NowMicros();
492  FileMetaData meta;
493  meta.number = versions_->NewFileNumber();
494  pending_outputs_.insert(meta.number);
495  Iterator* iter = mem->NewIterator();
496  Log(options_.info_log, "Level-0 table #%llu: started",
497  (unsigned long long) meta.number);
498 
499  Status s;
500  {
501  mutex_.Unlock();
502  s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
503  mutex_.Lock();
504  }
505 
506  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
507  (unsigned long long) meta.number,
508  (unsigned long long) meta.file_size,
509  s.ToString().c_str());
510  delete iter;
511  pending_outputs_.erase(meta.number);
512 
513 
514  // Note that if file_size is zero, the file has been deleted and
515  // should not be added to the manifest.
516  int level = 0;
517  if (s.ok() && meta.file_size > 0) {
518  const Slice min_user_key = meta.smallest.user_key();
519  const Slice max_user_key = meta.largest.user_key();
520  if (base != NULL) {
521  level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
522  }
523  edit->AddFile(level, meta.number, meta.file_size,
524  meta.smallest, meta.largest);
525  }
526 
527  CompactionStats stats;
528  stats.micros = env_->NowMicros() - start_micros;
529  stats.bytes_written = meta.file_size;
530  stats_[level].Add(stats);
531  return s;
532 }
CompactionStats stats_[config::kNumLevels]
Definition: db_impl.h:191
VersionSet * versions_
Definition: db_impl.h:171
Env *const env_
Definition: db_impl.h:122
void Add(const CompactionStats &c)
Definition: db_impl.h:185
TableCache * table_cache_
Definition: db_impl.h:131
void Log(Logger *info_log, const char *format,...)
Definition: env.cc:31
port::Mutex mutex_
Definition: db_impl.h:137
uint64_t NewFileNumber()
Definition: version_set.h:191
const std::string dbname_
Definition: db_impl.h:128
Logger * info_log
Definition: options.h:68
const Options options_
Definition: db_impl.h:125
virtual uint64_t NowMicros()=0
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
Definition: builder.cc:17
std::set< uint64_t > pending_outputs_
Definition: db_impl.h:156
Here is the call graph for this function:
Here is the caller graph for this function:

Friends And Related Function Documentation

§ DB

friend class DB
friend

Definition at line 68 of file db_impl.h.

Member Data Documentation

§ bg_compaction_scheduled_

bool leveldb::DBImpl::bg_compaction_scheduled_
private

Definition at line 159 of file db_impl.h.

§ bg_cv_

port::CondVar leveldb::DBImpl::bg_cv_
private

Definition at line 139 of file db_impl.h.

§ bg_error_

Status leveldb::DBImpl::bg_error_
private

Definition at line 174 of file db_impl.h.

§ db_lock_

FileLock* leveldb::DBImpl::db_lock_
private

Definition at line 134 of file db_impl.h.

§ dbname_

const std::string leveldb::DBImpl::dbname_
private

Definition at line 128 of file db_impl.h.

§ env_

Env* const leveldb::DBImpl::env_
private

Definition at line 122 of file db_impl.h.

§ has_imm_

port::AtomicPointer leveldb::DBImpl::has_imm_
private

Definition at line 142 of file db_impl.h.

§ imm_

MemTable* leveldb::DBImpl::imm_
private

Definition at line 141 of file db_impl.h.

§ internal_comparator_

const InternalKeyComparator leveldb::DBImpl::internal_comparator_
private

Definition at line 123 of file db_impl.h.

§ internal_filter_policy_

const InternalFilterPolicy leveldb::DBImpl::internal_filter_policy_
private

Definition at line 124 of file db_impl.h.

§ log_

log::Writer* leveldb::DBImpl::log_
private

Definition at line 145 of file db_impl.h.

§ logfile_

WritableFile* leveldb::DBImpl::logfile_
private

Definition at line 143 of file db_impl.h.

§ logfile_number_

uint64_t leveldb::DBImpl::logfile_number_
private

Definition at line 144 of file db_impl.h.

§ manual_compaction_

ManualCompaction* leveldb::DBImpl::manual_compaction_
private

Definition at line 169 of file db_impl.h.

§ mem_

MemTable* leveldb::DBImpl::mem_
private

Definition at line 140 of file db_impl.h.

§ mutex_

port::Mutex leveldb::DBImpl::mutex_
private

Definition at line 137 of file db_impl.h.

§ options_

const Options leveldb::DBImpl::options_
private

Definition at line 125 of file db_impl.h.

§ owns_cache_

bool leveldb::DBImpl::owns_cache_
private

Definition at line 127 of file db_impl.h.

§ owns_info_log_

bool leveldb::DBImpl::owns_info_log_
private

Definition at line 126 of file db_impl.h.

§ pending_outputs_

std::set<uint64_t> leveldb::DBImpl::pending_outputs_
private

Definition at line 156 of file db_impl.h.

§ seed_

uint32_t leveldb::DBImpl::seed_
private

Definition at line 146 of file db_impl.h.

§ shutting_down_

port::AtomicPointer leveldb::DBImpl::shutting_down_
private

Definition at line 138 of file db_impl.h.

§ snapshots_

SnapshotList leveldb::DBImpl::snapshots_
private

Definition at line 152 of file db_impl.h.

§ stats_

CompactionStats leveldb::DBImpl::stats_[config::kNumLevels]
private

Definition at line 191 of file db_impl.h.

§ table_cache_

TableCache* leveldb::DBImpl::table_cache_
private

Definition at line 131 of file db_impl.h.

§ tmp_batch_

WriteBatch* leveldb::DBImpl::tmp_batch_
private

Definition at line 150 of file db_impl.h.

§ versions_

VersionSet* leveldb::DBImpl::versions_
private

Definition at line 171 of file db_impl.h.

§ writers_

std::deque<Writer*> leveldb::DBImpl::writers_
private

Definition at line 149 of file db_impl.h.


The documentation for this class was generated from the following files: