Skip to content

Commit

Permalink
f1 parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
wjblanke authored and hoffmang9 committed Oct 23, 2020
1 parent f51fef6 commit a3491d4
Showing 1 changed file with 103 additions and 18 deletions.
121 changes: 103 additions & 18 deletions src/phase1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ struct THREADDATA {
std::vector<FileDisk>* ptmp_1_disks;
};

struct THREADF1DATA {
int index;
Sem::type* mine;
Sem::type* theirs;
uint8_t k;
const uint8_t* id;
};

struct GlobalData {
uint64_t left_writer_count;
uint64_t right_writer_count;
Expand Down Expand Up @@ -140,8 +148,8 @@ void* phase1_thread(THREADDATA* ptd)
// Streams to read and right to tables. We will have handles to two tables. We will
// read through the left table, compute matches, and evaluate f for matching entries,
// writing results to the right table.
uint64_t left_buf_entries = 5000 + (uint64_t)((1.1)*(globals.stripe_size));
uint64_t right_buf_entries = 5000 + (uint64_t)((1.1)*(globals.stripe_size));
uint64_t left_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
uint64_t right_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
uint8_t* right_writer_buf = new uint8_t[right_buf_entries * right_entry_size_bytes + 7]();
uint8_t* left_writer_buf = new uint8_t[left_buf_entries * compressed_entry_size_bytes]();

Expand Down Expand Up @@ -190,8 +198,8 @@ void* phase1_thread(THREADDATA* ptd)
bool bStripePregamePair = false;
bool bStripeStartPair = false;
bool need_new_bucket = false;
bool first_thread = ptd->index % globals.num_threads == 0;
bool last_thread = ptd->index % globals.num_threads == globals.num_threads - 1;
bool first_thread = ptd->index % globals.num_threads == 0;
bool last_thread = ptd->index % globals.num_threads == globals.num_threads - 1;

uint64_t L_position_base = 0;
uint64_t R_position_base = 0;
Expand Down Expand Up @@ -544,6 +552,63 @@ void* phase1_thread(THREADDATA* ptd)
return 0;
}

void* F1thread(THREADF1DATA* ptd)
{
uint8_t k = ptd->k;
uint32_t entry_size_bytes = 16;

uint64_t max_value = ((uint64_t)1 << (k));

uint64_t right_buf_entries = 1 << (kBatchSizes);

uint64_t* f1_entries = (uint64_t*)malloc((1 << kBatchSizes) * sizeof(*f1_entries));

F1Calculator f1(k, ptd->id);

uint8_t* right_writer_buf = new uint8_t[right_buf_entries * entry_size_bytes];

// Instead of computing f1(1), f1(2), etc, for each x, we compute them in batches
// to increase CPU efficency.
for (uint64_t lp = ptd->index; lp <= (((uint64_t)1) << (k - kBatchSizes));
lp = lp + globals.num_threads) { // globals.num_threads) {
// For each pair x, y in the batch

uint64_t right_writer_count = 0;
uint64_t x = lp * (1 << (kBatchSizes));

uint64_t loopcount = std::min(max_value - x, (uint64_t)1 << (kBatchSizes));

// Instead of computing f1(1), f1(2), etc, for each x, we compute them in batches
// to increase CPU efficency.
f1.CalculateBuckets(x, loopcount, f1_entries);
for (uint32_t i = 0; i < loopcount; i++) {
uint8_t to_write[16];
uint128_t entry;

entry = (uint128_t)f1_entries[i] << (128 - kExtraBits - k);
entry |= (uint128_t)x << (128 - kExtraBits - 2 * k);
Util::IntTo16Bytes(to_write, entry);
memcpy(&(right_writer_buf[i * entry_size_bytes]), to_write, 16);
right_writer_count++;
x++;
}

Sem::Wait(ptd->theirs);

// Write it out
for (uint32_t i = 0; i < right_writer_count; i++) {
globals.L_sort_manager->AddToCache(&(right_writer_buf[i * entry_size_bytes]));
}

Sem::Post(ptd->mine);
}

free(f1_entries);
delete[] right_writer_buf;

return 0;
}

// This is Phase 1, or forward propagation. During this phase, all of the 7 tables,
// and f functions, are evaluated. The result is an intermediate plot file, that is
// several times larger than what the final file will be, but that has all of the
Expand All @@ -570,7 +635,6 @@ std::vector<uint64_t> RunPhase1(
F1Calculator f1(k, id);
uint64_t x = 0;
uint64_t prevtableentries = 0;
uint64_t *f1_entries = new uint64_t[1 << kBatchSizes];

uint32_t t1_entry_size_bytes = EntrySizes::GetMaxEntrySize(k, 1, true);
globals.L_sort_manager = std::make_unique<SortManager>(
Expand All @@ -588,23 +652,44 @@ std::vector<uint64_t> RunPhase1(
// many elements are in each bucket.
std::vector<uint64_t> table_sizes = std::vector<uint64_t>(8, 0);

// Instead of computing f1(1), f1(2), etc, for each x, we compute them in batches
// to increase CPU efficency.
for (uint64_t lp = 0; lp < (uint64_t)1 << (k - kBatchSizes); lp++) {
f1.CalculateBuckets(x, 1 << kBatchSizes, f1_entries);
for (int i = 0; i < 1 << kBatchSizes; i++) {
uint8_t to_write[16];
uint128_t entry;
{
// Start of parallel execution
THREADF1DATA* td = new THREADF1DATA[num_threads];
auto* mutex = new Sem::type[num_threads];

entry = (uint128_t)f1_entries[i] << (128 - kExtraBits - k);
entry |= (uint128_t)x << (128 - kExtraBits - 2 * k);
Util::IntTo16Bytes(to_write, entry);
globals.L_sort_manager->AddToCache(to_write);
x++;
std::vector<std::thread> threads;

for (int i = 0; i < num_threads; i++) {
mutex[i] = Sem::Create();
}

for (int i = 0; i < globals.num_threads; i++) {
td[i].index = i;
td[i].mine = &mutex[i];
td[i].theirs = &mutex[(globals.num_threads + i - 1) % globals.num_threads];

td[i].k = k;
td[i].id = id;

threads.emplace_back(F1thread, &td[i]);
}
Sem::Post(&mutex[globals.num_threads - 1]);

for (auto& t : threads) {
t.join();
}

for (int i = 0; i < globals.num_threads; i++) {
Sem::Destroy(mutex[i]);
}

delete[] td;
delete[] mutex;

// end of parallel execution
}

prevtableentries = 1ULL << k;
delete[] f1_entries;
f1_start_time.PrintElapsed("F1 complete, time:");
globals.L_sort_manager->FlushCache();
table_sizes[1] = x + 1;
Expand Down

0 comments on commit a3491d4

Please sign in to comment.