Skip to content

Commit

Permalink
Moved the estimator stuff around so that we can split up the Markov/F…
Browse files Browse the repository at this point in the history
…ixed/Remote TransactionEstimator code
  • Loading branch information
apavlo committed Oct 20, 2012
1 parent 99120ba commit 51b64a9
Show file tree
Hide file tree
Showing 27 changed files with 110 additions and 67 deletions.
2 changes: 1 addition & 1 deletion log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ log4j.logger.edu.brown.utils=INFO

log4j.logger.edu.brown.markov=INFO
log4j.logger.edu.brown.hstore.estimators=INFO
log4j.logger.edu.brown.hstore.estimators.markov=INFO
log4j.logger.edu.brown.hstore.specexec=INFO
log4j.logger.edu.brown.hstore.SpecExecScheduler=INFO

## VoltDB Stuff
log4j.logger.org.voltdb.VoltProcedure=INFO
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/edu/brown/costmodel/MarkovCostModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import edu.brown.catalog.CatalogUtil;
import edu.brown.hstore.HStoreConstants;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.MarkovEstimator;
import edu.brown.hstore.estimators.MarkovEstimatorState;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.hstore.estimators.markov.MarkovEstimate;
import edu.brown.hstore.estimators.markov.MarkovEstimator;
import edu.brown.hstore.estimators.markov.MarkovEstimatorState;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.markov.EstimationThresholds;
import edu.brown.markov.MarkovEstimate;
import edu.brown.markov.MarkovGraph;
import edu.brown.markov.MarkovProbabilityCalculator;
import edu.brown.markov.MarkovUtil;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/edu/brown/hstore/HStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.voltdb.catalog.Database;

import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.FixedEstimator;
import edu.brown.hstore.estimators.MarkovEstimator;
import edu.brown.hstore.estimators.TransactionEstimator;
import edu.brown.hstore.estimators.fixed.FixedEstimator;
import edu.brown.hstore.estimators.markov.MarkovEstimator;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.mappings.ParameterMappingsSet;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/edu/brown/hstore/HStoreSiteStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import edu.brown.hstore.callbacks.TransactionInitQueueCallback;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.MarkovEstimator;
import edu.brown.hstore.estimators.markov.MarkovEstimator;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.hstore.util.ThrottlingQueue;
Expand Down
34 changes: 22 additions & 12 deletions src/frontend/edu/brown/hstore/PartitionExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@

import edu.brown.catalog.CatalogUtil;
import edu.brown.catalog.PlanFragmentIdGenerator;
import edu.brown.catalog.special.CountedStatement;
import edu.brown.hstore.Hstoreservice.QueryEstimate;
import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.Hstoreservice.TransactionPrefetchResult;
import edu.brown.hstore.Hstoreservice.TransactionWorkRequest;
Expand Down Expand Up @@ -2659,10 +2661,10 @@ private void requestWork(LocalTransaction ts, Collection<WorkFragment.Builder> f
// Now we can go back through and start running all of the WorkFragments that were not blocked
// waiting for an input dependency. Note that we pack all the fragments into a single
// CoordinatorFragment rather than sending each WorkFragment in its own message
for (WorkFragment.Builder ftask : fragmentBuilders) {
assert(!ts.isBlocked(ftask));
for (WorkFragment.Builder fragmentBuilder : fragmentBuilders) {
assert(!ts.isBlocked(fragmentBuilder));

int target_partition = ftask.getPartitionId();
int target_partition = fragmentBuilder.getPartitionId();
int target_site = catalogContext.getSiteIdForPartitionId(target_partition);

// Make sure that this isn't a single-partition txn trying to access a remote partition
Expand All @@ -2681,15 +2683,23 @@ else if (done_partitions.contains(target_partition)) {
break;
}
// Make sure we at least have something to do!
else if (ftask.getFragmentIdCount() == 0) {
else if (fragmentBuilder.getFragmentIdCount() == 0) {
LOG.warn(String.format("%s - Trying to send a WorkFragment request with 0 fragments", ts));
continue;
}

// Add in the specexec query estimate at this partition if needed
if (hstore_conf.site.specexec_enable && t_estimate != null) {
t_estimate.getQueryEstimate(target_partition);

if (hstore_conf.site.specexec_enable && t_estimate != null && t_estimate.hasQueryEstimate(target_partition)) {
QueryEstimate.Builder estBuilder = QueryEstimate.newBuilder();
List<CountedStatement> estimated = t_estimate.getQueryEstimate(target_partition);
if (d) LOG.debug(String.format("%s - Sending QueryEstimate to partition %d containing %d queries",
ts, target_partition, estimated.size()));
assert(estimated.isEmpty() == false);
for (CountedStatement countedStmt : estimated) {
estBuilder.addStmtIds(countedStmt.statement.getId());
estBuilder.addStmtCounters(countedStmt.counter);
} // FOR
fragmentBuilder.setFutureStatements(estBuilder);
}

// Get the TransactionWorkRequest.Builder for the remote HStoreSite
Expand All @@ -2702,14 +2712,14 @@ else if (ftask.getFragmentIdCount() == 0) {

// Also keep track of what Statements they are executing so that we know
// we need to send over the wire to them.
requestBuilder.addParamIndexes(ftask.getParamIndexList());
requestBuilder.addParamIndexes(fragmentBuilder.getParamIndexList());

// Input Dependencies
if (ftask.getNeedsInput()) {
if (fragmentBuilder.getNeedsInput()) {
if (d) LOG.debug("Retrieving input dependencies for " + ts);

tmp_removeDependenciesMap.clear();
this.getFragmentInputs(ts, ftask.getInputDepIdList(), tmp_removeDependenciesMap);
this.getFragmentInputs(ts, fragmentBuilder.getInputDepIdList(), tmp_removeDependenciesMap);

for (Entry<Integer, List<VoltTable>> e : tmp_removeDependenciesMap.entrySet()) {
if (requestBuilder.hasInputDependencyId(e.getKey())) continue;
Expand All @@ -2727,14 +2737,14 @@ else if (ftask.getFragmentIdCount() == 0) {
throw new ServerFaultException(msg, ts.getTransactionId());
}
if (d) LOG.debug(String.format("%s - Storing %d rows for InputDependency %d to send to partition %d [bytes=%d]",
ts, vt.getRowCount(), e.getKey(), ftask.getPartitionId(),
ts, vt.getRowCount(), e.getKey(), fragmentBuilder.getPartitionId(),
CollectionUtil.last(builder.getAttachedDataList()).size()));
} // FOR
requestBuilder.addInputDependencyId(e.getKey());
} // FOR
this.fs.getBBContainer().discard();
}
builder.addFragments(ftask);
builder.addFragments(fragmentBuilder);
} // FOR (tasks)

// Bad mojo! We need to throw a MispredictionException so that the VoltProcedure
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/edu/brown/hstore/TransactionInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import edu.brown.hstore.estimators.TransactionEstimator;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.MarkovEstimatorState;
import edu.brown.hstore.estimators.markov.MarkovEstimatorState;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.hstore.txns.LocalTransaction;
import edu.brown.hstore.txns.MapReduceTransaction;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package edu.brown.markov;
package edu.brown.hstore.estimators;

import edu.brown.hstore.estimators.TransactionEstimate;

/**
* Special estimation type that can be dynamically calculated as the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.fixed;

import java.util.List;

Expand All @@ -7,6 +7,9 @@
import org.voltdb.utils.EstTime;

import edu.brown.catalog.special.CountedStatement;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.hstore.estimators.TransactionEstimator;
import edu.brown.markov.EstimationThresholds;
import edu.brown.utils.PartitionEstimator;
import edu.brown.utils.PartitionSet;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.fixed;

import org.apache.log4j.Logger;
import org.voltdb.VoltType;
import org.voltdb.catalog.Procedure;
import org.voltdb.catalog.Statement;

import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.utils.PartitionEstimator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.fixed;

import org.voltdb.catalog.Procedure;
import org.voltdb.catalog.Statement;

import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.utils.PartitionEstimator;
import edu.brown.utils.PartitionSet;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.fixed;

import java.util.Arrays;

Expand All @@ -7,6 +7,8 @@
import org.voltdb.catalog.Statement;

import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.TransactionEstimate;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.utils.PartitionEstimator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.markov;
package edu.brown.hstore.estimators.markov;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -9,8 +9,12 @@
import org.voltdb.CatalogContext;

import edu.brown.catalog.special.CountedStatement;
import edu.brown.hstore.estimators.DynamicTransactionEstimate;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.markov.EstimationThresholds;
import edu.brown.markov.MarkovUtil;
import edu.brown.markov.MarkovVertex;
import edu.brown.pools.Poolable;
import edu.brown.utils.PartitionSet;
import edu.brown.utils.StringUtil;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.markov;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -17,14 +17,15 @@

import edu.brown.graphs.GraphvizExport;
import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.hstore.estimators.TransactionEstimator;
import edu.brown.hstore.estimators.markov.MarkovEstimatorState.Factory;
import edu.brown.hstore.txns.AbstractTransaction;
import edu.brown.interfaces.DebugContext;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.markov.MarkovEdge;
import edu.brown.markov.MarkovEstimate;
import edu.brown.markov.MarkovGraph;
import edu.brown.markov.MarkovPathEstimator;
import edu.brown.markov.MarkovUtil;
import edu.brown.markov.MarkovVertex;
import edu.brown.markov.containers.MarkovGraphsContainer;
Expand Down Expand Up @@ -229,11 +230,12 @@ public MarkovEstimatorState startTransactionImpl(Long txn_id, int base_partition
@Override
public MarkovEstimate executeQueries(EstimatorState s, Statement catalog_stmts[], PartitionSet partitions[], boolean allow_cache_lookup) {
MarkovEstimatorState state = (MarkovEstimatorState)s;
if (d) LOG.debug(String.format("Processing %d queries for txn #%d", catalog_stmts.length, state.txn_id));
if (d) LOG.debug(String.format("Processing %d queries for txn #%d", catalog_stmts.length, state.getTransactionId()));
int batch_size = catalog_stmts.length;
MarkovGraph markov = state.getMarkovGraph();

MarkovVertex current = state.getCurrent();
PartitionSet touchedPartitions = state.getTouchedPartitions();
MarkovVertex next_v = null;
MarkovEdge next_e = null;
Statement last_stmt = null;
Expand Down Expand Up @@ -271,12 +273,12 @@ public MarkovEstimate executeQueries(EstimatorState s, Statement catalog_stmts[]
if (d) LOG.debug(String.format("Got cached batch end for %s: %s -> %s", markov, current, next_v));

// Update the counters and other info for the next vertex and edge
next_v.addInstanceTime(state.txn_id, state.getExecutionTimeOffset());
next_v.addInstanceTime(state.getTransactionId(), state.getExecutionTimeOffset());

// Update the state information
state.setCurrent(next_v, next_e);
state.touched_partitions.addAll(state.cache_last_partitions);
state.touched_partitions.addAll(state.cache_past_partitions);
touchedPartitions.addAll(state.cache_last_partitions);
touchedPartitions.addAll(state.cache_past_partitions);
}
}

Expand All @@ -286,7 +288,7 @@ public MarkovEstimate executeQueries(EstimatorState s, Statement catalog_stmts[]
for (int i = 0; i < batch_size; i++) {
int idx = (attempt_cache_lookup ? stmt_idxs[i] : -1);
this.consume(state, markov, catalog_stmts[i], partitions[i], idx);
if (attempt_cache_lookup == false) state.touched_partitions.addAll(partitions[i]);
if (attempt_cache_lookup == false) touchedPartitions.addAll(partitions[i]);
} // FOR

// Update our cache if we tried and failed before
Expand All @@ -311,11 +313,11 @@ public MarkovEstimate executeQueries(EstimatorState s, Statement catalog_stmts[]
Object procArgs[] = state.getProcedureParameters();
this.estimatePath(state, estimate, catalog_proc, procArgs);

if (d) LOG.debug(String.format("Next MarkovEstimate for txn #%d\n%s", state.txn_id, estimate.toString()));
if (d) LOG.debug(String.format("Next MarkovEstimate for txn #%d\n%s", state.getTransactionId(), estimate.toString()));
assert(estimate.isInitialized()) :
String.format("Unexpected uninitialized MarkovEstimate for txn #%d\n%s", state.txn_id, estimate);
String.format("Unexpected uninitialized MarkovEstimate for txn #%d\n%s", state.getTransactionId(), estimate);
assert(estimate.isValid()) :
String.format("Invalid MarkovEstimate for txn #%d\n%s", state.txn_id, estimate);
String.format("Invalid MarkovEstimate for txn #%d\n%s", state.getTransactionId(), estimate);

// Once the workload shifts we detect it and trigger this method. Recomputes
// the graph with the data we collected with the current workload method.
Expand Down Expand Up @@ -517,6 +519,7 @@ private void consume(MarkovEstimatorState state,

// Examine all of the vertices that are adjacent to our current vertex
// and see which vertex we are going to move to next
PartitionSet touchedPartitions = state.getTouchedPartitions();
MarkovVertex current = state.getCurrent();
assert(current != null);
MarkovVertex next_v = null;
Expand All @@ -525,11 +528,11 @@ private void consume(MarkovEstimatorState state,
// Synchronize on the single vertex so that it's more fine-grained than the entire graph
synchronized (current) {
Collection<MarkovEdge> edges = markov.getOutEdges(current);
if (t) LOG.trace("Examining " + edges.size() + " edges from " + current + " for Txn #" + state.txn_id);
if (t) LOG.trace("Examining " + edges.size() + " edges from " + current + " for Txn #" + state.getTransactionId());
for (MarkovEdge e : edges) {
MarkovVertex v = markov.getDest(e);
if (v.isEqual(catalog_stmt, partitions, state.touched_partitions, queryInstanceIndex)) {
if (t) LOG.trace("Found next vertex " + v + " for Txn #" + state.txn_id);
if (v.isEqual(catalog_stmt, partitions, touchedPartitions, queryInstanceIndex)) {
if (t) LOG.trace("Found next vertex " + v + " for Txn #" + state.getTransactionId());
next_v = v;
next_e = e;
break;
Expand All @@ -544,21 +547,21 @@ private void consume(MarkovEstimatorState state,
MarkovVertex.Type.QUERY,
queryInstanceIndex,
partitions,
state.touched_partitions);
touchedPartitions);
markov.addVertex(next_v);
next_e = markov.addToEdge(current, next_v);
if (t) LOG.trace(String.format("Created new edge from %s to new %s for txn #%d",
state.getCurrent(), next_v, state.txn_id));
assert(state.getCurrent().getPartitions().size() <= state.touched_partitions.size());
state.getCurrent(), next_v, state.getTransactionId()));
assert(state.getCurrent().getPartitions().size() <= touchedPartitions.size());
}
} // SYNCH

// Update the counters and other info for the next vertex and edge
next_v.addInstanceTime(state.txn_id, state.getExecutionTimeOffset());
next_v.addInstanceTime(state.getTransactionId(), state.getExecutionTimeOffset());

// Update the state information
state.setCurrent(next_v, next_e);
if (t) LOG.trace("Updated State Information for Txn #" + state.txn_id + ":\n" + state);
if (t) LOG.trace("Updated State Information for Txn #" + state.getTransactionId() + ":\n" + state);
if (this.profiler != null) this.profiler.time_consume.stop();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.hstore.estimators;
package edu.brown.hstore.estimators.markov;

import java.io.File;
import java.util.ArrayList;
Expand All @@ -11,10 +11,10 @@

import edu.brown.graphs.GraphvizExport;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.EstimatorState;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.markov.MarkovEdge;
import edu.brown.markov.MarkovEstimate;
import edu.brown.markov.MarkovGraph;
import edu.brown.markov.MarkovUtil;
import edu.brown.markov.MarkovVertex;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.brown.markov;
package edu.brown.hstore.estimators.markov;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -27,13 +27,15 @@
import edu.brown.graphs.VertexTreeWalker;
import edu.brown.hstore.HStoreConstants;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.estimators.MarkovEstimator;
import edu.brown.hstore.estimators.MarkovEstimatorState;
import edu.brown.interfaces.Loggable;
import edu.brown.logging.LoggerUtil;
import edu.brown.logging.LoggerUtil.LoggerBoolean;
import edu.brown.mappings.ParameterMapping;
import edu.brown.mappings.ParameterMappingsSet;
import edu.brown.markov.MarkovEdge;
import edu.brown.markov.MarkovGraph;
import edu.brown.markov.MarkovUtil;
import edu.brown.markov.MarkovVertex;
import edu.brown.markov.containers.MarkovGraphsContainer;
import edu.brown.pools.TypedPoolableObjectFactory;
import edu.brown.utils.ArgumentsParser;
Expand Down
Loading

0 comments on commit 51b64a9

Please sign in to comment.