leveldb/db_bench.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include <sys/types.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include "db/db_impl.h"
9 #include "db/version_set.h"
10 #include "leveldb/cache.h"
11 #include "leveldb/db.h"
12 #include "leveldb/env.h"
13 #include "leveldb/write_batch.h"
14 #include "port/port.h"
15 #include "util/crc32c.h"
16 #include "util/histogram.h"
17 #include "util/mutexlock.h"
18 #include "util/random.h"
19 #include "util/testutil.h"
20 
21 // Comma-separated list of operations to run in the specified order
22 // Actual benchmarks:
23 // fillseq -- write N values in sequential key order in async mode
24 // fillrandom -- write N values in random key order in async mode
25 // overwrite -- overwrite N values in random key order in async mode
26 // fillsync -- write N/1000 values in random key order in sync mode
27 // fill100K -- write N/1000 100K values in random order in async mode
28 // deleteseq -- delete N keys in sequential order
29 // deleterandom -- delete N keys in random order
30 // readseq -- read N times sequentially
31 // readreverse -- read N times in reverse order
32 // readrandom -- read N times in random order
33 // readmissing -- read N missing keys in random order
34 // readhot -- read N times in random order from 1% section of DB
35 // seekrandom -- N random seeks
36 // open -- cost of opening a DB
37 // crc32c -- repeated crc32c of 4K of data
38 // acquireload -- load N*1000 times
39 // Meta operations:
40 // compact -- Compact the entire DB
41 // stats -- Print DB stats
42 // sstables -- Print sstable info
43 // heapprofile -- Dump a heap profile (if supported by this port)
44 static const char* FLAGS_benchmarks =
45  "fillseq,"
46  "fillsync,"
47  "fillrandom,"
48  "overwrite,"
49  "readrandom,"
50  "readrandom," // Extra run to allow previous compactions to quiesce
51  "readseq,"
52  "readreverse,"
53  "compact,"
54  "readrandom,"
55  "readseq,"
56  "readreverse,"
57  "fill100K,"
58  "crc32c,"
59  "snappycomp,"
60  "snappyuncomp,"
61  "acquireload,"
62  ;
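// Example invocation (illustrative only; "db_bench" is the benchmark binary
// built from this file, and the flags are the ones parsed in main() below):
//
//   ./db_bench --benchmarks=fillseq,readrandom --num=100000 --value_size=100 --histogram=1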
63 
64 // Number of key/values to place in database
65 static int FLAGS_num = 1000000;
66 
67 // Number of read operations to do. If negative, do FLAGS_num reads.
68 static int FLAGS_reads = -1;
69 
70 // Number of concurrent threads to run.
71 static int FLAGS_threads = 1;
72 
73 // Size of each value
74 static int FLAGS_value_size = 100;
75 
76 // Arrange to generate values that shrink to this fraction of
77 // their original size after compression
78 static double FLAGS_compression_ratio = 0.5;
79 
80 // Print histogram of operation timings
81 static bool FLAGS_histogram = false;
82 
83 // Number of bytes to buffer in memtable before compacting
84 // (initialized to default value by "main")
85 static int FLAGS_write_buffer_size = 0;
86 
87 // Number of bytes written to each file.
88 // (initialized to default value by "main")
89 static int FLAGS_max_file_size = 0;
90 
91 // Approximate size of user data packed per block (before compression).
92 // (initialized to default value by "main")
93 static int FLAGS_block_size = 0;
94 
95 // Number of bytes to use as a cache of uncompressed data.
96 // Negative means use default settings.
97 static int FLAGS_cache_size = -1;
98 
99 // Maximum number of files to keep open at the same time (use default if == 0)
100 static int FLAGS_open_files = 0;
101 
102 // Bloom filter bits per key.
103 // Negative means use default settings.
104 static int FLAGS_bloom_bits = -1;
105 
106 // If true, do not destroy the existing database. If you set this
107 // flag and also specify a benchmark that wants a fresh database, that
108 // benchmark will fail.
109 static bool FLAGS_use_existing_db = false;
110 
111 // If true, reuse existing log/MANIFEST files when re-opening a database.
112 static bool FLAGS_reuse_logs = false;
113 
114 // Use the db with the following name.
115 static const char* FLAGS_db = NULL;
116 
117 namespace leveldb {
118 
119 namespace {
120 leveldb::Env* g_env = NULL;
121 
122 // Helper for quickly generating random data.
123 class RandomGenerator {
124  private:
125  std::string data_;
126  int pos_;
127 
128  public:
129  RandomGenerator() {
130  // We use a limited amount of data over and over again and ensure
131  // that it is larger than the compression window (32KB), and also
132  // large enough to serve all typical value sizes we want to write.
133  Random rnd(301);
134  std::string piece;
135  while (data_.size() < 1048576) {
136  // Add a short fragment that is as compressible as specified
137  // by FLAGS_compression_ratio.
138  test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
139  data_.append(piece);
140  }
141  pos_ = 0;
142  }
143 
144  Slice Generate(size_t len) {
145  if (pos_ + len > data_.size()) {
146  pos_ = 0;
147  assert(len < data_.size());
148  }
149  pos_ += len;
150  return Slice(data_.data() + pos_ - len, len);
151  }
152 };
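// A minimal usage sketch of RandomGenerator (mirrors how DoWrite() and the
// Snappy benchmarks below consume it; shown here purely for illustration):
//
//   RandomGenerator gen;
//   Slice value = gen.Generate(FLAGS_value_size);  // slice into the reused 1MB compressible buffer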
153 
154 #if defined(__linux)
155 static Slice TrimSpace(Slice s) {
156  size_t start = 0;
157  while (start < s.size() && isspace(s[start])) {
158  start++;
159  }
160  size_t limit = s.size();
161  while (limit > start && isspace(s[limit-1])) {
162  limit--;
163  }
164  return Slice(s.data() + start, limit - start);
165 }
166 #endif
167 
168 static void AppendWithSpace(std::string* str, Slice msg) {
169  if (msg.empty()) return;
170  if (!str->empty()) {
171  str->push_back(' ');
172  }
173  str->append(msg.data(), msg.size());
174 }
175 
176 class Stats {
177  private:
178  double start_;
179  double finish_;
180  double seconds_;
181  int done_;
182  int next_report_;
183  int64_t bytes_;
184  double last_op_finish_;
185  Histogram hist_;
186  std::string message_;
187 
188  public:
189  Stats() { Start(); }
190 
191  void Start() {
192  next_report_ = 100;
193  last_op_finish_ = start_;
194  hist_.Clear();
195  done_ = 0;
196  bytes_ = 0;
197  seconds_ = 0;
198  start_ = g_env->NowMicros();
199  finish_ = start_;
200  message_.clear();
201  }
202 
203  void Merge(const Stats& other) {
204  hist_.Merge(other.hist_);
205  done_ += other.done_;
206  bytes_ += other.bytes_;
207  seconds_ += other.seconds_;
208  if (other.start_ < start_) start_ = other.start_;
209  if (other.finish_ > finish_) finish_ = other.finish_;
210 
211  // Just keep the messages from one thread
212  if (message_.empty()) message_ = other.message_;
213  }
214 
215  void Stop() {
216  finish_ = g_env->NowMicros();
217  seconds_ = (finish_ - start_) * 1e-6;
218  }
219 
220  void AddMessage(Slice msg) {
221  AppendWithSpace(&message_, msg);
222  }
223 
224  void FinishedSingleOp() {
225  if (FLAGS_histogram) {
226  double now = g_env->NowMicros();
227  double micros = now - last_op_finish_;
228  hist_.Add(micros);
229  if (micros > 20000) {
230  fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
231  fflush(stderr);
232  }
233  last_op_finish_ = now;
234  }
235 
236  done_++;
237  if (done_ >= next_report_) {
238  if (next_report_ < 1000) next_report_ += 100;
239  else if (next_report_ < 5000) next_report_ += 500;
240  else if (next_report_ < 10000) next_report_ += 1000;
241  else if (next_report_ < 50000) next_report_ += 5000;
242  else if (next_report_ < 100000) next_report_ += 10000;
243  else if (next_report_ < 500000) next_report_ += 50000;
244  else next_report_ += 100000;
245  fprintf(stderr, "... finished %d ops%30s\r", done_, "");
246  fflush(stderr);
247  }
248  }
249 
250  void AddBytes(int64_t n) {
251  bytes_ += n;
252  }
253 
254  void Report(const Slice& name) {
255  // Pretend at least one op was done in case we are running a benchmark
256  // that does not call FinishedSingleOp().
257  if (done_ < 1) done_ = 1;
258 
259  std::string extra;
260  if (bytes_ > 0) {
261  // Rate is computed on actual elapsed time, not the sum of per-thread
262  // elapsed times.
263  double elapsed = (finish_ - start_) * 1e-6;
264  char rate[100];
265  snprintf(rate, sizeof(rate), "%6.1f MB/s",
266  (bytes_ / 1048576.0) / elapsed);
267  extra = rate;
268  }
269  AppendWithSpace(&extra, message_);
270 
271  fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
272  name.ToString().c_str(),
273  seconds_ * 1e6 / done_,
274  (extra.empty() ? "" : " "),
275  extra.c_str());
276  if (FLAGS_histogram) {
277  fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
278  }
279  fflush(stdout);
280  }
281 };
282 
283 // State shared by all concurrent executions of the same benchmark.
284 struct SharedState {
285  port::Mutex mu;
286  port::CondVar cv;
287  int total;
288 
289  // Each thread goes through the following states:
290  // (1) initializing
291  // (2) waiting for others to be initialized
292  // (3) running
293  // (4) done
294 
295  int num_initialized;
296  int num_done;
297  bool start;
298 
299  SharedState() : cv(&mu) { }
300 };
301 
302 // Per-thread state for concurrent executions of the same benchmark.
303 struct ThreadState {
304  int tid; // 0..n-1 when running in n threads
305  Random rand; // Has different seeds for different threads
306  Stats stats;
307  SharedState* shared;
308 
309  ThreadState(int index)
310  : tid(index),
311  rand(1000 + index) {
312  }
313 };
314 
315 } // namespace
316 
317 class Benchmark {
318  private:
319  Cache* cache_;
320  const FilterPolicy* filter_policy_;
321  DB* db_;
322  int num_;
323  int value_size_;
324  int entries_per_batch_;
325  WriteOptions write_options_;
326  int reads_;
327  int heap_counter_;
328 
329  void PrintHeader() {
330  const int kKeySize = 16;
331  PrintEnvironment();
332  fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
333  fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
334  FLAGS_value_size,
335  static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
336  fprintf(stdout, "Entries: %d\n", num_);
337  fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
338  ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
339  / 1048576.0));
340  fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
341  (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
342  / 1048576.0));
343  PrintWarnings();
344  fprintf(stdout, "------------------------------------------------\n");
345  }
346 
347  void PrintWarnings() {
348 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
349  fprintf(stdout,
350  "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
351  );
352 #endif
353 #ifndef NDEBUG
354  fprintf(stdout,
355  "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
356 #endif
357 
358  // See if snappy is working by attempting to compress a compressible string
359  const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
360  std::string compressed;
361  if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
362  fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
363  } else if (compressed.size() >= sizeof(text)) {
364  fprintf(stdout, "WARNING: Snappy compression is not effective\n");
365  }
366  }
367 
368  void PrintEnvironment() {
369  fprintf(stderr, "LevelDB: version %d.%d\n",
370  kMajorVersion, kMinorVersion);
371 
372 #if defined(__linux)
373  time_t now = time(NULL);
374  fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline
375 
376  FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
377  if (cpuinfo != NULL) {
378  char line[1000];
379  int num_cpus = 0;
380  std::string cpu_type;
381  std::string cache_size;
382  while (fgets(line, sizeof(line), cpuinfo) != NULL) {
383  const char* sep = strchr(line, ':');
384  if (sep == NULL) {
385  continue;
386  }
387  Slice key = TrimSpace(Slice(line, sep - 1 - line));
388  Slice val = TrimSpace(Slice(sep + 1));
389  if (key == "model name") {
390  ++num_cpus;
391  cpu_type = val.ToString();
392  } else if (key == "cache size") {
393  cache_size = val.ToString();
394  }
395  }
396  fclose(cpuinfo);
397  fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
398  fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
399  }
400 #endif
401  }
402 
403  public:
404  Benchmark()
405  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
406  filter_policy_(FLAGS_bloom_bits >= 0
407  ? NewBloomFilterPolicy(FLAGS_bloom_bits)
408  : NULL),
409  db_(NULL),
410  num_(FLAGS_num),
411  value_size_(FLAGS_value_size),
412  entries_per_batch_(1),
413  reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
414  heap_counter_(0) {
415  std::vector<std::string> files;
416  g_env->GetChildren(FLAGS_db, &files);
417  for (size_t i = 0; i < files.size(); i++) {
418  if (Slice(files[i]).starts_with("heap-")) {
419  g_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
420  }
421  }
422  if (!FLAGS_use_existing_db) {
423  DestroyDB(FLAGS_db, Options());
424  }
425  }
426 
427  ~Benchmark() {
428  delete db_;
429  delete cache_;
430  delete filter_policy_;
431  }
432 
433  void Run() {
434  PrintHeader();
435  Open();
436 
437  const char* benchmarks = FLAGS_benchmarks;
438  while (benchmarks != NULL) {
439  const char* sep = strchr(benchmarks, ',');
440  Slice name;
441  if (sep == NULL) {
442  name = benchmarks;
443  benchmarks = NULL;
444  } else {
445  name = Slice(benchmarks, sep - benchmarks);
446  benchmarks = sep + 1;
447  }
448 
449  // Reset parameters that may be overridden below
450  num_ = FLAGS_num;
451  reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
452  value_size_ = FLAGS_value_size;
453  entries_per_batch_ = 1;
454  write_options_ = WriteOptions();
455 
456  void (Benchmark::*method)(ThreadState*) = NULL;
457  bool fresh_db = false;
458  int num_threads = FLAGS_threads;
459 
460  if (name == Slice("open")) {
461  method = &Benchmark::OpenBench;
462  num_ /= 10000;
463  if (num_ < 1) num_ = 1;
464  } else if (name == Slice("fillseq")) {
465  fresh_db = true;
466  method = &Benchmark::WriteSeq;
467  } else if (name == Slice("fillbatch")) {
468  fresh_db = true;
469  entries_per_batch_ = 1000;
470  method = &Benchmark::WriteSeq;
471  } else if (name == Slice("fillrandom")) {
472  fresh_db = true;
473  method = &Benchmark::WriteRandom;
474  } else if (name == Slice("overwrite")) {
475  fresh_db = false;
476  method = &Benchmark::WriteRandom;
477  } else if (name == Slice("fillsync")) {
478  fresh_db = true;
479  num_ /= 1000;
480  write_options_.sync = true;
481  method = &Benchmark::WriteRandom;
482  } else if (name == Slice("fill100K")) {
483  fresh_db = true;
484  num_ /= 1000;
485  value_size_ = 100 * 1000;
486  method = &Benchmark::WriteRandom;
487  } else if (name == Slice("readseq")) {
488  method = &Benchmark::ReadSequential;
489  } else if (name == Slice("readreverse")) {
490  method = &Benchmark::ReadReverse;
491  } else if (name == Slice("readrandom")) {
492  method = &Benchmark::ReadRandom;
493  } else if (name == Slice("readmissing")) {
494  method = &Benchmark::ReadMissing;
495  } else if (name == Slice("seekrandom")) {
496  method = &Benchmark::SeekRandom;
497  } else if (name == Slice("readhot")) {
498  method = &Benchmark::ReadHot;
499  } else if (name == Slice("readrandomsmall")) {
500  reads_ /= 1000;
501  method = &Benchmark::ReadRandom;
502  } else if (name == Slice("deleteseq")) {
503  method = &Benchmark::DeleteSeq;
504  } else if (name == Slice("deleterandom")) {
505  method = &Benchmark::DeleteRandom;
506  } else if (name == Slice("readwhilewriting")) {
507  num_threads++; // Add extra thread for writing
508  method = &Benchmark::ReadWhileWriting;
509  } else if (name == Slice("compact")) {
510  method = &Benchmark::Compact;
511  } else if (name == Slice("crc32c")) {
512  method = &Benchmark::Crc32c;
513  } else if (name == Slice("acquireload")) {
514  method = &Benchmark::AcquireLoad;
515  } else if (name == Slice("snappycomp")) {
516  method = &Benchmark::SnappyCompress;
517  } else if (name == Slice("snappyuncomp")) {
518  method = &Benchmark::SnappyUncompress;
519  } else if (name == Slice("heapprofile")) {
520  HeapProfile();
521  } else if (name == Slice("stats")) {
522  PrintStats("leveldb.stats");
523  } else if (name == Slice("sstables")) {
524  PrintStats("leveldb.sstables");
525  } else {
526  if (name != Slice()) { // No error message for empty name
527  fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
528  }
529  }
530 
531  if (fresh_db) {
532  if (FLAGS_use_existing_db) {
533  fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
534  name.ToString().c_str());
535  method = NULL;
536  } else {
537  delete db_;
538  db_ = NULL;
539  DestroyDB(FLAGS_db, Options());
540  Open();
541  }
542  }
543 
544  if (method != NULL) {
545  RunBenchmark(num_threads, name, method);
546  }
547  }
548  }
549 
550  private:
551  struct ThreadArg {
552  Benchmark* bm;
553  SharedState* shared;
554  ThreadState* thread;
555  void (Benchmark::*method)(ThreadState*);
556  };
557 
558  static void ThreadBody(void* v) {
559  ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
560  SharedState* shared = arg->shared;
561  ThreadState* thread = arg->thread;
562  {
563  MutexLock l(&shared->mu);
564  shared->num_initialized++;
565  if (shared->num_initialized >= shared->total) {
566  shared->cv.SignalAll();
567  }
568  while (!shared->start) {
569  shared->cv.Wait();
570  }
571  }
572 
573  thread->stats.Start();
574  (arg->bm->*(arg->method))(thread);
575  thread->stats.Stop();
576 
577  {
578  MutexLock l(&shared->mu);
579  shared->num_done++;
580  if (shared->num_done >= shared->total) {
581  shared->cv.SignalAll();
582  }
583  }
584  }
585 
586  void RunBenchmark(int n, Slice name,
587  void (Benchmark::*method)(ThreadState*)) {
588  SharedState shared;
589  shared.total = n;
590  shared.num_initialized = 0;
591  shared.num_done = 0;
592  shared.start = false;
593 
594  ThreadArg* arg = new ThreadArg[n];
595  for (int i = 0; i < n; i++) {
596  arg[i].bm = this;
597  arg[i].method = method;
598  arg[i].shared = &shared;
599  arg[i].thread = new ThreadState(i);
600  arg[i].thread->shared = &shared;
601  g_env->StartThread(ThreadBody, &arg[i]);
602  }
603 
604  shared.mu.Lock();
605  while (shared.num_initialized < n) {
606  shared.cv.Wait();
607  }
608 
609  shared.start = true;
610  shared.cv.SignalAll();
611  while (shared.num_done < n) {
612  shared.cv.Wait();
613  }
614  shared.mu.Unlock();
615 
616  for (int i = 1; i < n; i++) {
617  arg[0].thread->stats.Merge(arg[i].thread->stats);
618  }
619  arg[0].thread->stats.Report(name);
620 
621  for (int i = 0; i < n; i++) {
622  delete arg[i].thread;
623  }
624  delete[] arg;
625  }
626 
627  void Crc32c(ThreadState* thread) {
628  // Checksum about 500MB of data total
629  const int size = 4096;
630  const char* label = "(4K per op)";
631  std::string data(size, 'x');
632  int64_t bytes = 0;
633  uint32_t crc = 0;
634  while (bytes < 500 * 1048576) {
635  crc = crc32c::Value(data.data(), size);
636  thread->stats.FinishedSingleOp();
637  bytes += size;
638  }
639  // Print so result is not dead
640  fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
641 
642  thread->stats.AddBytes(bytes);
643  thread->stats.AddMessage(label);
644  }
645 
646  void AcquireLoad(ThreadState* thread) {
647  int dummy;
648  port::AtomicPointer ap(&dummy);
649  int count = 0;
650  void *ptr = NULL;
651  thread->stats.AddMessage("(each op is 1000 loads)");
652  while (count < 100000) {
653  for (int i = 0; i < 1000; i++) {
654  ptr = ap.Acquire_Load();
655  }
656  count++;
657  thread->stats.FinishedSingleOp();
658  }
659  if (ptr == NULL) exit(1); // Disable unused variable warning.
660  }
661 
662  void SnappyCompress(ThreadState* thread) {
663  RandomGenerator gen;
664  Slice input = gen.Generate(Options().block_size);
665  int64_t bytes = 0;
666  int64_t produced = 0;
667  bool ok = true;
668  std::string compressed;
669  while (ok && bytes < 1024 * 1048576) { // Compress 1G
670  ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
671  produced += compressed.size();
672  bytes += input.size();
673  thread->stats.FinishedSingleOp();
674  }
675 
676  if (!ok) {
677  thread->stats.AddMessage("(snappy failure)");
678  } else {
679  char buf[100];
680  snprintf(buf, sizeof(buf), "(output: %.1f%%)",
681  (produced * 100.0) / bytes);
682  thread->stats.AddMessage(buf);
683  thread->stats.AddBytes(bytes);
684  }
685  }
686 
687  void SnappyUncompress(ThreadState* thread) {
688  RandomGenerator gen;
689  Slice input = gen.Generate(Options().block_size);
690  std::string compressed;
691  bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
692  int64_t bytes = 0;
693  char* uncompressed = new char[input.size()];
694  while (ok && bytes < 1024 * 1048576) { // Uncompress 1G
695  ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
696  uncompressed);
697  bytes += input.size();
698  thread->stats.FinishedSingleOp();
699  }
700  delete[] uncompressed;
701 
702  if (!ok) {
703  thread->stats.AddMessage("(snappy failure)");
704  } else {
705  thread->stats.AddBytes(bytes);
706  }
707  }
708 
709  void Open() {
710  assert(db_ == NULL);
711  Options options;
712  options.env = g_env;
713  options.create_if_missing = !FLAGS_use_existing_db;
714  options.block_cache = cache_;
715  options.write_buffer_size = FLAGS_write_buffer_size;
716  options.max_file_size = FLAGS_max_file_size;
717  options.block_size = FLAGS_block_size;
718  options.max_open_files = FLAGS_open_files;
719  options.filter_policy = filter_policy_;
720  options.reuse_logs = FLAGS_reuse_logs;
721  Status s = DB::Open(options, FLAGS_db, &db_);
722  if (!s.ok()) {
723  fprintf(stderr, "open error: %s\n", s.ToString().c_str());
724  exit(1);
725  }
726  }
727 
728  void OpenBench(ThreadState* thread) {
729  for (int i = 0; i < num_; i++) {
730  delete db_;
731  Open();
732  thread->stats.FinishedSingleOp();
733  }
734  }
735 
736  void WriteSeq(ThreadState* thread) {
737  DoWrite(thread, true);
738  }
739 
740  void WriteRandom(ThreadState* thread) {
741  DoWrite(thread, false);
742  }
743 
744  void DoWrite(ThreadState* thread, bool seq) {
745  if (num_ != FLAGS_num) {
746  char msg[100];
747  snprintf(msg, sizeof(msg), "(%d ops)", num_);
748  thread->stats.AddMessage(msg);
749  }
750 
751  RandomGenerator gen;
752  WriteBatch batch;
753  Status s;
754  int64_t bytes = 0;
755  for (int i = 0; i < num_; i += entries_per_batch_) {
756  batch.Clear();
757  for (int j = 0; j < entries_per_batch_; j++) {
758  const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
759  char key[100];
760  snprintf(key, sizeof(key), "%016d", k);
761  batch.Put(key, gen.Generate(value_size_));
762  bytes += value_size_ + strlen(key);
763  thread->stats.FinishedSingleOp();
764  }
765  s = db_->Write(write_options_, &batch);
766  if (!s.ok()) {
767  fprintf(stderr, "put error: %s\n", s.ToString().c_str());
768  exit(1);
769  }
770  }
771  thread->stats.AddBytes(bytes);
772  }
773 
774  void ReadSequential(ThreadState* thread) {
775  Iterator* iter = db_->NewIterator(ReadOptions());
776  int i = 0;
777  int64_t bytes = 0;
778  for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
779  bytes += iter->key().size() + iter->value().size();
780  thread->stats.FinishedSingleOp();
781  ++i;
782  }
783  delete iter;
784  thread->stats.AddBytes(bytes);
785  }
786 
787  void ReadReverse(ThreadState* thread) {
788  Iterator* iter = db_->NewIterator(ReadOptions());
789  int i = 0;
790  int64_t bytes = 0;
791  for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
792  bytes += iter->key().size() + iter->value().size();
793  thread->stats.FinishedSingleOp();
794  ++i;
795  }
796  delete iter;
797  thread->stats.AddBytes(bytes);
798  }
799 
800  void ReadRandom(ThreadState* thread) {
801  ReadOptions options;
802  std::string value;
803  int found = 0;
804  for (int i = 0; i < reads_; i++) {
805  char key[100];
806  const int k = thread->rand.Next() % FLAGS_num;
807  snprintf(key, sizeof(key), "%016d", k);
808  if (db_->Get(options, key, &value).ok()) {
809  found++;
810  }
811  thread->stats.FinishedSingleOp();
812  }
813  char msg[100];
814  snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
815  thread->stats.AddMessage(msg);
816  }
817 
818  void ReadMissing(ThreadState* thread) {
819  ReadOptions options;
820  std::string value;
821  for (int i = 0; i < reads_; i++) {
822  char key[100];
823  const int k = thread->rand.Next() % FLAGS_num;
824  snprintf(key, sizeof(key), "%016d.", k);
825  db_->Get(options, key, &value);
826  thread->stats.FinishedSingleOp();
827  }
828  }
829 
830  void ReadHot(ThreadState* thread) {
831  ReadOptions options;
832  std::string value;
833  const int range = (FLAGS_num + 99) / 100;
834  for (int i = 0; i < reads_; i++) {
835  char key[100];
836  const int k = thread->rand.Next() % range;
837  snprintf(key, sizeof(key), "%016d", k);
838  db_->Get(options, key, &value);
839  thread->stats.FinishedSingleOp();
840  }
841  }
842 
843  void SeekRandom(ThreadState* thread) {
844  ReadOptions options;
845  int found = 0;
846  for (int i = 0; i < reads_; i++) {
847  Iterator* iter = db_->NewIterator(options);
848  char key[100];
849  const int k = thread->rand.Next() % FLAGS_num;
850  snprintf(key, sizeof(key), "%016d", k);
851  iter->Seek(key);
852  if (iter->Valid() && iter->key() == key) found++;
853  delete iter;
854  thread->stats.FinishedSingleOp();
855  }
856  char msg[100];
857  snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
858  thread->stats.AddMessage(msg);
859  }
860 
861  void DoDelete(ThreadState* thread, bool seq) {
862  RandomGenerator gen;
863  WriteBatch batch;
864  Status s;
865  for (int i = 0; i < num_; i += entries_per_batch_) {
866  batch.Clear();
867  for (int j = 0; j < entries_per_batch_; j++) {
868  const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
869  char key[100];
870  snprintf(key, sizeof(key), "%016d", k);
871  batch.Delete(key);
872  thread->stats.FinishedSingleOp();
873  }
874  s = db_->Write(write_options_, &batch);
875  if (!s.ok()) {
876  fprintf(stderr, "del error: %s\n", s.ToString().c_str());
877  exit(1);
878  }
879  }
880  }
881 
882  void DeleteSeq(ThreadState* thread) {
883  DoDelete(thread, true);
884  }
885 
886  void DeleteRandom(ThreadState* thread) {
887  DoDelete(thread, false);
888  }
889 
890  void ReadWhileWriting(ThreadState* thread) {
891  if (thread->tid > 0) {
892  ReadRandom(thread);
893  } else {
894  // Special thread that keeps writing until other threads are done.
895  RandomGenerator gen;
896  while (true) {
897  {
898  MutexLock l(&thread->shared->mu);
899  if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
900  // Other threads have finished
901  break;
902  }
903  }
904 
905  const int k = thread->rand.Next() % FLAGS_num;
906  char key[100];
907  snprintf(key, sizeof(key), "%016d", k);
908  Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
909  if (!s.ok()) {
910  fprintf(stderr, "put error: %s\n", s.ToString().c_str());
911  exit(1);
912  }
913  }
914 
915  // Do not count any of the preceding work/delay in stats.
916  thread->stats.Start();
917  }
918  }
919 
920  void Compact(ThreadState* thread) {
921  db_->CompactRange(NULL, NULL);
922  }
923 
924  void PrintStats(const char* key) {
925  std::string stats;
926  if (!db_->GetProperty(key, &stats)) {
927  stats = "(failed)";
928  }
929  fprintf(stdout, "\n%s\n", stats.c_str());
930  }
931 
932  static void WriteToFile(void* arg, const char* buf, int n) {
933  reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
934  }
935 
936  void HeapProfile() {
937  char fname[100];
938  snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
939  WritableFile* file;
940  Status s = g_env->NewWritableFile(fname, &file);
941  if (!s.ok()) {
942  fprintf(stderr, "%s\n", s.ToString().c_str());
943  return;
944  }
945  bool ok = port::GetHeapProfile(WriteToFile, file);
946  delete file;
947  if (!ok) {
948  fprintf(stderr, "heap profiling not supported\n");
949  g_env->DeleteFile(fname);
950  }
951  }
952 };
953 
954 } // namespace leveldb
955 
956 int main(int argc, char** argv) {
957  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
958  FLAGS_max_file_size = leveldb::Options().max_file_size;
959  FLAGS_block_size = leveldb::Options().block_size;
960  FLAGS_open_files = leveldb::Options().max_open_files;
961  std::string default_db_path;
962 
963  for (int i = 1; i < argc; i++) {
964  double d;
965  int n;
966  char junk;
967  if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
968  FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
969  } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
970  FLAGS_compression_ratio = d;
971  } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
972  (n == 0 || n == 1)) {
973  FLAGS_histogram = n;
974  } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
975  (n == 0 || n == 1)) {
976  FLAGS_use_existing_db = n;
977  } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
978  (n == 0 || n == 1)) {
979  FLAGS_reuse_logs = n;
980  } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
981  FLAGS_num = n;
982  } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
983  FLAGS_reads = n;
984  } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
985  FLAGS_threads = n;
986  } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
987  FLAGS_value_size = n;
988  } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
989  FLAGS_write_buffer_size = n;
990  } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
991  FLAGS_max_file_size = n;
992  } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
993  FLAGS_block_size = n;
994  } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
995  FLAGS_cache_size = n;
996  } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
997  FLAGS_bloom_bits = n;
998  } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
999  FLAGS_open_files = n;
1000  } else if (strncmp(argv[i], "--db=", 5) == 0) {
1001  FLAGS_db = argv[i] + 5;
1002  } else {
1003  fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
1004  exit(1);
1005  }
1006  }
1007 
1008  leveldb::g_env = leveldb::Env::Default();
1009 
1010  // Choose a location for the test database if none given with --db=<path>
1011  if (FLAGS_db == NULL) {
1012  leveldb::g_env->GetTestDirectory(&default_db_path);
1013  default_db_path += "/dbbench";
1014  FLAGS_db = default_db_path.c_str();
1015  }
1016 
1017  leveldb::Benchmark benchmark;
1018  benchmark.Run();
1019  return 0;
1020 }