Merge pull request GoogleCloudPlatform#461 from prathapreddy123/streamingbenchmark

Include eventid and eventtimestamp fields as pubsub attributes
ryanmcdowell authored May 15, 2020
2 parents 704af4c + 5dbc816 commit f8d0739
Showing 5 changed files with 213 additions and 46 deletions.
9 changes: 9 additions & 0 deletions examples/dataflow-streaming-benchmark/README.md
@@ -39,6 +39,13 @@ instruct the data generator of what type of fake data to create in each field. See the
json-data-generator [docs](https://github.com/vincentrussell/json-data-generator) for more
information on the faker functions.

#### Message Attributes
If the message schema contains fields whose names match (case-insensitively) any of the following names,
those fields will also be added to the output Pub/Sub message attributes:
eventId, eventTimestamp

Attribute fields are helpful in scenarios such as deduplicating messages or inspecting message timestamps; a consumer-side sketch is shown below.
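
For illustration (this sketch is not part of the commit, and the topic name is a placeholder), a downstream Beam pipeline could map these attributes onto PubsubIO's built-in deduplication and event-time handling:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BenchmarkConsumer {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply(
        "ReadFromPubsub",
        PubsubIO.readMessagesWithAttributes()
            .fromTopic("projects/<project-id>/topics/<topic-id>")
            // Deduplicate redelivered messages on the generated event id.
            .withIdAttribute("eventId")
            // Use the generated event timestamp as the element's event time. This only works
            // if the schema produces epoch milliseconds or an RFC 3339 string for this field.
            .withTimestampAttribute("eventTimestamp"));

    pipeline.run();
  }
}
```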

#### Example Schema File
Below is an example schema file which generates fake game event payloads with random data.
```javascript
@@ -71,6 +78,8 @@ Pub/Sub topic.
"completed": false
}
```
Since the schema includes the reserved field names `eventId` and `eventTimestamp`, the output Pub/Sub
message will also carry these fields as message attributes in addition to the regular payload.

### Executing the Pipeline
```bash
# (command collapsed in this diff view; see the pipeline javadoc below for the full mvn invocation)
```
10 changes: 9 additions & 1 deletion examples/dataflow-streaming-benchmark/pom.xml
@@ -40,7 +40,8 @@

<!-- Dependency properties -->
<checkstyle.version>8.29</checkstyle.version>
<beam.version>2.6.0</beam.version>
<beam.version>2.19.0</beam.version>
<gson.version>2.7</gson.version>
<hamcrest.version>1.3</hamcrest.version>
<java.version>1.8</java.version>
<j2v8.version>4.8.0</j2v8.version>
@@ -84,6 +85,13 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<!-- As of Beam 2.19, omitting this exclusion causes java.lang.NoClassDefFoundError: io/opencensus/trace/Tracestate.
Refer to https://issues.apache.org/jira/browse/BEAM-9304; expected to be fixed in Beam 2.20.
-->
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
examples/dataflow-streaming-benchmark/src/main/java/com/google/cloud/pso/pipeline/StreamingBenchmark.java
@@ -19,28 +19,41 @@
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

/**
* The {@link StreamingBenchmark} is a streaming pipeline which generates messages at a specified
@@ -73,23 +86,28 @@
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.pso.pipeline.StreamingBenchmark \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --runner=${RUNNER} \
* --zone=us-east1-d \
* --autoscalingAlgorithm=THROUGHPUT_BASED \
* --maxNumWorkers=5 \
* --qps=50000 \
* --schemaLocation=gs://<bucket>/<path>/<to>/game-event-schema \
* --topic=projects/<project-id>/topics/<topic-id>"
* -Dexec.mainClass=com.google.cloud.pso.pipeline.StreamingBenchmark \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --runner=${RUNNER} \
* --zone=us-east1-d \
* --autoscalingAlgorithm=THROUGHPUT_BASED \
* --maxNumWorkers=5 \
* --qps=50000 \
* --validateSchema=true \
* --schemaLocation=gs://<bucket>/<path>/<to>/game-event-schema \
* --topic=projects/<project-id>/topics/<topic-id>"
* </pre>
*/
public class StreamingBenchmark {

private static final Logger LOG = LoggerFactory.getLogger(StreamingBenchmark.class);
private static final ImmutableList<String> ATTRIBUTES_SCHEMA_FIELDS =
ImmutableList.of("eventid", "eventtimestamp");

/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
@@ -107,13 +125,28 @@ public interface Options extends PipelineOptions {

void setSchemaLocation(String value);

@Description("The path to the schema to generate.")
@Required
@Default.Boolean(true)
Boolean getValidateSchema();

void setValidateSchema(Boolean value);

@Description("The Pub/Sub topic to write to.")
@Required
String getTopic();

void setTopic(String value);
}

/** {@link MalformedSchemaException} captures JSON schema syntax-related exceptions. */
static class MalformedSchemaException extends Exception {

public MalformedSchemaException(String message) {
super(message);
}
}

/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
@@ -123,11 +156,7 @@ public interface Options extends PipelineOptions {
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(Options.class);

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}

@@ -155,7 +184,10 @@ public static PipelineResult run(Options options) {
.apply(
"Trigger",
GenerateSequence.from(0L).withRate(options.getQps(), Duration.standardSeconds(1L)))
.apply("GenerateMessages", ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())))
.apply(
"GenerateMessages",
ParDo.of(
new MessageGeneratorFn(options.getSchemaLocation(), options.getValidateSchema())))
.apply("WriteToPubsub", PubsubIO.writeMessages().to(options.getTopic()));

return pipeline.run();
@@ -172,16 +204,67 @@ static class MessageGeneratorFn extends DoFn<Long, PubsubMessage> {

private final String schemaLocation;
private String schema;
private List<String> attributeFields = new ArrayList<>();
private final boolean shouldValidateSchema;
private boolean includeAttributeValues = false;

private JsonParser jsonParser;

// Not initialized inline or in the constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;

MessageGeneratorFn(String schemaLocation) {
MessageGeneratorFn(String schemaLocation, boolean shouldValidateSchema) {
this.schemaLocation = schemaLocation;
this.shouldValidateSchema = shouldValidateSchema;
}

// Keep the scope package-private since it is referenced from test methods for assertions.
@VisibleForTesting
List<String> getAttributeFields() {
return this.attributeFields;
}

/**
* Validates the schema and checks for the presence of attribute schema fields.
*
* @param dataGenerator The JSON data generator used to produce a sample message.
* @param schema The schema to validate.
* @throws IOException if generating or reading the sample message fails.
* @throws MalformedSchemaException if the schema is not valid JSON.
*/
private void validateSchema(JsonDataGenerator dataGenerator, String schema)
throws IOException, MalformedSchemaException {

String payload;
JsonObject jsonObject;

try {
// Generate sample message based on the provided schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toString();
}

LOG.info("Validating sample message: {}", payload);
jsonObject = new JsonParser().parse(payload).getAsJsonObject();
} catch (JsonDataGeneratorException | JsonSyntaxException ex) {
throw new MalformedSchemaException(
String.format("Invalid schema format. Error:%s ", ex.getMessage()));
}

for (Map.Entry<String, JsonElement> property : jsonObject.entrySet()) {
if (ATTRIBUTES_SCHEMA_FIELDS.contains(property.getKey().toLowerCase())) {
this.attributeFields.add(property.getKey());
}
}

// Set these fields up front to minimize per-element processing costs inside the
// process method.
this.includeAttributeValues = this.attributeFields.size() > 0;
this.jsonParser = this.includeAttributeValues ? new JsonParser() : null;
}

@Setup
public void setup() throws IOException {
public void setup() throws IOException, MalformedSchemaException {
dataGenerator = new JsonDataGeneratorImpl();
Metadata metadata = FileSystems.matchSingleFileSpec(schemaLocation);

@@ -194,24 +277,33 @@ public void setup() throws IOException {
}

schema = byteArrayOutputStream.toString();
if (this.shouldValidateSchema) {
this.validateSchema(dataGenerator, schema);
}
}
}

@ProcessElement
public void processElement(ProcessContext context)
throws IOException, JsonDataGeneratorException {

// TODO: Add the ability to place eventId and eventTimestamp in the attributes.
byte[] payload;
Map<String, String> attributes = Maps.newHashMap();

// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);

payload = byteArrayOutputStream.toByteArray();
}

if (this.includeAttributeValues) {
// Parse the payload into a JSON object, extract the attribute fields, and add them to the attributes map.
JsonObject jsonObject =
this.jsonParser.parse(new String(payload, StandardCharsets.UTF_8)).getAsJsonObject();
for (String field : attributeFields) {
attributes.put(field, jsonObject.get(field).getAsString());
}
}

context.output(new PubsubMessage(payload, attributes));
}
}
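
A minimal test sketch (hypothetical, not the repository's actual test class) of how the package-private getAttributeFields() hook above could be exercised; the literal schema, temp-file handling, and class name are assumptions:

```java
package com.google.cloud.pso.pipeline;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import org.junit.Test;

/** Hypothetical test for attribute-field detection in {@link StreamingBenchmark.MessageGeneratorFn}. */
public class MessageGeneratorFnAttributeTest {

  @Test
  public void setupDetectsAttributeFieldsWhenValidationEnabled() throws Exception {
    // A literal JSON document is a valid schema for json-data-generator; no faker tokens needed.
    String schema =
        "{\"eventId\": \"abc-123\", \"eventTimestamp\": 1589500000000, \"score\": 7}";
    Path schemaFile = Files.createTempFile("game-event-schema", ".json");
    Files.write(schemaFile, schema.getBytes(StandardCharsets.UTF_8));

    StreamingBenchmark.MessageGeneratorFn fn =
        new StreamingBenchmark.MessageGeneratorFn(schemaFile.toString(), true);
    fn.setup();

    // eventId and eventTimestamp match ATTRIBUTES_SCHEMA_FIELDS (case-insensitively); score does not.
    List<String> attributeFields = fn.getAttributeFields();
    assertEquals(2, attributeFields.size());
    assertTrue(attributeFields.contains("eventId"));
    assertTrue(attributeFields.contains("eventTimestamp"));
  }
}
```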
examples/dataflow-streaming-benchmark/src/main/java/com/google/cloud/pso/pipeline/package-info.java
@@ -14,7 +14,5 @@
* limitations under the License.
*/

/**
* All Dataflow pipelines contained within the project.
*/
/** All Dataflow pipelines contained within the project. */
package com.google.cloud.pso.pipeline;
