Skip to content

Commit

Permalink
[DATA] fix async data writing
Browse files Browse the repository at this point in the history
  • Loading branch information
tqchen committed May 22, 2016
1 parent 2c0c066 commit d816208
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 105 deletions.
61 changes: 32 additions & 29 deletions src/data/sparse_page_dmatrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,41 +256,44 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
name_shards.push_back(prefix + ".col.page");
format_shards.push_back(SparsePage::Format::DecideFormat(prefix).second);
}
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

double tstart = dmlc::GetTime();
size_t bytes_write = 0;
// print every 4 sec.
const double kStep = 4.0;
size_t tick_expected = kStep;
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

while (make_next_col(page.get())) {
for (size_t i = 0; i < page->Size(); ++i) {
col_size_[i] += page->offset[i + 1] - page->offset[i];
}
double tstart = dmlc::GetTime();
size_t bytes_write = 0;
// print every 4 sec.
const double kStep = 4.0;
size_t tick_expected = kStep;

while (make_next_col(page.get())) {
for (size_t i = 0; i < page->Size(); ++i) {
col_size_[i] += page->offset[i + 1] - page->offset[i];
}

bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();

double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing col.page file to " << cache_info_
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB writen";
tick_expected += kStep;
double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing col.page file to " << cache_info_
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " MB writen";
tick_expected += kStep;
}
}
// save meta data
std::string col_meta_name = cache_shards[0] + ".col.meta";
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(col_meta_name.c_str(), "w"));
fo->Write(buffered_rowset_);
fo->Write(col_size_);
fo.reset(nullptr);
}
// save meta data
std::string col_meta_name = cache_shards[0] + ".col.meta";
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(col_meta_name.c_str(), "w"));
fo->Write(buffered_rowset_);
fo->Write(col_size_);
fo.reset(nullptr);
// initialize column data
CHECK(TryInitColData());
}
Expand Down
154 changes: 79 additions & 75 deletions src/data/sparse_page_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,58 +110,60 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
name_shards.push_back(prefix + ".row.page");
format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first);
}
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

MetaInfo info;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
// print every 4 sec.
const double kStep = 4.0;
size_t tick_expected = kStep;

while (src->Next()) {
const dmlc::RowBlock<uint32_t>& batch = src->Value();
if (batch.label != nullptr) {
info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size);
}
if (batch.weight != nullptr) {
info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size);
}
info.num_row += batch.size;
info.num_nonzero += batch.offset[batch.size] - batch.offset[0];
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
info.num_col = std::max(info.num_col,
static_cast<uint64_t>(index + 1));
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

MetaInfo info;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
// print every 4 sec.
const double kStep = 4.0;
size_t tick_expected = kStep;

while (src->Next()) {
const dmlc::RowBlock<uint32_t>& batch = src->Value();
if (batch.label != nullptr) {
info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size);
}
if (batch.weight != nullptr) {
info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size);
}
info.num_row += batch.size;
info.num_nonzero += batch.offset[batch.size] - batch.offset[0];
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
info.num_col = std::max(info.num_col,
static_cast<uint64_t>(index + 1));
}
page->Push(batch);
if (page->MemCostBytes() >= kPageSize) {
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();

double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing row.page to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
tick_expected += kStep;
}
}
}
page->Push(batch);
if (page->MemCostBytes() >= kPageSize) {
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();

double tdiff = dmlc::GetTime() - tstart;
if (tdiff >= tick_expected) {
LOG(CONSOLE) << "Writing row.page to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
tick_expected += kStep;
}
if (page->data.size() != 0) {
writer.PushWrite(std::move(page));
}
}

if (page->data.size() != 0) {
writer.PushWrite(std::move(page));
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get());
}

std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
}

Expand All @@ -176,38 +178,40 @@ void SparsePageSource::Create(DMatrix* src,
name_shards.push_back(prefix + ".row.page");
format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first);
}
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

MetaInfo info;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
dmlc::DataIter<RowBatch>* iter = src->RowIterator();

while (iter->Next()) {
page->Push(iter->Value());
if (page->MemCostBytes() >= kPageSize) {
bytes_write += page->MemCostBytes();
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();

MetaInfo info;
size_t bytes_write = 0;
double tstart = dmlc::GetTime();
dmlc::DataIter<RowBatch>* iter = src->RowIterator();

while (iter->Next()) {
page->Push(iter->Value());
if (page->MemCostBytes() >= kPageSize) {
bytes_write += page->MemCostBytes();
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
}

if (page->data.size() != 0) {
writer.PushWrite(std::move(page));
writer.Alloc(&page);
page->Clear();
double tdiff = dmlc::GetTime() - tstart;
LOG(CONSOLE) << "Writing to " << cache_info << " in "
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
<< (bytes_write >> 20UL) << " written";
}
}

if (page->data.size() != 0) {
writer.PushWrite(std::move(page));
std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get());
}

std::unique_ptr<dmlc::Stream> fo(
dmlc::Stream::Create(name_info.c_str(), "w"));
int tmagic = kMagic;
fo->Write(&tmagic, sizeof(tmagic));
info.SaveBinary(fo.get());
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
}

Expand Down
5 changes: 4 additions & 1 deletion src/data/sparse_page_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ SparsePage::Writer::Writer(
fo->Write(format_shard);
std::unique_ptr<SparsePage> page;
while (wqueue->Pop(&page)) {
if (page.get() == nullptr) break;
fmt->Write(*page, fo.get());
qrecycle_.Push(std::move(page));
}
Expand All @@ -45,7 +46,9 @@ SparsePage::Writer::Writer(

SparsePage::Writer::~Writer() {
for (auto& queue : qworkers_) {
queue.SignalForKill();
// use nullptr to signal termination.
std::unique_ptr<SparsePage> sig(nullptr);
queue.Push(std::move(sig));
}
for (auto& thread : workers_) {
thread->join();
Expand Down

0 comments on commit d816208

Please sign in to comment.