Skip to content

Commit

Permalink
Improve EnvHdfs
Browse files (browse the repository at this point in the history)
Summary: Copy improvements from fbcode's version of EnvHdfs to our open-source version. These include some very important bug fixes.

Test Plan: compiles

Reviewers: dhruba, haobo, sdong

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18711
  • Loading branch information
igorcanadi committed May 14, 2014
1 parent f457444 commit eea7322
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 45 deletions.
22 changes: 13 additions & 9 deletions hdfs/env_hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

namespace rocksdb {

static const std::string kProto = "hdfs://";
static const std::string pathsep = "/";

// Exception thrown during execution when there is an issue with the
// supplied arguments. It derives from std::exception and declares no
// members of its own, so it carries no message beyond what the base
// class provides.
class HdfsUsageException : public std::exception { };
Expand Down Expand Up @@ -58,20 +55,23 @@ class HdfsEnv : public Env {
}

virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result);
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);

virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result);
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);

virtual Status NewWritableFile(const std::string& fname,
WritableFile** result);
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);

virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options);

virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result);
std::unique_ptr<Directory>* result);

virtual bool FileExists(const std::string& fname);

Expand All @@ -97,7 +97,8 @@ class HdfsEnv : public Env {

virtual Status UnlockFile(FileLock* lock);

virtual Status NewLogger(const std::string& fname, Logger** result);
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);

virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW) {
Expand Down Expand Up @@ -161,6 +162,9 @@ class HdfsEnv : public Env {
// object here so that we can use posix timers,
// posix threads, etc.

static const std::string kProto;
static const std::string pathsep;

/**
* If the URI is of the form hdfs://server:port/path,
* then connect to the specified cluster
Expand Down
117 changes: 81 additions & 36 deletions util/env_hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "hdfs/hdfs.h"
#include "hdfs/env_hdfs.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST 1

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
Expand All @@ -39,7 +42,8 @@ static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAccessFile {
class HdfsReadableFile : virtual public SequentialFile,
virtual public RandomAccessFile {
private:
hdfsFS fileSys_;
std::string filename_;
Expand Down Expand Up @@ -73,17 +77,34 @@ class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAcce
Status s;
Log(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
filename_.c_str(), n);
size_t bytes_read = hdfsRead(fileSys_, hfile_, scratch, (tSize)n);
Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());
*result = Slice(scratch, bytes_read);
if (bytes_read < n) {
if (feof()) {
// We leave status as ok if we hit the end of the file
} else {
// A partial read with an error: return a non-ok status
s = IOError(filename_, errno);

char* buffer = scratch;
size_t total_bytes_read = 0;
tSize bytes_read = 0;
tSize remaining_bytes = (tSize)n;

// Read a total of n bytes repeatedly until we hit error or eof
while (remaining_bytes > 0) {
bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
if (bytes_read <= 0) {
break;
}
assert(bytes_read <= remaining_bytes);

total_bytes_read += bytes_read;
remaining_bytes -= bytes_read;
buffer += bytes_read;
}
assert(total_bytes_read <= n);

Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str());

if (bytes_read < 0) {
s = IOError(filename_, errno);
} else {
*result = Slice(scratch, total_bytes_read);
}

return s;
}

Expand Down Expand Up @@ -139,8 +160,7 @@ class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAcce
size = pFileInfo->mSize;
hdfsFreeFileInfo(pFileInfo, 1);
} else {
throw rocksdb::HdfsFatalException("fileSize on unknown file " +
filename_);
throw HdfsFatalException("fileSize on unknown file " + filename_);
}
return size;
}
Expand Down Expand Up @@ -236,9 +256,8 @@ class HdfsLogger : public Logger {
uint64_t (*gettid_)(); // Return the thread id for the current thread

public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)(),
const InfoLogLevel log_level = InfoLogLevel::ERROR)
: Logger(log_level), file_(f), gettid_(gettid) {
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) {
Log(mylog, "[hdfs] HdfsLogger opened %s\n",
file_->getName().c_str());
}
Expand Down Expand Up @@ -324,40 +343,52 @@ class HdfsLogger : public Logger {

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
SequentialFile** result) {
unique_ptr<SequentialFile>* result,
const EnvOptions& options) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr) {
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
*result = dynamic_cast<SequentialFile*>(f);
result->reset(dynamic_cast<SequentialFile*>(f));
return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) {
unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr) {
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
*result = dynamic_cast<RandomAccessFile*>(f);
result->reset(dynamic_cast<RandomAccessFile*>(f));
return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
WritableFile** result) {
unique_ptr<WritableFile>* result,
const EnvOptions& options) {
result->reset();
Status s;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
*result = dynamic_cast<WritableFile*>(f);
result->reset(dynamic_cast<WritableFile*>(f));
return Status::OK();
}

Expand All @@ -367,24 +398,30 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname,
return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv");
}

virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not yet supported on HdfsEnv");
Status HdfsEnv::NewDirectory(const std::string& name,
unique_ptr<Directory>* result) {
return Status::NotSupported("NewDirectory not supported on HdfsEnv");
}

bool HdfsEnv::FileExists(const std::string& fname) {
int value = hdfsExists(fileSys_, fname.c_str());
if (value == 0) {
switch (value) {
case HDFS_EXISTS:
return true;
case HDFS_DOESNT_EXIST:
return false;
default: // anything else should be an error
Log(mylog, "FileExists hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + ".\n");
}
return false;
}

Status HdfsEnv::GetChildren(const std::string& path,
std::vector<std::string>* result) {
int value = hdfsExists(fileSys_, path.c_str());
switch (value) {
case 0: {
case HDFS_EXISTS: { // directory exists
int numEntries = 0;
hdfsFileInfo* pHdfsFileInfo = 0;
pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
Expand All @@ -402,15 +439,17 @@ Status HdfsEnv::GetChildren(const std::string& path,
} else {
// numEntries < 0 indicates error
Log(mylog, "hdfsListDirectory call failed with error ");
throw HdfsFatalException("hdfsListDirectory call failed negative error.\n");
throw HdfsFatalException(
"hdfsListDirectory call failed negative error.\n");
}
break;
}
case 1: // directory does not exist, exit
case HDFS_DOESNT_EXIST: // directory does not exist, exit
break;
default: // anything else should be an error
Log(mylog, "hdfsListDirectory call failed with error ");
throw HdfsFatalException("hdfsListDirectory call failed with error.\n");
Log(mylog, "GetChildren hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + ".\n");
}
return Status::OK();
}
Expand All @@ -432,10 +471,15 @@ Status HdfsEnv::CreateDir(const std::string& name) {
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
const int value = hdfsExists(fileSys_, name.c_str());
// Not atomic. state might change b/w hdfsExists and CreateDir.
if (value == 0) {
switch (value) {
case HDFS_EXISTS:
return Status::OK();
} else {
case HDFS_DOESNT_EXIST:
return CreateDir(name);
default: // anything else should be an error
Log(mylog, "CreateDirIfMissing hdfsExists call failed");
throw HdfsFatalException("hdfsExists call failed with error " +
std::to_string(value) + ".\n");
}
};

Expand Down Expand Up @@ -492,11 +536,12 @@ Status HdfsEnv::NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
delete f;
*result = nullptr;
return IOError(fname, errno);
}
HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
*result = h;
result->reset(h);
if (mylog == nullptr) {
// mylog = h; // uncomment this for detailed logging
}
Expand Down

0 comments on commit eea7322

Please sign in to comment.