Skip to content

Commit

Permalink
KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct
Browse files Browse the repository at this point in the history
This takes the Arena* member of RowBlock and moves it into a new
RowBlockMemory structure. The RowBlockMemory structure will later
be extended to include a list of reference-counted block handles.

Change-Id: I17a21f33f44988795ffe064b3ba41055e1a19e90
Reviewed-on: http://gerrit.cloudera.org:8080/15801
Reviewed-by: Andrew Wong <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
toddlipcon committed Aug 14, 2020
1 parent 7b832c9 commit fb0f4bc
Show file tree
Hide file tree
Showing 38 changed files with 351 additions and 257 deletions.
1 change: 1 addition & 0 deletions src/kudu/cfile/block_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "kudu/cfile/block_cache.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/common/rowblock_memory.h"

namespace kudu {
namespace cfile {
Expand Down
5 changes: 3 additions & 2 deletions src/kudu/cfile/cfile-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
Expand Down Expand Up @@ -447,7 +448,7 @@ void TimeReadFileForDataType(CFileIterator* iter, int* count) {
ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n);
*count += n;
cb.arena()->Reset();
cb.memory()->Reset();
}
LOG(INFO)<< "Sum: " << sum;
LOG(INFO)<< "Count: " << *count;
Expand All @@ -469,7 +470,7 @@ void ReadBinaryFile(CFileIterator* iter, int* count) {
}
}
*count += n;
cb.arena()->Reset();
cb.memory()->Reset();
}
LOG(INFO) << "Sum of value lengths: " << sum_lens;
LOG(INFO) << "Count: " << *count;
Expand Down
57 changes: 31 additions & 26 deletions src/kudu/cfile/cfile-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
#include "kudu/cfile/index_btree.h"
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
Expand All @@ -67,7 +69,6 @@
#include "kudu/util/int128.h"
#include "kudu/util/int128_util.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/nvm_cache.h"
#include "kudu/util/slice.h"
Expand All @@ -76,6 +77,10 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

namespace kudu {
class Arena;
} // namespace kudu

DECLARE_bool(cfile_write_checksums);
DECLARE_bool(cfile_verify_checksums);
DECLARE_string(block_cache_type);
Expand Down Expand Up @@ -163,9 +168,11 @@ class TestCFile : public CFileTestBase {
ASSERT_OK(iter->SeekToOrdinal(0));
size_t fetched = 0;
while (fetched < 10000) {
ColumnBlock advancing_block(out.type_info(), nullptr,
ColumnBlock advancing_block(out.type_info(),
nullptr,
out.data() + (fetched * out.stride()),
out.nrows() - fetched, out.arena());
out.nrows() - fetched,
out.memory());
ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel);
ASSERT_TRUE(iter->HasNext());
size_t batch_size = random() % 5 + 1;
Expand Down Expand Up @@ -204,7 +211,7 @@ class TestCFile : public CFileTestBase {
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));

Arena arena(8192);
RowBlockMemory mem;
ScopedColumnBlock<DataGeneratorType::kDataType> cb(10);

SelectionVector sel(10);
Expand Down Expand Up @@ -234,7 +241,7 @@ class TestCFile : public CFileTestBase {
ASSERT_EQ((*generator)[j], cb[j]);
}
}
cb.arena()->Reset();
cb.memory()->Reset();
read_offset += n;
}
}
Expand Down Expand Up @@ -431,11 +438,9 @@ INSTANTIATE_TEST_CASE_P(CacheMemoryTypes, TestCFileBothCacheMemoryTypes,
::testing::Values(Cache::MemoryType::DRAM,
Cache::MemoryType::NVM));

template<DataType type>
void CopyOne(CFileIterator *it,
typename TypeTraits<type>::cpp_type *ret,
Arena *arena) {
ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, arena);
template <DataType type>
void CopyOne(CFileIterator* it, typename TypeTraits<type>::cpp_type* ret, RowBlockMemory* mem) {
ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, mem);
SelectionVector sel(1);
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
ctx.SetDecoderEvalNotSupported();
Expand Down Expand Up @@ -634,18 +639,18 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));

Arena arena(1024);
RowBlockMemory mem;

ASSERT_OK(iter->SeekToOrdinal(5000));
ASSERT_EQ(5000u, iter->GetCurrentOrdinal());
ASSERT_EQ(5000, iter->GetCurrentOrdinal());
Slice s;

CopyOne<STRING>(iter.get(), &s, &arena);
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(5000), s.ToString());

// Seek to last key exactly, should succeed
ASSERT_OK(iter->SeekToOrdinal(9999));
ASSERT_EQ(9999u, iter->GetCurrentOrdinal());
ASSERT_EQ(9999, iter->GetCurrentOrdinal());

// Seek to after last key. Should result in not found.
ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound());
Expand All @@ -662,56 +667,56 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
// (seek to "hello 0000.5" through "hello 9999.5")
string buf;
for (int i = 1; i < 10000; i++) {
arena.Reset();
mem.Reset();
buf = formatter(i - 1);
buf.append(".5");
s = Slice(buf);
EncodeStringKey(schema, s, &arena, &encoded_key);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_FALSE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &arena);
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(i), s.ToString());
}

// Seek exactly to each key
// (seek to "hello 0000" through "hello 9999")
for (int i = 0; i < 9999; i++) {
arena.Reset();
mem.Reset();
buf = formatter(i);
s = Slice(buf);
EncodeStringKey(schema, s, &arena, &encoded_key);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_TRUE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
Slice read_back;
CopyOne<STRING>(iter.get(), &read_back, &arena);
CopyOne<STRING>(iter.get(), &read_back, &mem);
ASSERT_EQ(read_back.ToString(), s.ToString());
}

// after last entry
// (seek to "hello 9999.x")
buf = formatter(9999) + ".x";
s = Slice(buf);
EncodeStringKey(schema, s, &arena, &encoded_key);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
EXPECT_TRUE(iter->SeekAtOrAfter(*encoded_key, &exact).IsNotFound());

// before first entry
// (seek to "hello 000", which falls before "hello 0000")
buf = formatter(0);
buf.resize(buf.size() - 1);
s = Slice(buf);
EncodeStringKey(schema, s, &arena, &encoded_key);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
EXPECT_FALSE(exact);
EXPECT_EQ(0, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &arena);
CopyOne<STRING>(iter.get(), &s, &mem);
EXPECT_EQ(formatter(0), s.ToString());

// Seek to start of file by ordinal
ASSERT_OK(iter->SeekToFirst());
ASSERT_EQ(0, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &arena);
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(0), s.ToString());

// Reseek to start and fetch all data.
Expand Down Expand Up @@ -850,9 +855,9 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestDefaultColumnIter) {
// Test String Default Value
Slice str_data[kNumItems];
Slice str_value("Hello");
Arena arena(32*1024);
RowBlockMemory mem;
DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value);
ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &arena);
ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &mem);
ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel);
ASSERT_OK(str_iter.Scan(&str_ctx));
for (size_t i = 0; i < str_col.nrows(); ++i) {
Expand Down
9 changes: 4 additions & 5 deletions src/kudu/cfile/cfile_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"

namespace kudu {
namespace cfile {
Expand All @@ -55,13 +55,12 @@ Status DumpIterator(const CFileReader& reader,
std::ostream* out,
int num_rows,
int indent) {

Arena arena(8192);
RowBlockMemory mem(8192);
uint8_t buf[kBufSize];
const TypeInfo *type = reader.type_info();
size_t max_rows = kBufSize/type->size();
uint8_t nulls[BitmapSize(max_rows)];
ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &arena);
ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &mem);
SelectionVector sel(max_rows);
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
string strbuf;
Expand Down Expand Up @@ -93,7 +92,7 @@ Status DumpIterator(const CFileReader& reader,

*out << strbuf;
strbuf.clear();
arena.Reset();
mem.Reset();
count += n;
}

Expand Down
17 changes: 9 additions & 8 deletions src/kudu/cfile/encoding-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
Expand Down Expand Up @@ -73,20 +75,20 @@ namespace cfile {
class TestEncoding : public KuduTest {
public:
TestEncoding()
: arena_(1024) {
: memory_(1024) {
}

protected:
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
arena_.Reset();
memory_.Reset();
default_write_options_.storage_attributes.cfile_block_size = 256 * 1024;
}

template<DataType type>
void CopyOne(BlockDecoder *decoder,
typename TypeTraits<type>::cpp_type *ret) {
ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &arena_);
ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &memory_);
ColumnDataView cdv(&cb);
size_t n = 1;
ASSERT_OK(decoder->CopyNextValues(&n, &cdv));
Expand Down Expand Up @@ -461,7 +463,7 @@ class TestEncoding : public KuduTest {
vector<CppType> decoded;
decoded.resize(size);

ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_);
ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &memory_);
ColumnDataView view(&dst_block);
int dec_count = 0;
while (bd->HasNext()) {
Expand Down Expand Up @@ -582,7 +584,7 @@ class TestEncoding : public KuduTest {
ColumnBlock dst_block(GetTypeInfo(IntType), nullptr,
&decoded[0],
to_insert.size(),
&arena_);
&memory_);
int dec_count = 0;
while (ibd->HasNext()) {
ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex());
Expand Down Expand Up @@ -666,7 +668,7 @@ class TestEncoding : public KuduTest {
ColumnBlock dst_block(GetTypeInfo(BOOL), nullptr,
&decoded[0],
to_insert.size(),
&arena_);
&memory_);

int dec_count = 0;
while (bd->HasNext()) {
Expand Down Expand Up @@ -704,8 +706,7 @@ class TestEncoding : public KuduTest {
}
}

Arena arena_;
faststring contiguous_buf_;
RowBlockMemory memory_;
WriterOptions default_write_options_;
};

Expand Down
17 changes: 9 additions & 8 deletions src/kudu/codegen/codegen-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/singleton.h"
Expand Down Expand Up @@ -66,7 +67,7 @@ class CodegenTest : public KuduTest {
CodegenTest()
: random_(SeedRandom()),
// Set the initial Arena size as small as possible to catch errors during relocation.
projections_arena_(16) {
projections_mem_(16) {
// Create the base schema.
vector<ColumnSchema> cols = { ColumnSchema("key ", UINT64, false),
ColumnSchema("int32 ", INT32, false),
Expand Down Expand Up @@ -138,7 +139,7 @@ class CodegenTest : public KuduTest {

private:
// Projects the test rows into parameter rowblock using projector and
// member projections_arena_ (should be Reset() manually).
// member projections_mem_ (should be Reset() manually).
template<bool READ, class RowProjectorType>
void ProjectTestRows(RowProjectorType* rp, RowBlock* rb);
void AddRandomString(RowBuilder* rb);
Expand All @@ -153,7 +154,7 @@ class CodegenTest : public KuduTest {
codegen::CodeGenerator generator_;
Random random_;
unique_ptr<ConstContiguousRow> test_rows_[kNumTestRows];
Arena projections_arena_;
RowBlockMemory projections_mem_;
unique_ptr<Arena> test_rows_arena_;
};

Expand Down Expand Up @@ -203,9 +204,9 @@ void CodegenTest::ProjectTestRows(RowProjectorType* rp, RowBlock* rb) {
ConstContiguousRow src = *test_rows_[i];
RowBlockRow dst = rb->row(i);
if (READ) {
CHECK_OK(rp->ProjectRowForRead(src, &dst, &projections_arena_));
CHECK_OK(rp->ProjectRowForRead(src, &dst, rb->arena()));
} else {
CHECK_OK(rp->ProjectRowForWrite(src, &dst, &projections_arena_));
CHECK_OK(rp->ProjectRowForWrite(src, &dst, rb->arena()));
}
}
}
Expand All @@ -220,10 +221,10 @@ void CodegenTest::TestProjection(const Schema* proj) {
CHECK_EQ(with->base_schema(), &base_);
CHECK_EQ(with->projection(), proj);

RowBlock rb_with(proj, kNumTestRows, &projections_arena_);
RowBlock rb_without(proj, kNumTestRows, &projections_arena_);
RowBlock rb_with(proj, kNumTestRows, &projections_mem_);
RowBlock rb_without(proj, kNumTestRows, &projections_mem_);

projections_arena_.Reset();
projections_mem_.Reset();
ProjectTestRows<READ>(with.get(), &rb_with);
ProjectTestRows<READ>(&without, &rb_without);
CheckRowBlocksEqual(&rb_with, &rb_without, "Codegen", "Expected");
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/common/column_predicate-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/columnblock.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
Expand Down
Loading

0 comments on commit fb0f4bc

Please sign in to comment.