diff --git a/src/phase1.hpp b/src/phase1.hpp
index 71b73318e..68fc0a051 100644
--- a/src/phase1.hpp
+++ b/src/phase1.hpp
@@ -63,6 +63,14 @@ struct THREADDATA {
     std::vector<FileDisk>* ptmp_1_disks;
 };
 
+struct THREADF1DATA {
+    int index;
+    Sem::type* mine;
+    Sem::type* theirs;
+    uint8_t k;
+    const uint8_t* id;
+};
+
 struct GlobalData {
     uint64_t left_writer_count;
     uint64_t right_writer_count;
@@ -140,8 +148,8 @@ void* phase1_thread(THREADDATA* ptd)
     // Streams to read and right to tables. We will have handles to two tables. We will
     // read through the left table, compute matches, and evaluate f for matching entries,
     // writing results to the right table.
-    uint64_t left_buf_entries = 5000 + (uint64_t)((1.1)*(globals.stripe_size));
-    uint64_t right_buf_entries = 5000 + (uint64_t)((1.1)*(globals.stripe_size));
+    uint64_t left_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
+    uint64_t right_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
     uint8_t* right_writer_buf = new uint8_t[right_buf_entries * right_entry_size_bytes + 7]();
     uint8_t* left_writer_buf = new uint8_t[left_buf_entries * compressed_entry_size_bytes]();
 
@@ -190,8 +198,8 @@ void* phase1_thread(THREADDATA* ptd)
     bool bStripePregamePair = false;
     bool bStripeStartPair = false;
     bool need_new_bucket = false;
-    bool first_thread = ptd->index % globals.num_threads == 0;
-    bool last_thread = ptd->index % globals.num_threads == globals.num_threads - 1;
+    bool first_thread = ptd->index % globals.num_threads == 0;
+    bool last_thread = ptd->index % globals.num_threads == globals.num_threads - 1;
 
     uint64_t L_position_base = 0;
     uint64_t R_position_base = 0;
@@ -544,6 +552,63 @@ void* phase1_thread(THREADDATA* ptd)
     return 0;
 }
 
+void* F1thread(THREADF1DATA* ptd)
+{
+    uint8_t k = ptd->k;
+    uint32_t entry_size_bytes = 16;
+
+    uint64_t max_value = ((uint64_t)1 << (k));
+
+    uint64_t right_buf_entries = 1 << (kBatchSizes);
+
+    uint64_t* f1_entries = (uint64_t*)malloc((1 << kBatchSizes) * sizeof(*f1_entries));
+
+    F1Calculator f1(k, ptd->id);
+
+    uint8_t* right_writer_buf = new uint8_t[right_buf_entries * entry_size_bytes];
+
+    // Instead of computing f1(1), f1(2), etc., for each x, we compute them in batches
+    // to increase CPU efficiency.
+    for (uint64_t lp = ptd->index; lp <= (((uint64_t)1) << (k - kBatchSizes));
+         lp = lp + globals.num_threads) {
+        // For each pair x, y in the batch
+
+        uint64_t right_writer_count = 0;
+        uint64_t x = lp * (1 << (kBatchSizes));
+
+        uint64_t loopcount = std::min(max_value - x, (uint64_t)1 << (kBatchSizes));
+
+        // CalculateBuckets fills f1_entries with loopcount consecutive f1 outputs
+        // starting at x, so the whole batch is computed in a single call.
+        f1.CalculateBuckets(x, loopcount, f1_entries);
+        for (uint32_t i = 0; i < loopcount; i++) {
+            uint8_t to_write[16];
+            uint128_t entry;
+
+            entry = (uint128_t)f1_entries[i] << (128 - kExtraBits - k);
+            entry |= (uint128_t)x << (128 - kExtraBits - 2 * k);
+            Util::IntTo16Bytes(to_write, entry);
+            memcpy(&(right_writer_buf[i * entry_size_bytes]), to_write, 16);
+            right_writer_count++;
+            x++;
+        }
+
+        Sem::Wait(ptd->theirs);
+
+        // Write it out
+        for (uint32_t i = 0; i < right_writer_count; i++) {
+            globals.L_sort_manager->AddToCache(&(right_writer_buf[i * entry_size_bytes]));
+        }
+
+        Sem::Post(ptd->mine);
+    }
+
+    free(f1_entries);
+    delete[] right_writer_buf;
+
+    return 0;
+}
+
 // This is Phase 1, or forward propagation. During this phase, all of the 7 tables,
 // and f functions, are evaluated. The result is an intermediate plot file, that is
 // several times larger than what the final file will be, but that has all of the
@@ -570,7 +635,6 @@ std::vector<uint64_t> RunPhase1(
     F1Calculator f1(k, id);
     uint64_t x = 0;
     uint64_t prevtableentries = 0;
-    uint64_t *f1_entries = new uint64_t[1 << kBatchSizes];
     uint32_t t1_entry_size_bytes = EntrySizes::GetMaxEntrySize(k, 1, true);
 
     globals.L_sort_manager = std::make_unique<SortManager>(
@@ -588,23 +652,44 @@ std::vector<uint64_t> RunPhase1(
     // many elements are in each bucket.
     std::vector<uint64_t> table_sizes = std::vector<uint64_t>(8, 0);
 
-    // Instead of computing f1(1), f1(2), etc, for each x, we compute them in batches
-    // to increase CPU efficency.
-    for (uint64_t lp = 0; lp < (uint64_t)1 << (k - kBatchSizes); lp++) {
-        f1.CalculateBuckets(x, 1 << kBatchSizes, f1_entries);
-        for (int i = 0; i < 1 << kBatchSizes; i++) {
-            uint8_t to_write[16];
-            uint128_t entry;
+    {
+        // Start of parallel execution
+        THREADF1DATA* td = new THREADF1DATA[num_threads];
+        auto* mutex = new Sem::type[num_threads];
 
-            entry = (uint128_t)f1_entries[i] << (128 - kExtraBits - k);
-            entry |= (uint128_t)x << (128 - kExtraBits - 2 * k);
-            Util::IntTo16Bytes(to_write, entry);
-            globals.L_sort_manager->AddToCache(to_write);
-            x++;
+        std::vector<std::thread> threads;
+
+        for (int i = 0; i < num_threads; i++) {
+            mutex[i] = Sem::Create();
+        }
+
+        for (int i = 0; i < globals.num_threads; i++) {
+            td[i].index = i;
+            td[i].mine = &mutex[i];
+            td[i].theirs = &mutex[(globals.num_threads + i - 1) % globals.num_threads];
+
+            td[i].k = k;
+            td[i].id = id;
+
+            threads.emplace_back(F1thread, &td[i]);
+        }
+        Sem::Post(&mutex[globals.num_threads - 1]);
+
+        for (auto& t : threads) {
+            t.join();
         }
+
+        for (int i = 0; i < globals.num_threads; i++) {
+            Sem::Destroy(mutex[i]);
+        }
+
+        delete[] td;
+        delete[] mutex;
+
+        // end of parallel execution
     }
+
     prevtableentries = 1ULL << k;
-    delete[] f1_entries;
     f1_start_time.PrintElapsed("F1 complete, time:");
     globals.L_sort_manager->FlushCache();
     table_sizes[1] = x + 1;
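
The ordering that the old single-threaded loop gave for free is preserved here by a ring of semaphores: each F1thread computes its batches privately, but before copying a batch into the shared SortManager it waits on the previous thread's semaphore (theirs) and afterwards posts its own (mine), while RunPhase1 posts the last semaphore once so that thread 0 may write first. What follows is a minimal, self-contained sketch of that hand-off pattern; it is not chiapos code, it substitutes C++20 std::binary_semaphore for the Sem wrapper, and kThreads, kBatches and the integer batch values are illustrative stand-ins.

// Illustrative sketch only (not part of chiapos): the ring-of-semaphores
// hand-off used by F1thread. Each worker computes its batches on its own,
// but writes to the shared output strictly in round-robin thread order,
// which preserves the x ordering of the old single-threaded loop.
// Build: g++ -std=c++20 -pthread ring_sketch.cpp
#include <cstdio>
#include <memory>
#include <semaphore>
#include <thread>
#include <vector>

constexpr int kThreads = 4;   // stands in for globals.num_threads
constexpr int kBatches = 10;  // stands in for the 2^(k - kBatchSizes) batches

int main()
{
    // sems[i] means "thread i has finished writing its current batch".
    std::vector<std::unique_ptr<std::binary_semaphore>> sems;
    for (int i = 0; i < kThreads; i++)
        sems.push_back(std::make_unique<std::binary_semaphore>(0));

    std::vector<int> output;  // stands in for the shared SortManager cache
    std::vector<std::thread> threads;

    for (int i = 0; i < kThreads; i++) {
        auto* mine = sems[i].get();
        auto* theirs = sems[(kThreads + i - 1) % kThreads].get();  // previous thread in the ring
        threads.emplace_back([i, mine, theirs, &output] {
            // Thread i owns batches i, i + kThreads, i + 2*kThreads, ...
            for (int batch = i; batch < kBatches; batch += kThreads) {
                int result = batch * 100;  // "compute" the batch privately, no lock held

                theirs->acquire();         // wait until the previous thread has written
                output.push_back(result);  // ordered write to the shared buffer
                mine->release();           // let the next thread in the ring write
            }
        });
    }

    // Kick the ring off: thread 0 waits on the *last* thread's semaphore,
    // mirroring Sem::Post(&mutex[globals.num_threads - 1]) in RunPhase1.
    sems[kThreads - 1]->release();

    for (auto& t : threads) t.join();
    for (int v : output) std::printf("%d\n", v);  // 0, 100, 200, ... in batch order
}

Because exactly one write token circulates, the pushes into the shared buffer are both serialized and ordered, and the semaphore release/acquire pair supplies the memory ordering, so no extra lock around the shared cache is needed; the batch computations themselves still run fully in parallel.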
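
For reference, each 16-byte entry handed to AddToCache packs y = f1(x) and x left-aligned in a 128-bit value: the top k + kExtraBits bits hold y, the next k bits hold x, and the remaining low bits are zero before Util::IntTo16Bytes serializes the value big-endian. Because y sits in the most significant bits, byte-wise comparison of the serialized entries orders them by y first, which is what the subsequent sort relies on. A minimal sketch of that packing follows, assuming kExtraBits = 6 as in chiapos; pack_f1_entry and the sample values are made up for illustration.

// Illustrative only: mirrors the shifts used in F1thread to build one entry.
#include <cstdint>

using uint128_t = unsigned __int128;  // GCC/Clang extension, as in chiapos

constexpr int kExtraBits = 6;

// y = f1(x) is k + kExtraBits bits wide, x is k bits wide; y goes in the
// most significant bits, x directly below it, the rest stays zero.
uint128_t pack_f1_entry(uint64_t y, uint64_t x, uint8_t k)
{
    uint128_t entry = (uint128_t)y << (128 - kExtraBits - k);
    entry |= (uint128_t)x << (128 - kExtraBits - 2 * k);
    return entry;
}

int main()
{
    const uint8_t k = 32;  // so y is 38 bits and x is 32 bits
    uint128_t e = pack_f1_entry(0x2ABCDEF012ULL, 0xDEADBEEFULL, k);
    uint64_t y = (uint64_t)(e >> (128 - kExtraBits - k));                     // top 38 bits
    uint64_t x = (uint64_t)(e >> (128 - kExtraBits - 2 * k)) & 0xFFFFFFFFULL; // next 32 bits
    return (y == 0x2ABCDEF012ULL && x == 0xDEADBEEFULL) ? 0 : 1;              // layout round-trips
}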