Skip to content

Commit

Permalink
PHOENIX-4701 Write client-side metrics asynchronously to SYSTEM.LOG(a…
Browse files Browse the repository at this point in the history
…ddendum)
  • Loading branch information
ankitsinghal committed May 17, 2018
1 parent e58fee3 commit fc562a3
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public static void doSetup() throws Exception {
props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
// disable renewing leases as this will force spooling to happen.
props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
props.put(QueryServices.LOG_LEVEL, LogLevel.DEBUG.toString());
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
// need the non-test driver for some tests that check number of hconnections, etc.
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
Expand Down Expand Up @@ -700,7 +699,7 @@ private void assertMetricsHaveSameValues(Map<MetricType, Long> metricNameValueMa

private void changeInternalStateForTesting(PhoenixResultSet rs) {
// get and set the internal state for testing purposes.
ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.DEBUG);
ReadMetricQueue testMetricsQueue = new TestReadMetricsQueue(LogLevel.OFF,true);
StatementContext ctx = (StatementContext)Whitebox.getInternalState(rs, "context");
Whitebox.setInternalState(ctx, "readMetricsQueue", testMetricsQueue);
Whitebox.setInternalState(rs, "readMetricsQueue", testMetricsQueue);
Expand Down Expand Up @@ -766,8 +765,8 @@ private Connection insertRowsInTable(String tableName, long numRows) throws SQLE

private class TestReadMetricsQueue extends ReadMetricQueue {

public TestReadMetricsQueue(LogLevel connectionLogLevel) {
super(connectionLogLevel);
public TestReadMetricsQueue(LogLevel connectionLogLevel, boolean isRequestMetricsEnabled) {
super(isRequestMetricsEnabled, connectionLogLevel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Sca
this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer> emptyMap() : Maps
.<PColumn, Integer> newLinkedHashMap();
this.subqueryResults = Maps.<SelectStatement, Object> newHashMap();
this.readMetricsQueue = new ReadMetricQueue(connection.getLogLevel());
this.overAllQueryMetrics = new OverAllQueryMetrics(connection.getLogLevel());
this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled,connection.getLogLevel());
this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
Expand Down Expand Up @@ -1179,7 +1179,7 @@ public void doMutation() throws IOException {
numFailedMutations = uncommittedStatementIndexes.length;
GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
} finally {
MutationMetric mutationsMetric = new MutationMetric(connection.getLogLevel(),numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
try {
if (cache!=null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface CombinableMetric extends Metric {
String getPublishString();

CombinableMetric combine(CombinableMetric metric);

CombinableMetric clone();

public class NoOpRequestMetric implements CombinableMetric {

Expand Down Expand Up @@ -70,7 +72,14 @@ public CombinableMetric combine(CombinableMetric metric) {

@Override
public void decrement() {}

@Override
public CombinableMetric clone(){
return INSTANCE;
}

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
*/
package org.apache.phoenix.monitoring;

import static com.google.common.base.Preconditions.checkArgument;

public class CombinableMetricImpl implements CombinableMetric {

private final Metric metric;

public CombinableMetricImpl(MetricType type) {
metric = new NonAtomicMetric(type);
}

private CombinableMetricImpl(Metric metric) {
this.metric = metric;
}

@Override
public MetricType getMetricType() {
Expand Down Expand Up @@ -64,7 +66,6 @@ public String getPublishString() {

@Override
public CombinableMetric combine(CombinableMetric metric) {
checkArgument(this.getClass().equals(metric.getClass()));
this.metric.change(metric.getValue());
return this;
}
Expand All @@ -73,5 +74,12 @@ public CombinableMetric combine(CombinableMetric metric) {
public void decrement() {
metric.decrement();
}

@Override
public CombinableMetric clone(){
NonAtomicMetric metric = new NonAtomicMetric(this.metric.getMetricType());
metric.change(this.metric.getValue());
return new CombinableMetricImpl(metric);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

public class MetricUtil {

public static CombinableMetric getCombinableMetric(LogLevel connectionLogLevel, MetricType type) {
if (!type.isLoggingEnabled(connectionLogLevel)) { return NoOpRequestMetric.INSTANCE; }
public static CombinableMetric getCombinableMetric(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel, MetricType type) {
if (!type.isLoggingEnabled(connectionLogLevel) && !isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; }
return new CombinableMetricImpl(type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Map.Entry;

import org.apache.phoenix.log.LogLevel;

/**
* Queue that tracks various writes/mutations related phoenix request metrics.
Expand Down Expand Up @@ -83,17 +82,12 @@ public void clearMetrics() {
* Class that holds together the various metrics associated with mutations.
*/
public static class MutationMetric {
private final CombinableMetric numMutations;;
private final CombinableMetric mutationsSizeBytes;
private final CombinableMetric totalCommitTimeForMutations;
private final CombinableMetric numFailedMutations;

public MutationMetric(LogLevel connectionLogLevel, long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) {
this.numMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_SIZE);
this.mutationsSizeBytes =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BYTES);
this.totalCommitTimeForMutations =MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_COMMIT_TIME);
this.numFailedMutations = MetricUtil.getCombinableMetric(connectionLogLevel,MUTATION_BATCH_FAILED_SIZE);
this.numMutations.change(numMutations);
private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE);
private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES);
private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME);
private final CombinableMetric numFailedMutations = new CombinableMetricImpl(MUTATION_BATCH_FAILED_SIZE);

public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations, long numFailedMutations) { this.numMutations.change(numMutations);
this.mutationsSizeBytes.change(mutationsSizeBytes);
this.totalCommitTimeForMutations.change(commitTimeForMutations);
this.numFailedMutations.change(numFailedMutations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public class OverAllQueryMetrics {
private final CombinableMetric queryFailed;
private final CombinableMetric cacheRefreshedDueToSplits;

public OverAllQueryMetrics(LogLevel connectionLogLevel) {
public OverAllQueryMetrics(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel) {
queryWatch = new MetricsStopWatch(WALL_CLOCK_TIME_MS.isLoggingEnabled(connectionLogLevel));
resultSetWatch = new MetricsStopWatch(RESULT_SET_TIME_MS.isLoggingEnabled(connectionLogLevel));
numParallelScans = MetricUtil.getCombinableMetric(connectionLogLevel, NUM_PARALLEL_SCANS);
wallClockTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, WALL_CLOCK_TIME_MS);
resultSetTimeMS = MetricUtil.getCombinableMetric(connectionLogLevel, RESULT_SET_TIME_MS);
queryTimedOut = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_TIMEOUT_COUNTER);
queryFailed = MetricUtil.getCombinableMetric(connectionLogLevel, QUERY_FAILED_COUNTER);
cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER);
numParallelScans = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, NUM_PARALLEL_SCANS);
wallClockTimeMS = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, WALL_CLOCK_TIME_MS);
resultSetTimeMS = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, RESULT_SET_TIME_MS);
queryTimedOut = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, QUERY_TIMEOUT_COUNTER);
queryFailed = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, QUERY_FAILED_COUNTER);
cacheRefreshedDueToSplits = MetricUtil.getCombinableMetric(isRequestMetricsEnabled,connectionLogLevel, CACHE_REFRESH_SPLITS_COUNTER);
}

public void updateNumParallelScans(long numParallelScans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ public class ReadMetricQueue {

private LogLevel connectionLogLevel;

public ReadMetricQueue(LogLevel connectionLogLevel) {
private boolean isRequestMetricsEnabled;

public ReadMetricQueue(boolean isRequestMetricsEnabled, LogLevel connectionLogLevel) {
this.isRequestMetricsEnabled = isRequestMetricsEnabled;
this.connectionLogLevel = connectionLogLevel;
}

public CombinableMetric allotMetric(MetricType type, String tableName) {
if (type.isLoggingEnabled(connectionLogLevel)) {
if (type.isLoggingEnabled(connectionLogLevel) || isRequestMetricsEnabled) {
MetricKey key = new MetricKey(type, tableName);
Queue<CombinableMetric> q = getMetricQueue(key);
CombinableMetric metric = getMetric(type);
Expand Down Expand Up @@ -95,7 +98,8 @@ private static CombinableMetric combine(Collection<CombinableMetric> metrics) {
int size = metrics.size();
if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); }
Iterator<CombinableMetric> itr = metrics.iterator();
CombinableMetric combinedMetric = itr.next();
//Clone first metric for combining so that aggregate always give consistent result
CombinableMetric combinedMetric = itr.next().clone();
while (itr.hasNext()) {
combinedMetric = combinedMetric.combine(itr.next());
}
Expand Down Expand Up @@ -186,6 +190,8 @@ public List<ScanMetricsHolder> getScanMetricsHolderList() {
return scanMetricsHolderList;
}


public boolean isRequestMetricsEnabled() {
return isRequestMetricsEnabled;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public class ScanMetricsHolder {
private Object scan;

private static final ScanMetricsHolder NO_OP_INSTANCE =
new ScanMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "",null);
new ScanMetricsHolder(new ReadMetricQueue(false,LogLevel.OFF), "",null);

public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, String tableName,
Scan scan, LogLevel connectionLogLevel) {
if (connectionLogLevel == LogLevel.OFF) { return NO_OP_INSTANCE; }
if (connectionLogLevel == LogLevel.OFF && !readMetrics.isRequestMetricsEnabled()) { return NO_OP_INSTANCE; }
scan.setScanMetricsEnabled(true);
return new ScanMetricsHolder(readMetrics, tableName, scan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SpoolingMetricsHolder {

private final CombinableMetric spoolFileSizeMetric;
private final CombinableMetric numSpoolFileMetric;
public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false,LogLevel.OFF), "");

public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TaskExecutionMetricsHolder {
private final CombinableMetric taskExecutionTime;
private final CombinableMetric numTasks;
private final CombinableMetric numRejectedTasks;
public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(LogLevel.OFF), "");
public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false,LogLevel.OFF), "");

public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private void testSpooling(int threshold, long maxSizeSpool) throws Throwable {
MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold));
ResultIterator scanner = new SpoolingResultIterator(
SpoolingMetricsHolder.NO_OP_INSTANCE,
new MemoryMetricsHolder(new ReadMetricQueue(LogLevel.OFF), ""), iterator, memoryManager, threshold,
new MemoryMetricsHolder(new ReadMetricQueue(false,LogLevel.OFF), ""), iterator, memoryManager, threshold,
maxSizeSpool, "/tmp");
AssertResults.assertResults(scanner, expectedResults);
}
Expand Down

0 comments on commit fc562a3

Please sign in to comment.