Skip to content

Commit

Permalink
use unique_ptr<> instead of raw pointers to hold arrays, simplifies me…
Browse files Browse the repository at this point in the history
…mory management. Remove unused allocation in phase3
  • Loading branch information
arvidn authored and hoffmang9 committed Oct 27, 2020
1 parent a3491d4 commit b42b07f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 26 deletions.
33 changes: 12 additions & 21 deletions src/phase1.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,24 @@ void* phase1_thread(THREADDATA* ptd)
// writing results to the right table.
uint64_t left_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
uint64_t right_buf_entries = 5000 + (uint64_t)((1.1) * (globals.stripe_size));
uint8_t* right_writer_buf = new uint8_t[right_buf_entries * right_entry_size_bytes + 7]();
uint8_t* left_writer_buf = new uint8_t[left_buf_entries * compressed_entry_size_bytes]();
std::unique_ptr<uint8_t[]> right_writer_buf(new uint8_t[right_buf_entries * right_entry_size_bytes + 7]);
std::unique_ptr<uint8_t[]> left_writer_buf(new uint8_t[left_buf_entries * compressed_entry_size_bytes]);

FxCalculator f(k, table_index + 1);

// Stores map of old positions to new positions (positions after dropping entries from L
// table that did not match) Map ke
uint16_t position_map_size = 2000;

uint16_t* L_position_map =
new uint16_t[position_map_size](); // Should comfortably fit 2 buckets worth of items
uint16_t* R_position_map = new uint16_t[position_map_size]();
// Should comfortably fit 2 buckets worth of items
std::unique_ptr<uint16_t[]> L_position_map(new uint16_t[position_map_size]);
std::unique_ptr<uint16_t[]> R_position_map(new uint16_t[position_map_size]);

// Start at left table pos = 0 and iterate through the whole table. Note that the left table
// will already be sorted by y
uint64_t totalstripes = (prevtableentries + globals.stripe_size - 1) / globals.stripe_size;
uint64_t threadstripes = (totalstripes + globals.num_threads - 1) / globals.num_threads;

std::vector<Bits> to_write_R_entries;

for (uint64_t stripe = 0; stripe < threadstripes; stripe++) {
uint64_t pos = (stripe * globals.num_threads + ptd->index) * globals.stripe_size;
uint64_t endpos = pos + globals.stripe_size + 1; // one y value overlap
Expand Down Expand Up @@ -327,9 +325,7 @@ void* phase1_thread(THREADDATA* ptd)
// one for L bucket and one for R bucket, and we cycle through them. Map
// keys are stored as positions % 2^10 for efficiency. Map values are stored
// as offsets from the base position for that bucket, for efficiency.
uint16_t* tmp = L_position_map;
L_position_map = R_position_map;
R_position_map = tmp;
std::swap(L_position_map, R_position_map);
L_position_base = R_position_base;
R_position_base = stripe_left_writer_count;

Expand All @@ -349,7 +345,7 @@ void* phase1_thread(THREADDATA* ptd)
throw InvalidStateException("Left writer count overrun");
}
uint8_t* tmp_buf =
left_writer_buf + left_writer_count * compressed_entry_size_bytes;
left_writer_buf.get() + left_writer_count * compressed_entry_size_bytes;

left_writer_count++;
// memset(tmp_buf, 0xff, compressed_entry_size_bytes);
Expand Down Expand Up @@ -451,7 +447,7 @@ void* phase1_thread(THREADDATA* ptd)

if (bStripeStartPair) {
uint8_t* right_buf =
right_writer_buf + right_writer_count * right_entry_size_bytes;
right_writer_buf.get() + right_writer_count * right_entry_size_bytes;
new_entry.ToBytes(right_buf);
right_writer_count++;
}
Expand Down Expand Up @@ -510,7 +506,7 @@ void* phase1_thread(THREADDATA* ptd)
// Correct positions
for (uint32_t i = 0; i < right_writer_count; i++) {
uint64_t posaccum = 0;
uint8_t* entrybuf = right_writer_buf + i * right_entry_size_bytes;
uint8_t* entrybuf = right_writer_buf.get() + i * right_entry_size_bytes;

for (uint32_t j = startbyte; j <= endbyte; j++) {
posaccum = (posaccum << 8) | (entrybuf[j]);
Expand All @@ -523,32 +519,27 @@ void* phase1_thread(THREADDATA* ptd)
}
if (table_index < 6) {
for (uint64_t i = 0; i < right_writer_count; i++) {
globals.R_sort_manager->AddToCache(right_writer_buf + i * right_entry_size_bytes);
globals.R_sort_manager->AddToCache(right_writer_buf.get() + i * right_entry_size_bytes);
}
} else {
// Writes out the right table for table 7
(*ptmp_1_disks)[table_index + 1].Write(
globals.right_writer,
right_writer_buf,
right_writer_buf.get(),
right_writer_count * right_entry_size_bytes);
}
globals.right_writer += right_writer_count * right_entry_size_bytes;
globals.right_writer_count += right_writer_count;

(*ptmp_1_disks)[table_index].Write(
globals.left_writer, left_writer_buf, left_writer_count * compressed_entry_size_bytes);
globals.left_writer, left_writer_buf.get(), left_writer_count * compressed_entry_size_bytes);
globals.left_writer += left_writer_count * compressed_entry_size_bytes;
globals.left_writer_count += left_writer_count;

globals.matches += matches;
Sem::Post(ptd->mine);
}

delete[] L_position_map;
delete[] R_position_map;

delete[] right_writer_buf;
delete[] left_writer_buf;
return 0;
}

Expand Down
7 changes: 2 additions & 5 deletions src/phase3.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ Phase3Results RunPhase3(

uint8_t *right_entry_buf;
uint8_t *left_entry_disk_buf = left_reader_buf;
uint8_t *left_entry_buf_sm = new uint8_t[left_entry_size_bytes];

uint64_t entry_sort_key, entry_pos, entry_offset;
uint64_t cached_entry_sort_key = 0;
Expand Down Expand Up @@ -282,7 +281,7 @@ Phase3Results RunPhase3(
greatest_pos = entry_pos + entry_offset;
}
if (entry_pos == current_pos) {
uint64_t old_write_pos = entry_pos % kReadMinusWrite;
uint64_t const old_write_pos = entry_pos % kReadMinusWrite;
old_sort_keys[old_write_pos][old_counters[old_write_pos]] = entry_sort_key;
old_offsets[old_write_pos][old_counters[old_write_pos]] =
(entry_pos + entry_offset);
Expand Down Expand Up @@ -331,7 +330,7 @@ Phase3Results RunPhase3(
}
}

uint64_t write_pointer_pos = current_pos - kReadMinusWrite + 1;
uint64_t const write_pointer_pos = current_pos - kReadMinusWrite + 1;

// Rewrites each right entry as (line_point, sort_key)
if (current_pos + 1 >= kReadMinusWrite) {
Expand Down Expand Up @@ -377,8 +376,6 @@ Phase3Results RunPhase3(
// Flush cache so all entries are written to buckets
R_sort_manager->FlushCache();

delete[] left_entry_buf_sm;

Timer computation_pass_2_timer;

// The memory will be used like this, with most memory allocated towards the
Expand Down

0 comments on commit b42b07f

Please sign in to comment.