Modeling Support for CREATE INDEX (cmu-db#1153)
17zhangw authored Sep 8, 2020
1 parent 7724571 commit fa7fecd
Showing 22 changed files with 425 additions and 76 deletions.
271 changes: 245 additions & 26 deletions benchmark/runner/mini_runners.cpp

Large diffs are not rendered by default.
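The mini-runner diff itself is hidden above, but the recorder changes later in this commit parse a "minirunners__<rows>" prefix out of the index name. A hypothetical sketch of how such statements could be generated (the helper and names are illustrative, not code from this commit):

from typing import List

def create_index_sql(table: str, cols: List[str], row_count: int) -> str:
    # Illustrative only: encode the build's row count in the index name so
    # OperatingUnitRecorder::AggregateFeatures (below) can recover
    # num_rows/cardinality by scanning for "minirunners__".
    return "CREATE INDEX minirunners__{} ON {} ({})".format(row_count, table, ", ".join(cols))

print(create_index_sql("test_table", ["col1", "col2"], 100000))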

7 changes: 2 additions & 5 deletions script/model/global_trainer.py
@@ -86,15 +86,14 @@ class GlobalTrainer:
Trainer for the global models
"""

def __init__(self, input_path, model_results_path, ml_models, test_ratio, impact_model_ratio, mini_model_map, warmup_period, simulate_cache, tpcc_hack):
def __init__(self, input_path, model_results_path, ml_models, test_ratio, impact_model_ratio, mini_model_map, warmup_period, tpcc_hack):
self.input_path = input_path
self.model_results_path = model_results_path
self.ml_models = ml_models
self.test_ratio = test_ratio
self.impact_model_ratio = impact_model_ratio
self.mini_model_map = mini_model_map
self.warmup_period = warmup_period
self.simulate_cache = simulate_cache
self.tpcc_hack = tpcc_hack

def train(self):
@@ -106,7 +105,6 @@ def train(self):
self.mini_model_map,
self.model_results_path,
self.warmup_period,
self.simulate_cache,
self.tpcc_hack)

return self._train_global_models(resource_data_list, impact_data_list)
@@ -216,7 +214,6 @@ def _train_model_with_derived_data(self, impact_data_list, model_name):
aparser.add_argument('--impact_model_ratio', type=float, default=0.1,
help='Sample ratio to train the global impact model')
aparser.add_argument('--warmup_period', type=float, default=3, help='OLTPBench warmup period')
aparser.add_argument('--simulate_cache', default=False, help='Should simulate cache at 0.4')
aparser.add_argument('--tpcc_hack', default=False, help='Should do feature correction for TPCC')
aparser.add_argument('--log', default='info', help='The logging level')
args = aparser.parse_args()
@@ -228,7 +225,7 @@ def _train_model_with_derived_data(self, impact_data_list, model_name):
with open(args.mini_model_file, 'rb') as pickle_file:
model_map = pickle.load(pickle_file)
trainer = GlobalTrainer(args.input_path, args.model_results_path, args.ml_models, args.test_ratio,
args.impact_model_ratio, model_map, args.warmup_period, args.simulate_cache, args.tpcc_hack)
args.impact_model_ratio, model_map, args.warmup_period, args.tpcc_hack)
resource_model, impact_model, direct_model = trainer.train()
with open(args.save_path + '/global_resource_model.pickle', 'wb') as file:
pickle.dump(resource_model, file)
2 changes: 1 addition & 1 deletion script/model/mini_trainer.py
@@ -170,7 +170,7 @@ def train(self):
help='Prediction results of the mini models')
aparser.add_argument('--save_path', default='trained_model', help='Path to save the mini models')
aparser.add_argument('--ml_models', nargs='*', type=str,
default=["lr", "rf", "gbm"],
default=["lr", "rf", "nn", 'huber', 'svr', 'kr', 'gbm'],
help='ML models for the mini trainer to evaluate')
aparser.add_argument('--test_ratio', type=float, default=0.2, help='Test data split ratio')
aparser.add_argument('--trim', default=0.2, type=float, help='% of values to remove from both top and bottom')
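The default model list for the mini trainer grows from three learners (lr, rf, gbm) to seven. The names follow the usual scikit-learn regressors; a sketch of one plausible name-to-estimator mapping (assumed for illustration, the repository's own mapping is defined elsewhere):

from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model import HuberRegressor, LinearRegression
from sklearn.neural_network import MLPRegressor
from sklearn.svm import SVR

# Assumed mapping from the CLI names above to estimators.
ML_MODELS = {
    "lr": LinearRegression(),
    "rf": RandomForestRegressor(),
    "nn": MLPRegressor(),
    "huber": HuberRegressor(),
    "svr": SVR(),
    "kr": KernelRidge(),
    "gbm": GradientBoostingRegressor(),
}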
2 changes: 2 additions & 0 deletions script/model/training_util/data_transforming_util.py
@@ -108,6 +108,7 @@ def _tuple_num_log_predict_transform(x, y):

OpUnit.IDX_SCAN: _tuple_num_log_transformer,
OpUnit.SORT_BUILD: _tuple_num_linear_log_transformer,
OpUnit.CREATE_INDEX: _tuple_num_linear_log_transformer,

OpUnit.AGG_BUILD: _tuple_num_memory_cardinality_linear_transformer,
}
@@ -146,6 +147,7 @@ def _tuple_num_cardinality_linear_train_transform(x):
OpUnit.OP_DECIMAL_COMPARE: None,
OpUnit.OUTPUT: None,
OpUnit.IDX_SCAN: None,
OpUnit.CREATE_INDEX: None,

# Transform the opunits for which the ratio between the tuple num and the cardinality impacts the behavior
OpUnit.HASHJOIN_BUILD: _tuple_num_cardinality_linear_train_transform,
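CREATE_INDEX is wired into both transformer tables: it reuses SORT_BUILD's tuple-num linear-log transformer, which fits the sort-dominated cost of an index build (roughly n log n in the tuple count), and it is exempted from the tuple-num/cardinality ratio transform since the build touches every tuple. A plausible shape for the linear-log target transform, assumed for illustration:

import numpy as np

def tuple_num_linear_log_transform(tuple_num, y):
    # Assumed shape: divide each target by n * log2(n) so the model learns
    # a per-comparison cost that transfers across input sizes; the inverse
    # would be applied at prediction time.
    n = np.maximum(tuple_num.astype(float), 2.0)  # guard log(0) and log(1)
    return y / (n * np.log2(n))[:, np.newaxis]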
33 changes: 5 additions & 28 deletions script/model/training_util/global_data_constructing_util.py
@@ -14,7 +14,7 @@
from type import Target, OpUnit, ConcurrentCountingMode


def get_data(input_path, mini_model_map, model_results_path, warmup_period, simulate_cache, tpcc_hack):
def get_data(input_path, mini_model_map, model_results_path, warmup_period, tpcc_hack):
"""Get the data for the global models
Read from the cache if it exists; otherwise, save the constructed data to the cache.
@@ -30,7 +30,7 @@ def get_data(input_path, mini_model_map, model_results_path, warmup_period, simu
with open(cache_file, 'rb') as pickle_file:
resource_data_list, impact_data_list = pickle.load(pickle_file)
else:
data_list = _get_grouped_opunit_data_with_prediction(input_path, mini_model_map, model_results_path, warmup_period, simulate_cache, tpcc_hack)
data_list = _get_grouped_opunit_data_with_prediction(input_path, mini_model_map, model_results_path, warmup_period, tpcc_hack)
resource_data_list, impact_data_list = _construct_interval_based_global_model_data(data_list,
model_results_path)
with open(cache_file, 'wb') as file:
@@ -39,7 +39,7 @@ def get_data(input_path, mini_model_map, model_results_path, warmup_period, simu
return resource_data_list, impact_data_list


def _get_grouped_opunit_data_with_prediction(input_path, mini_model_map, model_results_path, warmup_period, simulate_cache, tpcc_hack):
def _get_grouped_opunit_data_with_prediction(input_path, mini_model_map, model_results_path, warmup_period, tpcc_hack):
"""Get the grouped opunit data with the predicted metrics and elapsed time
:param input_path: input data file path
@@ -49,7 +49,7 @@ def _get_grouped_opunit_data_with_prediction(input_path, mini_model_map, model_r
:return: The list of the GroupedOpUnitData objects
"""
data_list = _get_data_list(input_path, warmup_period, tpcc_hack)
_predict_grouped_opunit_data(data_list, mini_model_map, model_results_path, simulate_cache)
_predict_grouped_opunit_data(data_list, mini_model_map, model_results_path)
logging.info("Finished GroupedOpUnitData prediction with the mini models")
return data_list

@@ -191,7 +191,7 @@ def _get_data_list(input_path, warmup_period, tpcc_hack):
return data_list


def _predict_grouped_opunit_data(data_list, mini_model_map, model_results_path, simulate_cache):
def _predict_grouped_opunit_data(data_list, mini_model_map, model_results_path):
"""Use the mini-runner to predict the resource consumptions for all the GlobalData, and record the prediction
result in place
@@ -222,20 +222,10 @@ def _predict_grouped_opunit_data(data_list, mini_model_map, model_results_path,
prediction_cache = {}

# First run a prediction on the global running data with the mini model results
last_pipeline = None
for i, data in enumerate(tqdm.tqdm(data_list, desc="Predict GroupedOpUnitData")):
y = data.y
logging.debug("{} pipeline elapsed time: {}".format(data.name, y[-1]))

# Hack for "cache-ness"
should_mult = False
if i == 0:
last_pipeline = data.name
elif last_pipeline != data.name:
last_pipeline = data.name
else:
should_mult = True

pipeline_y_pred = 0
x = None
for opunit_feature in data.opunit_features:
@@ -283,19 +273,6 @@ def _predict_grouped_opunit_data(data_list, mini_model_map, model_results_path,
pipeline_y_pred += y_pred[0]

pipeline_y = copy.deepcopy(pipeline_y_pred)
if should_mult and simulate_cache:
# Scale elapsed time by 40% (this is a hack)
fields = [
data_info.TARGET_CSV_INDEX[Target.CPU_CYCLE],
data_info.TARGET_CSV_INDEX[Target.CACHE_MISS],
data_info.TARGET_CSV_INDEX[Target.CPU_TIME],
data_info.TARGET_CSV_INDEX[Target.ELAPSED_US]

# Don't for instructions, cache ref, memory
]

for field in fields:
pipeline_y[field ] = pipeline_y[field] * 0.4

# Grouping if we're predicting queries
if "tpch" in data.name:
1 change: 1 addition & 0 deletions script/model/type.py
@@ -53,6 +53,7 @@ class OpUnit(enum.IntEnum):
INSERT = 23,
UPDATE = 24,
DELETE = 25,
CREATE_INDEX = 26,


class ArithmeticFeature(enum.Enum):
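The new enum value has to line up with the C++ side: the metrics output carries the string produced by ExecutionOperatingUnitTypeToString (next file), and the Python trainer resolves it back by member name. A quick consistency check, assuming that lookup convention:

from type import OpUnit  # script/model/type.py

# Assumed convention: feature strings in the metrics CSVs are resolved by
# enum member name, so it must match the C++ "CREATE_INDEX" string exactly.
assert OpUnit["CREATE_INDEX"] == OpUnit.CREATE_INDEX == 26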
2 changes: 2 additions & 0 deletions src/brain/brain_util.cpp
@@ -55,6 +55,8 @@ std::string BrainUtil::ExecutionOperatingUnitTypeToString(ExecutionOperatingUnit
return "OUTPUT";
case ExecutionOperatingUnitType::LIMIT:
return "LIMIT";
case ExecutionOperatingUnitType::CREATE_INDEX:
return "CREATE_INDEX";
default:
UNREACHABLE("Undefined ExecutionOperatingUnitType encountered");
break;
37 changes: 32 additions & 5 deletions src/brain/operating_unit_recorder.cpp
@@ -135,15 +135,15 @@ size_t OperatingUnitRecorder::ComputeKeySize(catalog::table_oid_t tbl_oid, const
return key_size;
}

size_t OperatingUnitRecorder::ComputeKeySize(catalog::index_oid_t idx_oid,
const std::vector<catalog::indexkeycol_oid_t> &cols, size_t *num_key) {
size_t OperatingUnitRecorder::ComputeKeySize(common::ManagedPointer<const catalog::IndexSchema> schema,
bool restrict_cols, const std::vector<catalog::indexkeycol_oid_t> &cols,
size_t *num_key) {
std::unordered_set<catalog::indexkeycol_oid_t> kcols;
for (auto &col : cols) kcols.insert(col);

size_t key_size = 0;
auto &schema = accessor_->GetIndexSchema(idx_oid);
for (auto &col : schema.GetColumns()) {
if (kcols.find(col.Oid()) != kcols.end()) {
for (auto &col : schema->GetColumns()) {
if (!restrict_cols || kcols.find(col.Oid()) != kcols.end()) {
AdjustKeyWithType(col.Type(), &key_size, num_key);
}
}
@@ -152,6 +152,14 @@ size_t OperatingUnitRecorder::ComputeKeySize(catalog::index_oid_t idx_oid,
return key_size;
}

size_t OperatingUnitRecorder::ComputeKeySize(catalog::index_oid_t idx_oid,
const std::vector<catalog::indexkeycol_oid_t> &cols, size_t *num_key) {
auto &schema = accessor_->GetIndexSchema(idx_oid);
auto key_size = ComputeKeySize(common::ManagedPointer(&schema), true, cols, num_key);
TERRIER_ASSERT(key_size > 0, "KeySize must be greater than 0");
return key_size;
}

void OperatingUnitRecorder::AggregateFeatures(brain::ExecutionOperatingUnitType type, size_t key_size, size_t num_keys,
const planner::AbstractPlanNode *plan, size_t scaling_factor,
double mem_factor) {
@@ -203,6 +211,15 @@ void OperatingUnitRecorder::AggregateFeatures(brain::ExecutionOperatingUnitType
}

cardinality = 1; // extract from plan num_rows (this is the scan size)
} else if (type == ExecutionOperatingUnitType::CREATE_INDEX) {
// We extract num_rows and cardinality from the index name if possible
// This is a special case for mini-runners
std::string idx_name = reinterpret_cast<const planner::CreateIndexPlanNode *>(plan)->GetIndexName();
auto mrpos = idx_name.find("minirunners__");
if (mrpos != std::string::npos) {
num_rows = atoi(idx_name.c_str() + mrpos + sizeof("minirunners__") - 1);
cardinality = num_rows;
}
}

num_rows *= scaling_factor;
@@ -325,6 +342,16 @@ void OperatingUnitRecorder::VisitAbstractScanPlanNode(const planner::AbstractSca
}
}

void OperatingUnitRecorder::Visit(const planner::CreateIndexPlanNode *plan) {
std::vector<catalog::indexkeycol_oid_t> keys;

auto schema = plan->GetSchema();
size_t num_keys = schema->GetColumns().size();
size_t key_size =
ComputeKeySize(common::ManagedPointer<const catalog::IndexSchema>(schema.Get()), false, keys, &num_keys);
AggregateFeatures(plan_feature_type_, key_size, num_keys, plan, 1, 1);
}

void OperatingUnitRecorder::Visit(const planner::SeqScanPlanNode *plan) {
VisitAbstractScanPlanNode(plan);
RecordArithmeticFeatures(plan, 1);
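The atoi() call above reads the digits that follow the marker, so an index named minirunners__100000 is recorded as a 100,000-row build with equal cardinality; indexes without the marker keep whatever the recorder already computed. The same extraction, sketched in Python for clarity:

import re

def rows_from_index_name(idx_name):
    # Mirrors the C++ above: take the digits after "minirunners__";
    # return None when the marker is absent.
    match = re.search(r"minirunners__(\d+)", idx_name)
    return int(match.group(1)) if match else None

assert rows_from_index_name("minirunners__100000") == 100000
assert rows_from_index_name("regular_idx") is None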
11 changes: 11 additions & 0 deletions src/execution/compiler/operator/index_create_translator.cpp
@@ -48,6 +48,17 @@ void IndexCreateTranslator::PerformPipelineWork(WorkContext *context, FunctionBu
// Close TVI, if need be.
function->Append(codegen_->TableIterClose(codegen_->MakeExpr(tvi_var_)));

{
auto *codegen = GetCodeGen();

// Get Memory Use
auto *get_mem =
codegen->CallBuiltin(ast::Builtin::StorageInterfaceGetIndexHeapSize, {codegen->AddressOf(inserter_)});
auto *record =
codegen->CallBuiltin(ast::Builtin::ExecutionContextSetMemoryUseOverride, {GetExecutionContext(), get_mem});
function->Append(codegen->MakeStmt(record));
}

FreeInserter(function);
}

7 changes: 6 additions & 1 deletion src/execution/exec/execution_context.cpp
@@ -45,7 +45,12 @@ void ExecutionContext::EndResourceTracker(const char *name, uint32_t len) {
void ExecutionContext::EndPipelineTracker(query_id_t query_id, pipeline_id_t pipeline) {
if (common::thread_context.metrics_store_ != nullptr && common::thread_context.resource_tracker_.IsRunning()) {
common::thread_context.resource_tracker_.Stop();
common::thread_context.resource_tracker_.SetMemory(mem_tracker_->GetAllocatedSize());
auto mem_size = mem_tracker_->GetAllocatedSize();
if (memory_use_override_) {
mem_size = memory_use_override_value_;
}

common::thread_context.resource_tracker_.SetMemory(mem_size);
auto &resource_metrics = common::thread_context.resource_tracker_.GetMetrics();

// TODO(wz2): With a query cache, see if we can avoid this copy
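EndPipelineTracker now lets an explicitly recorded value win over the allocator's tally, which is how CREATE INDEX reports the index's estimated heap footprint instead of the pipeline's transient allocations. The override logic, condensed into a Python sketch:

def reported_memory_bytes(tracked_bytes, override_bytes=None):
    # Sketch of the branch above: a value recorded via
    # ExecutionContextSetMemoryUseOverride replaces the tracker's number.
    return override_bytes if override_bytes is not None else tracked_bytes

assert reported_memory_bytes(4096) == 4096
assert reported_memory_bytes(4096, override_bytes=1 << 20) == 1 << 20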
20 changes: 20 additions & 0 deletions src/execution/sema/sema_builtin.cpp
Original file line number Diff line number Diff line change
@@ -824,6 +824,7 @@ void Sema::CheckBuiltinExecutionContextCall(ast::CallExpr *call, ast::Builtin bu
break;
case ast::Builtin::ExecutionContextAddRowsAffected:
case ast::Builtin::ExecutionContextStartResourceTracker:
case ast::Builtin::ExecutionContextSetMemoryUseOverride:
case ast::Builtin::ExecutionContextEndResourceTracker:
expected_arg_count = 2;
break;
@@ -903,6 +904,15 @@ void Sema::CheckBuiltinExecutionContextCall(ast::CallExpr *call, ast::Builtin bu
call->SetType(GetBuiltinType(ast::BuiltinType::Nil));
break;
}
case ast::Builtin::ExecutionContextSetMemoryUseOverride: {
if (!call_args[1]->GetType()->IsIntegerType()) {
ReportIncorrectCallArg(call, 1, GetBuiltinType(ast::BuiltinType::Uint32));
return;
}
// SetMemoryUseOverride returns nil
call->SetType(GetBuiltinType(ast::BuiltinType::Nil));
break;
}
default: {
UNREACHABLE("Impossible execution context call");
}
@@ -2365,6 +2375,14 @@ void Sema::CheckBuiltinStorageInterfaceCall(ast::CallExpr *call, ast::Builtin bu
call->SetType(GetBuiltinType(ast::BuiltinType::ProjectedRow)->PointerTo());
break;
}
case ast::Builtin::StorageInterfaceGetIndexHeapSize: {
if (!CheckArgCount(call, 1)) {
return;
}

call->SetType(GetBuiltinType(ast::BuiltinType::Uint32));
break;
}
case ast::Builtin::IndexInsert: {
if (!CheckArgCount(call, 1)) {
return;
@@ -2893,6 +2911,7 @@ void Sema::CheckBuiltinCall(ast::CallExpr *call) {
case ast::Builtin::ExecutionContextGetMemoryPool:
case ast::Builtin::ExecutionContextGetTLS:
case ast::Builtin::ExecutionContextStartResourceTracker:
case ast::Builtin::ExecutionContextSetMemoryUseOverride:
case ast::Builtin::ExecutionContextEndResourceTracker:
case ast::Builtin::ExecutionContextEndPipelineTracker: {
CheckBuiltinExecutionContextCall(call, builtin);
@@ -3174,6 +3193,7 @@ void Sema::CheckBuiltinCall(ast::CallExpr *call) {
}
case ast::Builtin::StorageInterfaceInit:
case ast::Builtin::GetTablePR:
case ast::Builtin::StorageInterfaceGetIndexHeapSize:
case ast::Builtin::TableInsert:
case ast::Builtin::TableDelete:
case ast::Builtin::TableUpdate:
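Taken together, the sema changes pin down the two new builtins: ExecutionContextSetMemoryUseOverride takes the execution context plus an integer and returns nil, while StorageInterfaceGetIndexHeapSize takes only the storage interface and returns a uint32. An informal summary of those signatures (types paraphrased from the checks above):

# Informal signature table for the builtins added in this commit.
NEW_BUILTIN_SIGNATURES = {
    "ExecutionContextSetMemoryUseOverride": (("ExecutionContext*", "uint32"), "nil"),
    "StorageInterfaceGetIndexHeapSize": (("StorageInterface*",), "uint32"),
}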
9 changes: 7 additions & 2 deletions src/execution/sql/storage_interface.cpp
Original file line number Diff line number Diff line change
@@ -47,8 +47,8 @@ storage::ProjectedRow *StorageInterface::GetIndexPR(catalog::index_oid_t index_o
curr_index_ = exec_ctx_->GetAccessor()->GetIndex(index_oid);
// index is created after the initialization of storage interface
if (curr_index_ != nullptr && !need_indexes_) {
index_pr_buffer_ = exec_ctx_->GetMemoryPool()->AllocateAligned(
curr_index_->GetProjectedRowInitializer().ProjectedRowSize(), alignof(uint64_t), false);
max_pr_size_ = curr_index_->GetProjectedRowInitializer().ProjectedRowSize();
index_pr_buffer_ = exec_ctx_->GetMemoryPool()->AllocateAligned(max_pr_size_, alignof(uint64_t), false);
need_indexes_ = true;
}
index_pr_ = curr_index_->GetProjectedRowInitializer().InitializeRow(index_pr_buffer_);
@@ -57,6 +57,11 @@ storage::ProjectedRow *StorageInterface::GetIndexPR(catalog::index_oid_t index_o

storage::TupleSlot StorageInterface::TableInsert() { return table_->Insert(exec_ctx_->GetTxn(), table_redo_); }

uint32_t StorageInterface::GetIndexHeapSize() {
TERRIER_ASSERT(curr_index_ != nullptr, "Index must have been loaded");
return curr_index_->EstimateHeapUsage();
}

bool StorageInterface::TableDelete(storage::TupleSlot table_tuple_slot) {
auto txn = exec_ctx_->GetTxn();
txn->StageDelete(exec_ctx_->DBOid(), table_oid_, table_tuple_slot);
13 changes: 13 additions & 0 deletions src/execution/vm/bytecode_generator.cpp
Original file line number Diff line number Diff line change
@@ -1565,6 +1565,11 @@ void BytecodeGenerator::VisitExecutionContextCall(ast::CallExpr *call, ast::Buil
GetEmitter()->Emit(Bytecode::ExecutionContextStartResourceTracker, exec_ctx, cmp);
break;
}
case ast::Builtin::ExecutionContextSetMemoryUseOverride: {
LocalVar use = VisitExpressionForRValue(call->Arguments()[1]);
GetEmitter()->Emit(Bytecode::ExecutionContextSetMemoryUseOverride, exec_ctx, use);
break;
}
case ast::Builtin::ExecutionContextEndResourceTracker: {
LocalVar name = VisitExpressionForRValue(call->Arguments()[1]);
GetEmitter()->Emit(Bytecode::ExecutionContextEndResourceTracker, exec_ctx, name);
@@ -2119,6 +2124,12 @@ void BytecodeGenerator::VisitBuiltinStorageInterfaceCall(ast::CallExpr *call, as
index_oid);
break;
}
case ast::Builtin::StorageInterfaceGetIndexHeapSize: {
LocalVar size = GetExecutionResult()->GetOrCreateDestination(call->GetType());
GetEmitter()->Emit(Bytecode::StorageInterfaceGetIndexHeapSize, size, storage_interface);
GetExecutionResult()->SetDestination(size.ValueOf());
break;
}
case ast::Builtin::IndexInsert: {
LocalVar cond = GetExecutionResult()->GetOrCreateDestination(ast::BuiltinType::Get(ctx, ast::BuiltinType::Bool));
GetEmitter()->Emit(Bytecode::StorageInterfaceIndexInsert, cond, storage_interface);
@@ -2387,6 +2398,7 @@ void BytecodeGenerator::VisitBuiltinCallExpr(ast::CallExpr *call) {
case ast::Builtin::ExecutionContextGetMemoryPool:
case ast::Builtin::ExecutionContextGetTLS:
case ast::Builtin::ExecutionContextStartResourceTracker:
case ast::Builtin::ExecutionContextSetMemoryUseOverride:
case ast::Builtin::ExecutionContextEndResourceTracker:
case ast::Builtin::ExecutionContextEndPipelineTracker: {
VisitExecutionContextCall(call, builtin);
@@ -2668,6 +2680,7 @@ void BytecodeGenerator::VisitBuiltinCallExpr(ast::CallExpr *call) {
case ast::Builtin::TableDelete:
case ast::Builtin::TableUpdate:
case ast::Builtin::GetIndexPR:
case ast::Builtin::StorageInterfaceGetIndexHeapSize:
case ast::Builtin::IndexInsert:
case ast::Builtin::IndexInsertUnique:
case ast::Builtin::IndexInsertWithSlot: