publish metrics using spark custom metrics API (GoogleCloudDataproc#1008)

* WIP

* adding custom metrics into new module

* addressing review comments

* updating readme-template and changes.md
suryasoma authored Aug 15, 2023
1 parent 7287549 commit 977a0a7
Showing 29 changed files with 548 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -5,6 +5,7 @@
* Issue #867: Support writing with RangePartitioning
* Issue #144: allow writing Spark String to BQ TIME type
* PR #1038: Logical plan now shows the BigQuery table of DirectBigQueryRelation. Thanks @idc101 !
* PR #1008: Adding support to expose bigquery metrics using Spark custom metrics API.

## 0.32.2 - 2023-08-07

44 changes: 44 additions & 0 deletions README-template.md
@@ -1161,6 +1161,50 @@ To include the connector in your project:
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "${next-release-tag}"
```

### Connector metrics and how to view them

Spark populates many metrics that end users can view on the Spark History Server page, but all of them are Spark-specific and are collected implicitly, without any involvement from the connector.
There are, however, a few metrics populated by BigQuery that are currently visible only in the application logs and have to be read from the driver/executor logs.

Starting with Spark 3.2, Spark provides an API for exposing custom metrics on the Spark UI page: https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html
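
For illustration, a minimal sketch of a metric built against this API is shown below. The class names and wiring here are hypothetical — the connector's real implementations live in the new `spark-bigquery-metrics` module — but the `CustomMetric`/`CustomTaskMetric` interfaces and their methods are the Spark 3.2 API itself.

```java
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Driver-side metric definition: the name ties it to the per-task values reported by the
// readers, and aggregateTaskMetrics produces the figure rendered in the Spark UI.
class ExampleBytesReadMetric implements CustomMetric {
  @Override
  public String name() {
    return "bytes read";
  }

  @Override
  public String description() {
    return "number of BigQuery bytes read";
  }

  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long total = 0;
    for (long value : taskMetrics) {
      total += value;
    }
    return String.valueOf(total);
  }
}

// Executor-side value: instances like this are returned from the partition reader's
// currentMetricsValues() and shipped back to the driver with the task results.
class ExampleBytesReadTaskMetric implements CustomTaskMetric {
  private final long bytesRead;

  ExampleBytesReadTaskMetric(long bytesRead) {
    this.bytesRead = bytesRead;
  }

  @Override
  public String name() {
    return "bytes read"; // must match the CustomMetric name registered by the scan
  }

  @Override
  public long value() {
    return bytesRead;
  }
}
```

The scan registers its `CustomMetric`s via `supportedCustomMetrics()`, each partition reader reports `CustomTaskMetric` values, and Spark aggregates the per-task values on the driver before displaying them.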

Using this API, the connector currently exposes the following BigQuery metrics during reads:
<table id="metricstable">
<style>
table#metricstable td, table th
{
word-break:break-word
}
</style>
<tr valign="top">
<th style="min-width:240px">Metric Name</th>
<th style="min-width:240px">Description</th>
</tr>
<tr valign="top">
<td><code>bytes read</code></td>
<td>number of BigQuery bytes read</td>
</tr>
<tr valign="top">
<td><code>rows read</code></td>
<td>number of BigQuery rows read</td>
</tr>
<tr valign="top">
<td><code>scan time</code></td>
<td>time spent between requesting ReadRows responses and receiving them, summed across all executors, in milliseconds.</td>
</tr>
<tr valign="top">
<td><code>parse time</code></td>
<td>time spent parsing the rows read, summed across all executors, in milliseconds.</td>
</tr>
<tr valign="top">
<td><code>spark time</code></td>
<td>time spent in Spark processing the queries (i.e., everything apart from scanning and parsing), summed across all executors, in milliseconds.</td>
</tr>
</table>


**Note:** To see these metrics on the Spark UI page, make sure the `spark-bigquery-metrics-${next-release-tag}.jar` is on the class path before starting the history server, and that the connector version is `spark-3.2` or above.
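
As an end-to-end illustration — the jar path and table name below are placeholders, not part of this commit — any regular read through the connector populates these metrics once the metrics jar is on the classpath:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BigQueryMetricsReadExample {
  public static void main(String[] args) {
    // Hypothetical jar location; the same jar also has to be on the history server's own
    // classpath if the metrics should render there after the application finishes.
    SparkSession spark =
        SparkSession.builder()
            .appName("bigquery-metrics-example")
            .config("spark.jars", "/path/to/spark-bigquery-metrics-<version>.jar")
            .getOrCreate();

    // An ordinary connector read; nothing metrics-specific is needed in the query itself.
    Dataset<Row> shakespeare =
        spark.read().format("bigquery").load("bigquery-public-data.samples.shakespeare");

    // Triggering an action runs the scan; the metrics listed above should then appear
    // under the corresponding query in the SQL tab of the Spark UI / History Server.
    System.out.println(shakespeare.count());
  }
}
```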

## FAQ

### What is the Pricing for the Storage API?
@@ -57,4 +57,14 @@ public interface BigQueryStorageReadRowsTracer extends Serializable {
* @return A new tracer with the ID.
*/
BigQueryStorageReadRowsTracer forkWithPrefix(String id);

long getBytesRead();

long getRowsRead();

long getScanTimeInMilliSec();

long getParseTimeInMilliSec();

long getTimeInSparkInMilliSec();
}
@@ -128,18 +128,17 @@ private void logData() {
jsonObject.addProperty("Parse Timings", format(parseTime));
jsonObject.addProperty("Time in Spark", difference(sparkTime, parseTime));
jsonObject.addProperty("Time waiting for service", format(serviceTime));
jsonObject.addProperty("Bytes/s", perSecond(serviceTime, bytes));
jsonObject.addProperty("Rows/s", perSecond(parseTime, rows));
jsonObject.addProperty("Bytes", bytes);
jsonObject.addProperty("Rows", rows);
jsonObject.addProperty("I/O time", serviceTime.getAccumulatedTime().toMillis());
jsonObject.addProperty("Bytes/s", perSecond(serviceTime, getBytesRead()));
jsonObject.addProperty("Rows/s", perSecond(parseTime, getRowsRead()));
jsonObject.addProperty("Bytes", getBytesRead());
jsonObject.addProperty("Rows", getRowsRead());
jsonObject.addProperty("I/O time", getScanTimeInMilliSec());
log.trace("Tracer Logs:{}", new Gson().toJson(jsonObject));
bigQueryMetrics.incrementBytesReadCounter(bytes);
bigQueryMetrics.incrementRowsReadCounter(rows);
bigQueryMetrics.updateScanTime(serviceTime.getAccumulatedTime().toMillis());
bigQueryMetrics.updateParseTime(parseTime.getAccumulatedTime().toMillis());
bigQueryMetrics.updateTimeInSpark(
sparkTime.getAccumulatedTime().minus(parseTime.getAccumulatedTime()).toMillis());
bigQueryMetrics.incrementBytesReadCounter(getBytesRead());
bigQueryMetrics.incrementRowsReadCounter(getRowsRead());
bigQueryMetrics.updateScanTime(getScanTimeInMilliSec());
bigQueryMetrics.updateParseTime(getParseTimeInMilliSec());
bigQueryMetrics.updateTimeInSpark(getTimeInSparkInMilliSec());
linesLogged++;
}

@@ -157,6 +156,31 @@ public BigQueryStorageReadRowsTracer forkWithPrefix(String id) {
"id-" + id + "-" + streamName, logIntervalPowerOf2, bigQueryMetrics);
}

@Override
public long getBytesRead() {
return bytes;
}

@Override
public long getRowsRead() {
return rows;
}

@Override
public long getScanTimeInMilliSec() {
return serviceTime.getAccumulatedTime().toMillis();
}

@Override
public long getParseTimeInMilliSec() {
return parseTime.getAccumulatedTime().toMillis();
}

@Override
public long getTimeInSparkInMilliSec() {
return sparkTime.getAccumulatedTime().minus(parseTime.getAccumulatedTime()).toMillis();
}

String getStreamName() {
return streamName;
}
3 changes: 3 additions & 0 deletions pom.xml
@@ -203,6 +203,7 @@
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-common</module>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-parent</module>
<module>spark-bigquery-dsv2/spark-3.1-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-bigquery-metrics</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-parent</module>
@@ -221,6 +222,7 @@
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-common</module>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-parent</module>
<module>spark-bigquery-dsv2/spark-3.1-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-bigquery-metrics</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.3-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.3-bigquery</module>
@@ -240,6 +242,7 @@
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-common</module>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-parent</module>
<module>spark-bigquery-dsv2/spark-3.1-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-bigquery-metrics</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.3-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.4-bigquery-lib</module>
@@ -53,5 +53,8 @@ public void testCustomAccessTokenProvider() throws Exception {
}

// additional tests are from the super-class

@Test
public void testNew() {
testReadWithOption();
}
}
1 change: 1 addition & 0 deletions spark-bigquery-dsv2/pom.xml
@@ -18,6 +18,7 @@
<module>spark-2.4-bigquery</module>
<module>spark-3.1-bigquery-lib</module>
<module>spark-3.1-bigquery</module>
<module>spark-bigquery-metrics</module>
<module>spark-3.2-bigquery-lib</module>
<module>spark-3.2-bigquery</module>
<module>spark-3.3-bigquery-lib</module>
5 changes: 5 additions & 0 deletions spark-bigquery-dsv2/spark-3.2-bigquery-lib/pom.xml
@@ -24,6 +24,11 @@
</license>
</licenses>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spark-bigquery-metrics</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spark-3.1-bigquery-lib</artifactId>
@@ -0,0 +1,46 @@
package com.google.cloud.spark.bigquery.v2;

import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.*;

import com.google.cloud.spark.bigquery.v2.context.InputPartitionReaderContext;
import com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryTaskMetric;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Spark32BigQueryPartitionReader<T> extends BigQueryPartitionReader {

public Logger log = LoggerFactory.getLogger(this.getClass());
private InputPartitionReaderContext<T> context;

public Spark32BigQueryPartitionReader(InputPartitionReaderContext context) {
super(context);
this.context = context;
}

@Override
public CustomTaskMetric[] currentMetricsValues() {
log.trace("in current metric values");
return context
.getBigQueryStorageReadRowsTracer()
.map(
bigQueryStorageReadRowsTracer ->
new SparkBigQueryTaskMetric[] {
new SparkBigQueryTaskMetric(
BIG_QUERY_BYTES_READ_METRIC_NAME,
bigQueryStorageReadRowsTracer.getBytesRead()),
new SparkBigQueryTaskMetric(
BIG_QUERY_ROWS_READ_METRIC_NAME, bigQueryStorageReadRowsTracer.getRowsRead()),
new SparkBigQueryTaskMetric(
BIG_QUERY_SCAN_TIME_METRIC_NAME,
bigQueryStorageReadRowsTracer.getScanTimeInMilliSec()),
new SparkBigQueryTaskMetric(
BIG_QUERY_PARSE_TIME_METRIC_NAME,
bigQueryStorageReadRowsTracer.getParseTimeInMilliSec()),
new SparkBigQueryTaskMetric(
BIG_QUERY_TIME_IN_SPARK_METRIC_NAME,
bigQueryStorageReadRowsTracer.getTimeInSparkInMilliSec())
})
.orElse(new SparkBigQueryTaskMetric[] {});
}
}
@@ -0,0 +1,29 @@
package com.google.cloud.spark.bigquery.v2;

import com.google.cloud.spark.bigquery.v2.context.InputPartitionContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class Spark32BigQueryPartitionReaderFactory implements PartitionReaderFactory {

@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
InputPartitionContext<InternalRow> ctx = ((BigQueryInputPartition) partition).getContext();
return new Spark32BigQueryPartitionReader<>(ctx.createPartitionReaderContext());
}

@Override
public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
InputPartitionContext<ColumnarBatch> ctx = ((BigQueryInputPartition) partition).getContext();
return new Spark32BigQueryPartitionReader<>(ctx.createPartitionReaderContext());
}

@Override
public boolean supportColumnarReads(InputPartition partition) {
InputPartitionContext ctx = ((BigQueryInputPartition) partition).getContext();
return ctx.supportColumnarReads();
}
}
@@ -17,11 +17,14 @@

import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderContext;
import com.google.cloud.spark.bigquery.v2.customMetrics.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
import org.apache.spark.sql.sources.Filter;

@@ -51,4 +54,20 @@ public NamedReference[] filterAttributes() {
public void filter(Filter[] filters) {
ctx.filter(filters);
}

@Override
public CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[] {
new SparkBigQueryBytesReadMetric(),
new SparkBigQueryRowsReadMetric(),
new SparkBigQueryScanTimeMetric(),
new SparkBigQueryParseTimeMetric(),
new SparkBigQueryTimeInSparkMetric()
};
}

@Override
public PartitionReaderFactory createReaderFactory() {
return new Spark32BigQueryPartitionReaderFactory();
}
}
@@ -274,6 +274,11 @@ public ColumnarBatch get() {
return currentBatch;
}

@Override
public Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer() {
return Optional.of(tracer);
}

public void close() throws IOException {
closed = true;
try {
@@ -15,12 +15,14 @@
*/
package com.google.cloud.spark.bigquery.v2.context;

import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import com.google.cloud.bigquery.connector.common.ReadRowsHelper;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.spark.bigquery.ReadRowsResponseToInternalRowIteratorConverter;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.spark.sql.catalyst.InternalRow;

class BigQueryInputPartitionReaderContext implements InputPartitionReaderContext<InternalRow> {
@@ -58,6 +60,11 @@ public InternalRow get() {
return currentRow;
}

@Override
public Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer() {
return Optional.empty();
}

@Override
public void close() throws IOException {
readRowsHelper.close();
@@ -15,7 +15,9 @@
*/
package com.google.cloud.spark.bigquery.v2.context;

import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import java.io.IOException;
import java.util.Optional;
import org.apache.spark.sql.catalyst.InternalRow;

class EmptyProjectionInputPartitionReaderContext
@@ -40,6 +42,11 @@ public InternalRow get() {
return InternalRow.empty();
}

@Override
public Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer() {
return Optional.empty();
}

@Override
public void close() throws IOException {
// empty
@@ -15,12 +15,16 @@
*/
package com.google.cloud.spark.bigquery.v2.context;

import com.google.cloud.bigquery.connector.common.BigQueryStorageReadRowsTracer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;

public interface InputPartitionReaderContext<T> extends Closeable {

boolean next() throws IOException;

T get();

Optional<BigQueryStorageReadRowsTracer> getBigQueryStorageReadRowsTracer();
}