15 #include <sys/types.h> 22 #include "port/port.h" 42 : filename_(fname), file_(f) { }
47 size_t r = fread_unlocked(scratch, 1, n, file_);
48 *result =
Slice(scratch, r);
61 if (fseek(file_, n, SEEK_CUR)) {
62 return IOError(filename_, errno);
76 : filename_(fname), fd_(fd) { }
80 char* scratch)
const {
82 ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
83 *result =
Slice(scratch, (r < 0) ? 0 : r);
99 SetAllowed(
sizeof(
void*) >= 8 ? 1000 : 0);
105 if (GetAllowed() <= 0) {
109 intptr_t x = GetAllowed();
121 SetAllowed(GetAllowed() + 1);
129 return reinterpret_cast<intptr_t
>(allowed_.Acquire_Load());
134 allowed_.Release_Store(reinterpret_cast<void*>(v));
153 : filename_(fname), mmapped_region_(base), length_(length),
158 munmap(mmapped_region_, length_);
163 char* scratch)
const {
165 if (offset + n > length_) {
167 s =
IOError(filename_, EINVAL);
169 *result =
Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
182 : filename_(fname), file_(f) { }
192 size_t r = fwrite_unlocked(data.
data(), 1, data.
size(), file_);
193 if (r != data.
size()) {
194 return IOError(filename_, errno);
201 if (fclose(file_) != 0) {
202 result =
IOError(filename_, errno);
209 if (fflush_unlocked(file_) != 0) {
210 return IOError(filename_, errno);
216 const char* f = filename_.c_str();
217 const char* sep = strrchr(f,
'/');
224 dir = std::string(f, sep - f);
229 int fd = open(dir.c_str(), O_RDONLY);
244 Status s = SyncDirIfManifest();
248 if (fflush_unlocked(file_) != 0 ||
249 fdatasync(fileno(file_)) != 0) {
259 memset(&f, 0,
sizeof(f));
260 f.l_type = (lock ? F_WRLCK : F_UNLCK);
261 f.l_whence = SEEK_SET;
264 return fcntl(fd, F_SETLK, &f);
283 return locked_files_.insert(fname).second;
287 locked_files_.erase(fname);
295 char msg[] =
"Destroying Env::Default()\n";
296 fwrite(msg, 1,
sizeof(msg), stderr);
302 FILE* f = fopen(fname.c_str(),
"r");
316 int fd = open(fname.c_str(), O_RDONLY);
319 }
else if (mmap_limit_.Acquire()) {
321 s = GetFileSize(fname, &size);
323 void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
324 if (base != MAP_FAILED) {
332 mmap_limit_.Release();
343 FILE* f = fopen(fname.c_str(),
"w");
356 FILE* f = fopen(fname.c_str(),
"a");
367 return access(fname.c_str(), F_OK) == 0;
371 std::vector<std::string>* result) {
373 DIR* d = opendir(dir.c_str());
377 struct dirent* entry;
378 while ((entry = readdir(d)) != NULL) {
379 result->push_back(entry->d_name);
387 if (unlink(fname.c_str()) != 0) {
388 result =
IOError(fname, errno);
395 if (mkdir(name.c_str(), 0755) != 0) {
403 if (rmdir(name.c_str()) != 0) {
412 if (stat(fname.c_str(), &sbuf) != 0) {
416 *size = sbuf.st_size;
423 if (rename(src.c_str(), target.c_str()) != 0) {
432 int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
434 result =
IOError(fname, errno);
435 }
else if (!locks_.Insert(fname)) {
439 result =
IOError(
"lock " + fname, errno);
441 locks_.Remove(fname);
445 my_lock->
name_ = fname;
455 result =
IOError(
"unlock", errno);
457 locks_.Remove(my_lock->
name_);
463 virtual void Schedule(
void (*
function)(
void*),
void* arg);
465 virtual void StartThread(
void (*
function)(
void* arg),
void* arg);
468 const char* env = getenv(
"TEST_TMPDIR");
469 if (env && env[0] !=
'\0') {
473 snprintf(buf,
sizeof(buf),
"/tmp/leveldbtest-%d",
int(geteuid()));
482 pthread_t tid = pthread_self();
483 uint64_t thread_id = 0;
484 memcpy(&thread_id, &tid, std::min(
sizeof(thread_id),
sizeof(tid)));
489 FILE* f = fopen(fname.c_str(),
"w");
501 gettimeofday(&tv, NULL);
502 return static_cast<uint64_t
>(tv.tv_sec) * 1000000 + tv.tv_usec;
512 fprintf(stderr,
"pthread %s: %s\n", label, strerror(result));
520 reinterpret_cast<PosixEnv*
>(arg)->BGThread();
530 struct BGItem {
void* arg; void (*
function)(
void*); };
538 PosixEnv::PosixEnv() : started_bgthread_(false) {
562 queue_.back().function =
function;
576 void (*
function)(
void*) =
queue_.front().function;
577 void* arg =
queue_.front().arg;
587 void (*user_function)(
void*);
592 StartThreadState* state =
reinterpret_cast<StartThreadState*
>(arg);
593 state->user_function(state->arg);
600 StartThreadState* state =
new StartThreadState;
601 state->user_function =
function;
609 static pthread_once_t
once = PTHREAD_ONCE_INIT;
virtual ~PosixRandomAccessFile()
void PthreadCall(const char *label, int result)
static void * StartThreadWrapper(void *arg)
virtual Status NewLogger(const std::string &fname, Logger **result)
intptr_t GetAllowed() const
static void InitDefaultEnv()
virtual Status GetTestDirectory(std::string *result)
virtual void SleepForMicroseconds(int micros)
std::set< std::string > locked_files_
virtual Status DeleteDir(const std::string &name)
virtual Status Append(const Slice &data)
std::deque< BGItem > BGQueue
static int LockOrUnlock(int fd, bool lock)
void SetAllowed(intptr_t v)
static port::OnceType once
virtual uint64_t NowMicros()
virtual ~PosixMmapReadableFile()
PosixRandomAccessFile(const std::string &fname, int fd)
PosixWritableFile(const std::string &fname, FILE *f)
virtual Status RenameFile(const std::string &src, const std::string &target)
PosixMmapReadableFile(const std::string &fname, void *base, size_t length, MmapLimiter *limiter)
virtual void Schedule(void(*function)(void *), void *arg)
virtual Status NewWritableFile(const std::string &fname, WritableFile **result)
PosixSequentialFile(const std::string &fname, FILE *f)
void Remove(const std::string &fname)
virtual void StartThread(void(*function)(void *arg), void *arg)
port::AtomicPointer allowed_
Status SyncDirIfManifest()
static void * BGThreadWrapper(void *arg)
virtual Status CreateDir(const std::string &name)
virtual Status GetChildren(const std::string &dir, std::vector< std::string > *result)
virtual Status DeleteFile(const std::string &fname)
virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
virtual bool FileExists(const std::string &fname)
bool Insert(const std::string &fname)
virtual Status Read(uint64_t offset, size_t n, Slice *result, char *scratch) const
virtual Status Skip(uint64_t n)
virtual Status Read(size_t n, Slice *result, char *scratch)
virtual Status NewRandomAccessFile(const std::string &fname, RandomAccessFile **result)
virtual ~PosixSequentialFile()
virtual Status LockFile(const std::string &fname, FileLock **lock)
virtual Status NewSequentialFile(const std::string &fname, SequentialFile **result)
bool starts_with(const Slice &x) const
static Status IOError(const std::string &context, int err_number)
virtual Status GetFileSize(const std::string &fname, uint64_t *size)
virtual Status UnlockFile(FileLock *lock)
const char * data() const
static Status IOError(const Slice &msg, const Slice &msg2=Slice())
virtual Status NewAppendableFile(const std::string &fname, WritableFile **result)