#include "port/port.h"

// RandomGenerator (fragment)
  while (data_.size() < 1048576) { /* pre-build ~1MB of compressible data */ }

  Slice Generate(size_t len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
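// Note: Generate() returns slices that alias the shared data_ buffer, wrapping
// back to the start once fewer than len bytes remain; the assert guards against
// requests larger than the whole buffer.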
static Slice TrimSpace(Slice s) {
  size_t start = 0;
  while (start < s.size() && isspace(s[start])) {
    start++;
  }
  size_t limit = s.size();
  while (limit > start && isspace(s[limit - 1])) {
    limit--;
  }
  return Slice(s.data() + start, limit - start);
}
static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}
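// These two helpers are used further down: TrimSpace() when parsing
// /proc/cpuinfo lines in PrintEnvironment(), and AppendWithSpace() when
// accumulating per-benchmark messages such as "(1000 ops)" into one report
// string. Illustrative call (not from the original file):
// TrimSpace(Slice("  cache size ")) yields "cache size".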
// Stats::Start (fragment)
  last_op_finish_ = start_;

// Stats::Merge (fragment)
  done_ += other.done_;
  if (message_.empty()) message_ = other.message_;

// Stats::Stop (fragment)
  seconds_ = (finish_ - start_) * 1e-6;
// Stats::FinishedSingleOp (fragment)
  double micros = now - last_op_finish_;
  if (micros > 20000) {
    fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
  }
  last_op_finish_ = now;

  if (done_ >= next_report_) {
    if      (next_report_ < 1000)   next_report_ += 100;
    else if (next_report_ < 5000)   next_report_ += 500;
    else if (next_report_ < 10000)  next_report_ += 1000;
    else if (next_report_ < 50000)  next_report_ += 5000;
    else if (next_report_ < 100000) next_report_ += 10000;
    else if (next_report_ < 500000) next_report_ += 50000;
    else                            next_report_ += 100000;
    fprintf(stderr, "... finished %d ops%30s\r", done_, "");
  }
// Stats::Report (fragment)
  // Pretend at least one op was done, so the per-op arithmetic below never
  // divides by zero.
  if (done_ < 1) done_ = 1;

  char rate[100];
  double elapsed = (finish_ - start_) * 1e-6;
  snprintf(rate, sizeof(rate), "%6.1f MB/s",
           (bytes_ / 1048576.0) / elapsed);
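// The MB/s figure divides the bytes recorded via AddBytes() by the elapsed
// wall-clock seconds. Illustrative numbers: 512 MB processed over an elapsed
// 4.0 s reports a rate of 128.0 MB/s.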
  fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
          name.ToString().c_str(),
          seconds_ * 1e6 / done_,
          (extra.empty() ? "" : " "),
          extra.c_str());

  if (FLAGS_histogram) {
    fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
  }
  void PrintHeader() {
    const int kKeySize = 16;
    PrintEnvironment();
    fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
    fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries: %d\n", num_);
    fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
             / 1048576.0));
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }
  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string.
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }
  void PrintEnvironment() {
    fprintf(stderr, "LevelDB: version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(NULL);
    fprintf(stderr, "Date: %s", ctime(&now));  // ctime() output already ends in '\n'

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != NULL) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
        const char* sep = strchr(line, ':');
        if (sep == NULL) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
    }
#endif
  }
// Benchmark constructor (fragment): entries_per_batch_ defaults to 1, and the
// constructor cleans the database directory of files left by earlier runs.
      entries_per_batch_(1),

  std::vector<std::string> files;
  for (size_t i = 0; i < files.size(); i++) {
    // ... delete leftover heap-profile files ...
  }

// ~Benchmark (fragment)
  delete filter_policy_;
  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != NULL) {
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == NULL) {
        name = benchmarks;
        benchmarks = NULL;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below.
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = NULL;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      if (name == Slice("open")) {
        method = &Benchmark::OpenBench;
        num_ /= 10000;
        if (num_ < 1) num_ = 1;
      } else if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("acquireload")) {
        method = &Benchmark::AcquireLoad;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (name != Slice()) {  // No error message for empty name.
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = NULL;
        } else {
          delete db_;
          db_ = NULL;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != NULL) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }
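// Each comma-separated name in --benchmarks maps to one Benchmark method plus
// a fresh_db flag. fresh_db benchmarks (the fill* family) normally destroy and
// re-create the database first; with --use_existing_db=1 they are skipped
// instead, so an existing database is never clobbered.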
// Benchmark::ThreadBody (fragment; the MutexLock scopes around the shared
// state are elided here)
  ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
  SharedState* shared = arg->shared;
  ThreadState* thread = arg->thread;

  shared->num_initialized++;
  if (shared->num_initialized >= shared->total) {
    shared->cv.SignalAll();
  }
  while (!shared->start) {
    shared->cv.Wait();
  }

  thread->stats.Start();
  (arg->bm->*(arg->method))(thread);
  thread->stats.Stop();

  shared->num_done++;
  if (shared->num_done >= shared->total) {
    shared->cv.SignalAll();
  }
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->shared = &shared;
      g_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }
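// SharedState's mutex/condvar pair acts as a start barrier: every worker bumps
// num_initialized and blocks until start is set, the coordinating thread flips
// start and signals once all workers have checked in, and then waits for
// num_done to reach n before merging the per-thread Stats into arg[0] and
// reporting a single line for the benchmark.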
  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total.
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp();
      bytes += size;
    }
    // Print so the result is not optimized away.
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }
  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    int count = 0;
    void* ptr = NULL;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      count++;
      thread->stats.FinishedSingleOp();
    }
    if (ptr == NULL) exit(1);  // Disable unused variable warning.
  }
  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress ~1GB of input.
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }
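// The "(output: %.1f%%)" message is compressed bytes as a percentage of input
// bytes: if the compressed output is half the size of the input, the message
// reads "(output: 50.0%)".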
  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress ~1GB of original input.
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }
// Benchmark::Open (fragment): report failure to open the database.
  fprintf(stderr, "open error: %s\n", s.ToString().c_str());

// Benchmark::OpenBench (fragment): repeatedly close and reopen the database.
  for (int i = 0; i < num_; i++) {
    delete db_;
    Open();
    thread->stats.FinishedSingleOp();
  }
  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, true);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, false);
  }

// Benchmark::DoWrite (fragment)
  if (num_ != FLAGS_num) {
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d ops)", num_);
    thread->stats.AddMessage(msg);
  }
  RandomGenerator gen;
  WriteBatch batch;
  Status s;
  int64_t bytes = 0;
  for (int i = 0; i < num_; i += entries_per_batch_) {
    batch.Clear();
    for (int j = 0; j < entries_per_batch_; j++) {
      const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
      char key[100];
      snprintf(key, sizeof(key), "%016d", k);
      batch.Put(key, gen.Generate(value_size_));
      bytes += value_size_ + strlen(key);
      thread->stats.FinishedSingleOp();
    }
    s = db_->Write(write_options_, &batch);
    if (!s.ok()) {
      fprintf(stderr, "put error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }
  thread->stats.AddBytes(bytes);
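// Keys are fixed-width 16-digit decimals: snprintf with "%016d" turns k = 42
// into "0000000000000042", so lexicographic key order matches numeric order
// and sequential fills really are sequential in the keyspace.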
// Benchmark::ReadSequential (fragment): walk the whole DB with an iterator,
// counting one op per entry and the total key+value bytes.
      thread->stats.FinishedSingleOp();
    thread->stats.AddBytes(bytes);

// Benchmark::ReadReverse (fragment): same, iterating from the last key backwards.
      thread->stats.FinishedSingleOp();
    thread->stats.AddBytes(bytes);
  void ReadRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }
  void ReadMissing(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d.", k);  // trailing '.' => key never exists
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }
  void ReadHot(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    const int range = (FLAGS_num + 99) / 100;  // restrict reads to 1% of the keyspace
    for (int i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }
  void SeekRandom(ThreadState* thread) {
    ReadOptions options;
    int found = 0;
    for (int i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }
// Benchmark::DoDelete (fragment)
  for (int i = 0; i < num_; i += entries_per_batch_) {
    batch.Clear();
    for (int j = 0; j < entries_per_batch_; j++) {
      const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
      char key[100];
      snprintf(key, sizeof(key), "%016d", k);
      batch.Delete(key);
      thread->stats.FinishedSingleOp();
    }
    s = db_->Write(write_options_, &batch);
    if (!s.ok()) {
      fprintf(stderr, "del error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until the other threads are done.
      RandomGenerator gen;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            break;  // the reader threads have finished
          }
        }
        const int k = thread->rand.Next() % FLAGS_num;
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
      }
      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }
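// Thread 0 is the extra writer that Run() adds for readwhilewriting; it keeps
// issuing Put()s until the reader threads report done, then restarts its Stats
// clock so the merged report reflects only the readers' work.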
// Benchmark::PrintStats (fragment): dump a DB property such as "leveldb.stats".
  fprintf(stdout, "\n%s\n", stats.c_str());

// Benchmark::HeapProfile (fragment)
  char fname[100];
  snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
  WritableFile* file;
  Status s = g_env->NewWritableFile(fname, &file);
  if (!s.ok()) {
    fprintf(stderr, "%s\n", s.ToString().c_str());
    return;
  }
  bool ok = port::GetHeapProfile(WriteToFile, file);
  delete file;
  if (!ok) {
    fprintf(stderr, "heap profiling not supported\n");
  }
int main(int argc, char** argv) {
  // (FLAGS_* values that default to the corresponding leveldb::Options fields
  //  are initialized here.)
  std::string default_db_path;

  for (int i = 1; i < argc; i++) {
    double d;
    int n;
    char junk;
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
      FLAGS_compression_ratio = d;
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_histogram = n;
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_use_existing_db = n;
    } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_reuse_logs = n;
    } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
      FLAGS_num = n;
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
      FLAGS_reads = n;
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
      FLAGS_threads = n;
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
      FLAGS_value_size = n;
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
      FLAGS_write_buffer_size = n;
    } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
      FLAGS_max_file_size = n;
    } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
      FLAGS_block_size = n;
    } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
      FLAGS_cache_size = n;
    } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
      FLAGS_bloom_bits = n;
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
      FLAGS_open_files = n;
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
      FLAGS_db = argv[i] + 5;
    } else {
      fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
      exit(1);
    }
  }

  // Choose a location for the test database if none was given with --db=<path>.
  if (FLAGS_db == NULL) {
    leveldb::g_env->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path.c_str();
  }

  leveldb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}
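// Example invocation (illustrative flag values, not taken from the file):
//
//   ./db_bench --benchmarks=fillseq,readrandom,readseq --num=1000000 \
//              --value_size=100 --threads=1 --histogram=1 --db=/tmp/dbbench
//
// Any argument that does not match one of the patterns above falls through to
// the "Invalid flag" branch and the process exits.

// ---- Declarations referenced above ----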
void Merge(const Histogram &other)
void ReadReverse(ThreadState *thread)
void Merge(const Stats &other)
static int FLAGS_open_files
void SnappyUncompress(ThreadState *thread)
void PrintStats(const char *key)
virtual Slice key() const =0
static int FLAGS_value_size
virtual void CompactRange(const Slice *begin, const Slice *end)=0
virtual Slice value() const =0
static void ThreadBody(void *v)
static const char * FLAGS_db
virtual void SeekToFirst()=0
std::string ToString() const
void ReadRandom(ThreadState *thread)
virtual void Seek(const Slice &target)=0
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)=0
virtual void SeekToLast()=0
virtual Status Put(const WriteOptions &options, const Slice &key, const Slice &value)=0
void DoWrite(ThreadState *thread, bool seq)
Slice CompressibleString(Random *rnd, double compressed_fraction, size_t len, std::string *dst)
const FilterPolicy * filter_policy_
virtual Status GetTestDirectory(std::string *path)=0
Status DestroyDB(const std::string &dbname, const Options &options)
void Report(const Slice &name)
void ReadMissing(ThreadState *thread)
Slice Generate(size_t len)
static const char * FLAGS_benchmarks
virtual bool GetProperty(const Slice &property, std::string *value)=0
std::string ToString() const
std::string ToString() const
void Delete(const Slice &key)
virtual void StartThread(void(*function)(void *arg), void *arg)=0
void SnappyCompress(ThreadState *thread)
virtual Status Get(const ReadOptions &options, const Slice &key, std::string *value)=0
static int FLAGS_max_file_size
static Status Open(const Options &options, const std::string &name, DB **dbptr)
void WriteRandom(ThreadState *thread)
WriteOptions write_options_
static int FLAGS_cache_size
const FilterPolicy * NewBloomFilterPolicy(int bits_per_key)
void DeleteRandom(ThreadState *thread)
static int FLAGS_write_buffer_size
void(Benchmark::* method)(ThreadState *)
void AcquireLoad(ThreadState *thread)
static const int kMinorVersion
static int FLAGS_block_size
static bool FLAGS_reuse_logs
int main(int argc, char **argv)
void Compact(ThreadState *thread)
void OpenBench(ThreadState *thread)
void ReadSequential(ThreadState *thread)
virtual Status Write(const WriteOptions &options, WriteBatch *updates)=0
Cache * NewLRUCache(size_t capacity)
uint32_t Value(const char *data, size_t n)
static void ThreadBody(void *arg)
const FilterPolicy * filter_policy
void ReadHot(ThreadState *thread)
void AddMessage(Slice msg)
static void WriteToFile(void *arg, const char *buf, int n)
void RunBenchmark(int n, Slice name, void(Benchmark::*method)(ThreadState *))
void SeekRandom(ThreadState *thread)
static bool FLAGS_use_existing_db
virtual Iterator * NewIterator(const ReadOptions &options)=0
virtual uint64_t NowMicros()=0
void ReadWhileWriting(ThreadState *thread)
bool starts_with(const Slice &x) const
const char * data() const
void Crc32c(ThreadState *thread)
virtual Status DeleteFile(const std::string &fname)=0
static double FLAGS_compression_ratio
virtual bool Valid() const =0
void Put(const Slice &key, const Slice &value)
static int FLAGS_bloom_bits
static const int kMajorVersion
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)=0
void WriteSeq(ThreadState *thread)
static void AppendWithSpace(std::string *str, Slice msg)
void DeleteSeq(ThreadState *thread)
void DoDelete(ThreadState *thread, bool seq)
static bool FLAGS_histogram