28 #include "port/port.h" 48 explicit Writer(port::Mutex* mu) : cv(mu) { }
85 template <
class T,
class V>
87 if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
88 if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
118 :
env_(raw_options.env),
229 std::vector<std::string> filenames;
233 for (
size_t i = 0; i < filenames.size(); i++) {
247 keep = (live.find(number) != live.end());
252 keep = (live.find(number) != live.end());
267 static_cast<unsigned long long>(number));
295 dbname_,
"does not exist (create_if_missing is false)");
300 dbname_,
"exists (error_if_exists is true)");
319 std::vector<std::string> filenames;
324 std::set<uint64_t> expected;
328 std::vector<uint64_t> logs;
329 for (
size_t i = 0; i < filenames.size(); i++) {
331 expected.erase(number);
332 if (type ==
kLogFile && ((number >= min_log) || (number == prev_log)))
333 logs.push_back(number);
336 if (!expected.empty()) {
338 snprintf(buf,
sizeof(buf),
"%d missing files; e.g.",
339 static_cast<int>(expected.size()));
344 std::sort(logs.begin(), logs.end());
345 for (
size_t i = 0; i < logs.size(); i++) {
346 s =
RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
373 virtual void Corruption(
size_t bytes,
const Status& s) {
374 Log(info_log,
"%s%s: dropping %d bytes; %s",
375 (this->status == NULL ?
"(ignoring error) " :
""),
376 fname, static_cast<int>(bytes), s.
ToString().c_str());
377 if (this->status != NULL && this->status->
ok()) *this->status = s;
393 LogReporter reporter;
396 reporter.fname = fname.c_str();
405 (
unsigned long long) log_number);
413 while (reader.
ReadRecord(&record, &scratch) &&
415 if (record.
size() < 12) {
434 if (last_seq > *max_sequence) {
435 *max_sequence = last_seq;
440 *save_manifest =
true;
457 assert(
log_ == NULL);
458 assert(
mem_ == NULL);
479 *save_manifest =
true;
497 (
unsigned long long) meta.
number);
507 (
unsigned long long) meta.
number,
536 assert(
imm_ != NULL);
568 int max_level_with_files = 1;
574 max_level_with_files = level;
579 for (
int level = 0; level < max_level_with_files; level++) {
591 manual.
level = level;
597 manual.
begin = &begin_storage;
602 end_storage =
InternalKey(*end, 0, static_cast<ValueType>(0));
603 manual.
end = &end_storage;
653 }
else if (
imm_ == NULL &&
700 m->
done = (c == NULL);
705 "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
723 f->smallest, f->largest);
730 static_cast<unsigned long long>(f->number),
732 static_cast<unsigned long long>(f->file_size),
753 "Compaction error: %s", status.
ToString().c_str());
773 if (compact->
builder != NULL) {
778 assert(compact->
outfile == NULL);
781 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
789 assert(compact != NULL);
790 assert(compact->
builder == NULL);
791 uint64_t file_number;
800 compact->
outputs.push_back(out);
815 assert(compact != NULL);
816 assert(compact->
outfile != NULL);
817 assert(compact->
builder != NULL);
820 assert(output_number != 0);
846 if (s.
ok() && current_entries > 0) {
855 "Generated table #%llu@%d: %lld keys, %lld bytes",
856 (
unsigned long long) output_number,
858 (
unsigned long long) current_entries,
859 (
unsigned long long) current_bytes);
878 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
889 int64_t imm_micros = 0;
898 assert(compact->
builder == NULL);
899 assert(compact->
outfile == NULL);
913 std::string current_user_key;
914 bool has_current_user_key =
false;
918 if (
has_imm_.NoBarrier_Load() != NULL) {
942 current_user_key.clear();
943 has_current_user_key =
false;
946 if (!has_current_user_key ||
948 Slice(current_user_key)) != 0) {
951 has_current_user_key =
true;
955 if (last_sequence_for_key <= compact->smallest_snapshot) {
971 last_sequence_for_key = ikey.
sequence;
975 " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " 976 "%d smallest_snapshot: %d",
985 if (compact->
builder == NULL) {
1013 if (status.
ok() && compact->
builder != NULL) {
1017 status = input->
status();
1024 for (
int which = 0; which < 2; which++) {
1029 for (
size_t i = 0; i < compact->
outputs.size(); i++) {
1062 state->
mu->Unlock();
1070 IterState* cleanup =
new IterState;
1075 std::vector<Iterator*> list;
1088 cleanup->mem =
mem_;
1089 cleanup->imm =
imm_;
1095 return internal_iter;
1100 uint32_t ignored_seed;
1111 std::string* value) {
1125 if (imm != NULL) imm->
Ref();
1128 bool have_stat_update =
false;
1136 if (mem->
Get(lkey, value, &s)) {
1138 }
else if (imm != NULL && imm->
Get(lkey, value, &s)) {
1141 s = current->
Get(options, lkey, value, &stats);
1142 have_stat_update =
true;
1147 if (have_stat_update && current->
UpdateStats(stats)) {
1151 if (imm != NULL) imm->
Unref();
1163 ? reinterpret_cast<const SnapshotImpl*>(options.
snapshot)->number_
1212 Writer* last_writer = &w;
1213 if (status.
ok() && my_batch != NULL) {
1225 bool sync_error =
false;
1226 if (status.
ok() && options.
sync) {
1256 if (ready == last_writer)
break;
1273 assert(result != NULL);
1280 size_t max_size = 1 << 20;
1281 if (size <= (128<<10)) {
1282 max_size = size + (128<<10);
1285 *last_writer = first;
1286 std::deque<Writer*>::iterator iter =
writers_.begin();
1288 for (; iter !=
writers_.end(); ++iter) {
1290 if (w->
sync && !first->sync) {
1295 if (w->
batch != NULL) {
1297 if (size > max_size) {
1303 if (result == first->batch) {
1321 bool allow_delay = !force;
1339 allow_delay =
false;
1341 }
else if (!force &&
1345 }
else if (
imm_ != NULL) {
1385 Slice in = property;
1386 Slice prefix(
"leveldb.");
1398 snprintf(buf,
sizeof(buf),
"%d",
1403 }
else if (in ==
"stats") {
1405 snprintf(buf,
sizeof(buf),
1407 "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" 1408 "--------------------------------------------------\n" 1413 if (
stats_[level].micros > 0 || files > 0) {
1416 "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1427 }
else if (in ==
"sstables") {
1430 }
else if (in ==
"approximate-memory-usage") {
1439 snprintf(buf,
sizeof(buf),
"%llu",
1440 static_cast<unsigned long long>(total_usage));
1449 const Range* range,
int n,
1459 for (
int i = 0; i < n; i++) {
1465 sizes[i] = (limit >= start ? limit - start : 0);
1478 batch.
Put(key, value);
1479 return Write(opt, &batch);
1485 return Write(opt, &batch);
1498 bool save_manifest =
false;
1500 if (s.
ok() && impl->
mem_ == NULL) {
1515 if (s.
ok() && save_manifest) {
1526 assert(impl->
mem_ != NULL);
1539 std::vector<std::string> filenames;
1542 if (filenames.empty()) {
1552 for (
size_t i = 0; i < filenames.size(); i++) {
1556 if (result.
ok() && !del.
ok()) {
FileMetaData * input(int which, int i) const
CompactionStats stats_[config::kNumLevels]
uint64_t ApproximateOffsetOf(Version *v, const InternalKey &key)
void TEST_CompactRange(int level, const Slice *begin, const Slice *end)
virtual Status status() const =0
const int kNumNonTableCacheFiles
Status RecoverLogFile(uint64_t log_number, bool last_log, bool *save_manifest, VersionEdit *edit, SequenceNumber *max_sequence) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
static const SequenceNumber kMaxSequenceNumber
virtual const Snapshot * GetSnapshot()
Status Recover(VersionEdit *edit, bool *save_manifest) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
virtual Iterator * NewIterator(const ReadOptions &)
void RecordBackgroundError(const Status &s)
uint64_t PrevLogNumber() const
Version * current() const
void SetPrevLogNumber(uint64_t num)
bool ShouldStopBefore(const Slice &internal_key)
static void ClipToRange(T *ptr, V minvalue, V maxvalue)
virtual Status UnlockFile(FileLock *lock)=0
int PickLevelForMemTableOutput(const Slice &smallest_user_key, const Slice &largest_user_key)
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
bool ParseFileName(const std::string &fname, uint64_t *number, FileType *type)
void SetLastSequence(SequenceNumber seq)
static const int kL0_StopWritesTrigger
virtual Status Put(const WriteOptions &, const Slice &key, const Slice &value)
static const int kNumLevels
std::string OldInfoLogFileName(const std::string &dbname)
Status FinishCompactionOutputFile(CompactionState *compact, Iterator *input)
void SetLogNumber(uint64_t num)
void SetComparatorName(const Slice &name)
static size_t ByteSize(const WriteBatch *batch)
virtual Slice key() const =0
static const int kL0_SlowdownWritesTrigger
void Add(const CompactionStats &c)
virtual Slice value() const =0
Compaction *const compaction
void DeleteFile(int level, uint64_t file)
bool UpdateStats(const GetStats &stats)
TableCache * table_cache_
const InternalKeyComparator internal_comparator_
void SetNextFile(uint64_t num)
static void CleanupIteratorState(void *arg1, void *arg2)
Iterator * MakeInputIterator(Compaction *c)
Output * current_output()
static void SetSequence(WriteBatch *batch, SequenceNumber seq)
Status LogAndApply(VersionEdit *edit, port::Mutex *mu) EXCLUSIVE_LOCKS_REQUIRED(mu)
virtual void SleepForMicroseconds(int micros)=0
static Status InsertInto(const WriteBatch *batch, MemTable *memtable)
virtual void SeekToFirst()=0
void MaybeIgnoreError(Status *s) const
void RecordReadSample(Slice key)
void DeleteObsoleteFiles()
void Log(Logger *info_log, const char *format,...)
std::string InfoLogFileName(const std::string &dbname)
uint64_t MaxOutputFileSize() const
std::string ToString() const
std::string DebugString() const
virtual bool GetProperty(const Slice &property, std::string *value)
CompactionState(Compaction *c)
virtual Status GetFileSize(const std::string &fname, uint64_t *file_size)=0
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
Iterator * NewDBIterator(DBImpl *db, const Comparator *user_key_comparator, Iterator *internal_iter, SequenceNumber sequence, uint32_t seed)
bool ParseInternalKey(const Slice &internal_key, ParsedInternalKey *result)
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
Status DestroyDB(const std::string &dbname, const Options &options)
uint64_t ManifestFileNumber() const
int64_t TEST_MaxNextLevelOverlappingBytes()
virtual void GetApproximateSizes(const Range *range, int n, uint64_t *sizes)
std::string TableFileName(const std::string &name, uint64_t number)
virtual void ReleaseSnapshot(const Snapshot *snapshot)
static Status Corruption(const Slice &msg, const Slice &msg2=Slice())
int64_t NumLevelBytes(int level) const
void Evict(uint64_t file_number)
bool OverlapInLevel(int level, const Slice *smallest_user_key, const Slice *largest_user_key)
virtual Status CreateDir(const std::string &dirname)=0
bool ConsumeDecimalNumber(Slice *in, uint64_t *val)
std::string ToString() const
static const ValueType kValueTypeForSeek
SnapshotImpl * oldest() const
bool ReadRecord(Slice *record, std::string *scratch)
std::string DebugString() const
bool NeedsCompaction() const
void AddIterators(const ReadOptions &, std::vector< Iterator *> *iters)
Iterator * TEST_NewInternalIterator()
void Delete(const Slice &key)
virtual Status Delete(const WriteOptions &options, const Slice &key)=0
WriteBatch * BuildBatchGroup(Writer **last_writer)
std::string DescriptorFileName(const std::string &dbname, uint64_t number)
int NumLevelFiles(int level) const
static Status Open(const Options &options, const std::string &name, DB **dbptr)
uint64_t FileSize() const
virtual void CompactRange(const Slice *begin, const Slice *end)
void MarkFileNumberUsed(uint64_t number)
static void SetContents(WriteBatch *batch, const Slice &contents)
virtual size_t TotalCharge() const =0
ManualCompaction * manual_compaction_
void DecodeFrom(const Slice &s)
port::AtomicPointer has_imm_
Status TEST_CompactMemTable()
Status SetCurrentFile(Env *env, const std::string &dbname, uint64_t descriptor_number)
std::string CurrentFileName(const std::string &dbname)
virtual Status Delete(const WriteOptions &, const Slice &key)
bool IsTrivialMove() const
void EncodeTo(std::string *dst) const
void ReuseFileNumber(uint64_t file_number)
static void BGWork(void *db)
virtual Status LockFile(const std::string &fname, FileLock **lock)=0
uint64_t LogNumber() const
virtual Status RenameFile(const std::string &src, const std::string &target)=0
uint64_t NumEntries() const
const std::string dbname_
SequenceNumber smallest_snapshot
virtual bool FileExists(const std::string &fname)=0
Status Recover(bool *save_manifest)
bool IsBaseLevelForKey(const Slice &user_key)
uint64_t LastSequence() const
void Delete(const SnapshotImpl *s)
Status DoCompactionWork(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
void remove_prefix(size_t n)
virtual Status NewAppendableFile(const std::string &fname, WritableFile **result)
Iterator * NewMergingIterator(const Comparator *cmp, Iterator **list, int n)
Iterator * NewIterator(const ReadOptions &options, uint64_t file_number, uint64_t file_size, Table **tableptr=NULL)
bool RecordReadSample(Slice key)
const char * LevelSummary(LevelSummaryStorage *scratch) const
virtual void Schedule(void(*function)(void *arg), void *arg)=0
static int Count(const WriteBatch *batch)
Cache * NewLRUCache(size_t capacity)
Status MakeRoomForWrite(bool force) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
const InternalFilterPolicy internal_filter_policy_
Status OpenCompactionOutputFile(CompactionState *compact)
std::vector< Output > outputs
const Comparator * user_comparator() const
const FilterPolicy * filter_policy
Compaction * CompactRange(int level, const InternalKey *begin, const InternalKey *end)
DBImpl(const Options &options, const std::string &dbname)
const Comparator * comparator
static Status InvalidArgument(const Slice &msg, const Slice &msg2=Slice())
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)=0
Compaction * PickCompaction()
std::string LogFileName(const std::string &name, uint64_t number)
virtual Status Write(const WriteOptions &options, WriteBatch *updates)
Status AddRecord(const Slice &slice)
static Slice Contents(const WriteBatch *batch)
Options SanitizeOptions(const std::string &dbname, const InternalKeyComparator *icmp, const InternalFilterPolicy *ipolicy, const Options &src)
size_t ApproximateMemoryUsage()
virtual uint64_t NowMicros()=0
Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta)
void RegisterCleanup(CleanupFunction function, void *arg1, void *arg2)
bool starts_with(const Slice &x) const
void CleanupCompaction(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
bool Get(const LookupKey &key, std::string *value, Status *s)
void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey &smallest, const InternalKey &largest)
const Snapshot * snapshot
void SetLastSequence(uint64_t s)
Status InstallCompactionResults(CompactionState *compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
Status Get(const ReadOptions &, const LookupKey &key, std::string *val, GetStats *stats)
const InternalKey * begin
std::set< uint64_t > pending_outputs_
const char * data() const
void AddLiveFiles(std::set< uint64_t > *live)
bool bg_compaction_scheduled_
virtual Status DeleteFile(const std::string &fname)=0
virtual Status DeleteDir(const std::string &dirname)=0
static void Append(WriteBatch *dst, const WriteBatch *src)
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
void AddInputDeletions(VersionEdit *edit)
virtual bool Valid() const =0
virtual Status NewLogger(const std::string &fname, Logger **result)=0
void Put(const Slice &key, const Slice &value)
port::AtomicPointer shutting_down_
const SnapshotImpl * New(SequenceNumber seq)
static SequenceNumber Sequence(const WriteBatch *batch)
Status WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base) EXCLUSIVE_LOCKS_REQUIRED(mutex_)
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
void Add(const Slice &key, const Slice &value)
int64_t MaxNextLevelOverlappingBytes()
std::deque< Writer * > writers_
Iterator * NewInternalIterator(const ReadOptions &, SequenceNumber *latest_snapshot, uint32_t *seed)
int num_input_files(int which) const
std::string LockFileName(const std::string &dbname)
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)