Publishing write session metrics from connector (GoogleCloudDataproc#…)
isha97 authored May 6, 2024
1 parent e2b8cd2 commit c5a18ab
Showing 11 changed files with 178 additions and 28 deletions.
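
At a glance: both write paths now compute bytes written (the direct path from Storage Write API streams, the indirect path from the load job's statistics) and post them as a SparkListenerEvent on the SparkContext listener bus. A hedged sketch of how such an event could be observed from the application side, using only the public Spark listener API — the listener class and the name-based match below are illustrative, not part of this commit:

// Illustrative listener: the connector posts its write event on the
// SparkContext listener bus, so any registered SparkListener can see it.
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerEvent;

public class WriteMetricsListener extends SparkListener {
  @Override
  public void onOtherEvent(SparkListenerEvent event) {
    // The concrete event class is Google-internal, so match by name here.
    if (event.getClass().getSimpleName().equals("SparkBigQueryConnectorWriteEvent")) {
      System.out.println("Connector write event observed: " + event);
    }
  }
}

Registered with sparkContext.addSparkListener(new WriteMetricsListener()) before the write runs.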
@@ -710,7 +710,7 @@ private TableInfo materializeTable(String querySql, TempTableBuilder tmpTableBui
}
}

- public void loadDataIntoTable(
+ public JobStatistics.LoadStatistics loadDataIntoTable(
LoadDataOptions options,
List<String> sourceUris,
FormatOptions formatOptions,
@@ -796,6 +796,7 @@ public void loadDataIntoTable(
"Done loading to {}. jobId: {}",
BigQueryUtil.friendlyTableName(options.getTableId()),
finishedJob.getJobId());
return finishedJob.getStatistics();
}
} catch (Exception e) {
if (finishedJob == null) {
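The signature change above means callers of loadDataIntoTable can now read the finished load job's statistics. A minimal sketch of the relevant google-cloud-bigquery API, separate from the connector code — the helper class here is hypothetical:

// Hypothetical helper: reading output bytes from a finished BigQuery load job.
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobStatistics;

public final class LoadJobBytes {
  static long outputBytesOf(Job finishedJob) {
    // Job.getStatistics() is generic; a load job yields LoadStatistics.
    JobStatistics.LoadStatistics stats = finishedJob.getStatistics();
    // getOutputBytes() reports the bytes loaded into the destination table.
    return stats.getOutputBytes();
  }
}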
@@ -68,6 +68,7 @@ public class BigQueryDirectDataWriterHelper {
private long appendRequestRowCount = 0; // number of rows waiting for the next append request
private long appendRequestSizeBytes = 0; // number of bytes waiting for the next append request
private long writeStreamRowCount = 0; // total offset / rows of the current write-stream
private long writeStreamTotalBytes = 0; // total bytes written by the current write-stream

private final ExecutorService appendRowsExecutor = Executors.newSingleThreadExecutor();
private final Queue<ApiFuture<AppendRowsResponse>> appendRowsFuturesQueue = new LinkedList<>();
@@ -266,6 +267,7 @@ private void sendAppendRowsRequest() throws IOException {
appendRowsFuturesQueue.add(validatedAppendRowsFuture);
clearProtoRows();
this.writeStreamRowCount += appendRequestRowCount;
this.writeStreamTotalBytes += appendRequestSizeBytes;
this.appendRequestRowCount = 0;
this.appendRequestSizeBytes = 0;
}
@@ -310,7 +312,7 @@ private ApiFuture<AppendRowsResponse> validateAppendRowsResponse(
* the expected offset (which is equal to the number of rows appended thus far).
* @see this#writeStreamRowCount
*/
- public long finalizeStream() throws IOException {
+ public WriteStreamStatistics finalizeStream() throws IOException {
if (this.protoRows.getSerializedRowsCount() != 0) {
sendAppendRowsRequest();
}
@@ -320,6 +322,7 @@ public long finalizeStream() throws IOException {
appendRowsExecutor.shutdown();
}
long responseFinalizedRowCount = writeStreamRowCount;
long responseFinalizedBytesWritten = writeStreamTotalBytes;

if (!this.writeAtLeastOnce) {
waitBeforeFinalization();
@@ -347,7 +350,7 @@

clean();

- return responseFinalizedRowCount;
+ return new WriteStreamStatistics(responseFinalizedRowCount, responseFinalizedBytesWritten);
}

/**
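The byte accounting in this file mirrors the existing row accounting: a per-request counter accumulates until the append request is sent, then folds into a stream-level total and resets. The pattern in isolation, as a sketch with names simplified from the fields above:

// Sketch of the two-level counter pattern used by the writer helper.
final class StreamAccounting {
  private long appendRequestRows = 0;   // rows waiting for the next append request
  private long appendRequestBytes = 0;  // bytes waiting for the next append request
  private long streamRows = 0;          // total rows sent on this write-stream
  private long streamBytes = 0;         // total bytes sent on this write-stream

  void onRowBuffered(long serializedRowSize) {
    appendRequestRows++;
    appendRequestBytes += serializedRowSize;
  }

  void onAppendRequestSent() {
    // Fold the pending batch into the stream totals, then reset for the next batch.
    streamRows += appendRequestRows;
    streamBytes += appendRequestBytes;
    appendRequestRows = 0;
    appendRequestBytes = 0;
  }
}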
@@ -0,0 +1,19 @@
package com.google.cloud.bigquery.connector.common;

public class WriteStreamStatistics {
private final long rowCount;
private final long bytesWritten;

public WriteStreamStatistics(long rowCount, long bytesWritten) {
this.rowCount = rowCount;
this.bytesWritten = bytesWritten;
}

public long getRowCount() {
return rowCount;
}

public long getBytesWritten() {
return bytesWritten;
}
}
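Usage note: finalizeStream() (changed above) returns this value object so the row count and byte count travel together to the commit message. A fragmentary sketch of the call-site shape, matching the data writer change later in this commit:

// Both figures come back from finalization in one immutable result.
WriteStreamStatistics stats = writerHelper.finalizeStream();
long rows = stats.getRowCount();      // rows appended on this write-stream
long bytes = stats.getBytesWritten(); // bytes appended on this write-stream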
@@ -15,7 +15,9 @@
*/
package com.google.cloud.spark.bigquery.metrics;

import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import java.lang.reflect.Method;
import java.util.Optional;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.slf4j.Logger;
@@ -60,6 +62,66 @@ public static void postConnectorVersion(SparkContext sparkContext, String connec
}
}

public static void postWriteSessionMetrics(
long timestamp,
SparkBigQueryConfig.WriteMethod writeMethod,
long bytesWritten,
Optional<SparkBigQueryConfig.IntermediateFormat> intermediateDataFormat,
SparkContext sparkContext) {
try {
// Reflection is used here to load classes from the Spark events jar, which is
// Google-internal and not available at compile time.
Class<?> eventBuilderClass =
Class.forName(
"com.google.cloud.spark.events.SparkBigQueryConnectorWriteEvent$SparkBigQueryConnectorWriteEventBuilder");

Object builderInstance =
eventBuilderClass.getDeclaredConstructor(long.class).newInstance(timestamp);
eventBuilderClass
.getMethod("setBytesWritten", long.class)
.invoke(builderInstance, bytesWritten);

if (intermediateDataFormat.isPresent()) {
Class<?> dataFormatEnum = Class.forName("com.google.cloud.spark.events.DataFormat");
Object[] dataFormatEnumConstants = dataFormatEnum.getEnumConstants();
Object generatedDataFormatEnumValue = dataFormatEnumConstants[0];
for (Object constant : dataFormatEnumConstants) {
Method nameMethod = constant.getClass().getMethod("name");
String name = (String) nameMethod.invoke(constant);
if (name.equalsIgnoreCase(intermediateDataFormat.get().getDataSource())) {
generatedDataFormatEnumValue = constant;
break;
}
}
eventBuilderClass
.getMethod("setIntermediateDataFormat", dataFormatEnum)
.invoke(builderInstance, generatedDataFormatEnumValue);
}

Class<?> dataWriteMethodEnum = Class.forName("com.google.cloud.spark.events.DataWriteMethod");
Object[] dataWriteMethodConstants = dataWriteMethodEnum.getEnumConstants();
Object generatedDataWriteMethodEnumValue = dataWriteMethodConstants[0];
for (Object constant : dataWriteMethodConstants) {
Method nameMethod = constant.getClass().getMethod("name");
String name = (String) nameMethod.invoke(constant);
if (name.equalsIgnoreCase(writeMethod.toString())) {
generatedDataWriteMethodEnumValue = constant;
break;
}
}
eventBuilderClass
.getMethod("setWriteMethod", dataWriteMethodEnum)
.invoke(builderInstance, generatedDataWriteMethodEnumValue);

Method buildMethod = eventBuilderClass.getDeclaredMethod("build");

sparkContext.listenerBus().post((SparkListenerEvent) buildMethod.invoke(builderInstance));

} catch (ReflectiveOperationException ex) {
logger.debug("spark.events.SparkBigQueryConnectorWriteEvent library not in class path");
}
}

public static String getAccumulatorNameForMetric(String metricName, String sessionName) {
return String.format("%s-%s", sessionName, metricName);
}
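The enum handling above repeats one pattern twice: scan the constants of a reflectively loaded enum class and pick the one whose name() matches, falling back to the first constant. Extracted as a standalone sketch — the helper class and method name are illustrative:

// Illustrative helper: pick an enum constant by name from a reflectively
// loaded enum class, defaulting to the first constant when nothing matches.
import java.lang.reflect.Method;

final class EnumByName {
  static Object match(Class<?> enumClass, String target) throws ReflectiveOperationException {
    Object[] constants = enumClass.getEnumConstants();
    Object result = constants[0]; // default when no name matches
    for (Object constant : constants) {
      Method nameMethod = constant.getClass().getMethod("name");
      String name = (String) nameMethod.invoke(constant);
      if (name.equalsIgnoreCase(target)) {
        result = constant;
        break;
      }
    }
    return result;
  }
}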
@@ -19,6 +19,7 @@
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
@@ -32,6 +33,7 @@
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.SparkBigQueryUtil;
import com.google.cloud.spark.bigquery.SupportedCustomDataType;
import com.google.cloud.spark.bigquery.metrics.SparkBigQueryConnectorMetricsUtils;
import com.google.cloud.spark.bigquery.util.HdfsUtils;
import com.google.common.collect.Streams;
import java.io.IOException;
@@ -44,6 +46,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -70,6 +73,7 @@ public class BigQueryWriteHelper {
private final JobInfo.WriteDisposition writeDisposition;

private Optional<TableId> temporaryTableId = Optional.empty();
private final SparkContext sparkContext;

public BigQueryWriteHelper(
BigQueryClient bigQueryClient,
@@ -99,6 +103,7 @@ public BigQueryWriteHelper(
}
this.tableSchema = schema;
this.writeDisposition = SparkBigQueryUtil.saveModeToWriteDisposition(saveMode);
this.sparkContext = sqlContext.sparkContext();
}

public void writeDataFrameToBigQuery() {
@@ -165,13 +170,22 @@ void loadDataToBigQuery() throws IOException {
JobInfo.WriteDisposition writeDisposition =
SparkBigQueryUtil.saveModeToWriteDisposition(saveMode);
TableId destinationTableId = temporaryTableId.orElse(config.getTableId());
- bigQueryClient.loadDataIntoTable(
- config,
- optimizedSourceUris,
- formatOptions,
- writeDisposition,
- Optional.of(tableSchema),
- destinationTableId);
+ JobStatistics.LoadStatistics loadStatistics =
+ bigQueryClient.loadDataIntoTable(
+ config,
+ optimizedSourceUris,
+ formatOptions,
+ writeDisposition,
+ Optional.of(tableSchema),
+ destinationTableId);

long currentTimeMillis = System.currentTimeMillis();
SparkBigQueryConnectorMetricsUtils.postWriteSessionMetrics(
currentTimeMillis,
SparkBigQueryConfig.WriteMethod.INDIRECT,
loadStatistics.getOutputBytes(),
Optional.of(config.getIntermediateFormat()),
sparkContext);
}

String friendlyTableName() {
@@ -57,7 +57,9 @@ public void configure(Binder binder) {
@Singleton
@Provides
public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContext(
- BigQueryClient bigQueryClient, BigQueryClientFactory bigQueryWriteClientFactory) {
+ BigQueryClient bigQueryClient,
+ BigQueryClientFactory bigQueryWriteClientFactory,
+ SparkSession spark) {
TableId tableId = tableConfig.getTableId();
RetrySettings bigqueryDataWriteHelperRetrySettings =
tableConfig.getBigqueryDataWriteHelperRetrySettings();
@@ -79,7 +81,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContex
SaveMode
.Overwrite), // writeAtLeastOnce mode is currently not supported in OverWrite
// mode.
- tableConfig.getPartitionOverwriteModeValue());
+ tableConfig.getPartitionOverwriteModeValue(),
+ spark.sparkContext());
}

@Singleton
@@ -108,6 +111,7 @@ public BigQueryIndirectDataSourceWriterContext provideIndirectDataSourceWriterCo
writeUUID,
mode,
gcsPath,
- intermediateDataCleaner);
+ intermediateDataCleaner,
+ spark.sparkContext());
}
}
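For readers unfamiliar with the pattern: the module's @Provides methods now declare an extra SparkSession parameter, and Guice supplies it when building the writer contexts. A minimal, self-contained Guice sketch of the same idea — all names here are hypothetical:

// Minimal Guice sketch: the injector satisfies @Provides parameters itself.
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;

class SketchModule extends AbstractModule {
  @Override
  protected void configure() {
    bind(Integer.class).toInstance(42); // stands in for the SparkSession binding
  }

  // Guice supplies `count` from the binding above, the same way the real
  // module's @Provides methods now receive a SparkSession parameter.
  @Provides
  String provideMessage(Integer count) {
    return "rows written: " + count;
  }

  public static void main(String[] args) {
    Injector injector = Guice.createInjector(new SketchModule());
    System.out.println(injector.getInstance(String.class)); // rows written: 42
  }
}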
@@ -33,10 +33,13 @@
import com.google.cloud.spark.bigquery.PartitionOverwriteMode;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.metrics.SparkBigQueryConnectorMetricsUtils;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
@@ -77,6 +80,7 @@ enum WritingMode {
}

private WritingMode writingMode = WritingMode.ALL_ELSE;
private final SparkContext sparkContext;

public BigQueryDirectDataSourceWriterContext(
BigQueryClient bigQueryClient,
@@ -92,7 +96,8 @@ public BigQueryDirectDataSourceWriterContext(
SchemaConvertersConfiguration schemaConvertersConfiguration,
java.util.Optional<String> destinationTableKmsKeyName,
boolean writeAtLeastOnce,
- PartitionOverwriteMode overwriteMode)
+ PartitionOverwriteMode overwriteMode,
+ SparkContext sparkContext)
throws IllegalArgumentException {
this.bigQueryClient = bigQueryClient;
this.writeClientFactory = bigQueryWriteClientFactory;
@@ -106,6 +111,7 @@ public BigQueryDirectDataSourceWriterContext(
this.schemaConvertersConfiguration = schemaConvertersConfiguration;
this.destinationTableKmsKeyName = Optional.fromJavaUtil(destinationTableKmsKeyName);
this.writeAtLeastOnce = writeAtLeastOnce;
this.sparkContext = sparkContext;
Schema bigQuerySchema =
SchemaConverters.from(this.schemaConvertersConfiguration).toBigQuerySchema(sparkSchema);
try {
@@ -218,11 +224,12 @@ public void commit(WriterCommitMessageContext[] messages) {
"BigQuery DataSource writer {} committed with messages:\n{}",
writeUUID,
Arrays.toString(messages));

long bytesWritten = 0;
if (!writeAtLeastOnce) {
BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest =
BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage);
for (WriterCommitMessageContext message : messages) {
bytesWritten += ((BigQueryDirectWriterCommitMessageContext) message).getBytesWritten();
batchCommitWriteStreamsRequest.addWriteStreams(
((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName());
}
@@ -263,6 +270,13 @@ public void commit(WriterCommitMessageContext[] messages) {
updatedTableInfo.setLabels(tableLabels);
bigQueryClient.update(updatedTableInfo.build());
}
long currentTimeMillis = System.currentTimeMillis();
SparkBigQueryConnectorMetricsUtils.postWriteSessionMetrics(
currentTimeMillis,
SparkBigQueryConfig.WriteMethod.DIRECT,
bytesWritten,
java.util.Optional.empty(),
sparkContext);
}

/**
@@ -23,6 +23,7 @@
import com.google.cloud.bigquery.connector.common.BigQueryClientFactory;
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
import com.google.cloud.bigquery.connector.common.BigQueryDirectDataWriterHelper;
import com.google.cloud.bigquery.connector.common.WriteStreamStatistics;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
@@ -107,14 +108,22 @@ public void write(InternalRow record) throws IOException {
public WriterCommitMessageContext commit() throws IOException {
logger.debug("Data Writer {} finalizeStream()", partitionId);

- long rowCount = writerHelper.finalizeStream();
+ WriteStreamStatistics stats = writerHelper.finalizeStream();
String writeStreamName = writerHelper.getWriteStreamName();

logger.debug(
"Data Writer {}'s write-stream has finalized with row count: {}", partitionId, rowCount);
"Data Writer {}'s write-stream has finalized with row count: {}",
partitionId,
stats.getRowCount());

return new BigQueryDirectWriterCommitMessageContext(
- writeStreamName, partitionId, taskId, epochId, tablePath, rowCount);
+ writeStreamName,
+ partitionId,
+ taskId,
+ epochId,
+ tablePath,
+ stats.getRowCount(),
+ stats.getBytesWritten());
}

@Override
@@ -24,20 +24,23 @@ public class BigQueryDirectWriterCommitMessageContext implements WriterCommitMes
private final long epochId;
private final String tablePath;
private final long rowCount;
private final long bytesWritten;

public BigQueryDirectWriterCommitMessageContext(
String writeStreamName /*List<String> writeStreamNames*/,
int partitionId,
long taskId,
long epochId,
String tablePath,
- long rowCount) {
+ long rowCount,
+ long bytesWritten) {
this.writeStreamName = writeStreamName;
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
this.tablePath = tablePath;
this.rowCount = rowCount;
this.bytesWritten = bytesWritten;
}

public String getWriteStreamName() {
@@ -64,6 +67,10 @@ public long getRowCount() {
return rowCount;
}

public long getBytesWritten() {
return bytesWritten;
}

@Override
public String toString() {
return "BigQueryWriterCommitMessage{"