Samples showing how to use Pub/Sub Lite with Cloud Dataflow.
This sample shows how to create an Apache Beam streaming pipeline that reads messages from Pub/Sub Lite, groups them using a fixed-size windowing function, and writes them to Cloud Storage.
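At its core the pipeline has three stages: read, window, write. The following is a minimal, illustrative sketch of that shape using Beam's PubsubLiteIO, FixedWindows, and windowed TextIO writes; the class name and resource paths are placeholders, and the actual sample source may differ in detail.

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class PubsubliteToGcsSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline
        // Read messages from the Pub/Sub Lite subscription.
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(
            SubscriberOptions.newBuilder()
                .setSubscriptionPath(SubscriptionPath.parse(
                    "projects/PROJECT_ID/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION"))
                .build()))
        // Decode each message payload as a UTF-8 string.
        .apply("Convert To String", MapElements.into(TypeDescriptors.strings())
            .via((SequencedMessage m) -> m.getMessage().getData().toStringUtf8()))
        // Group the messages into fixed one-minute windows.
        .apply("Apply Fixed Window", Window.<String>into(
            FixedWindows.of(Duration.standardMinutes(1))))
        // Write one file per window to Cloud Storage.
        .apply("Write To GCS", TextIO.write()
            .to("gs://BUCKET/samples/output")
            .withWindowedWrites()
            .withNumShards(1));
    pipeline.run().waitUntilFinish();
  }
}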
Resources needed for this example:
- A Pub/Sub Lite topic and a subscription attached to it.
- A Cloud Storage bucket.
- Enable the APIs: Cloud Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Pub/Sub Lite.
When you enable Cloud Dataflow, which uses Compute Engine, a default Compute Engine service account with the Editor role (roles/editor) is created.
- You can skip this step if you are trying this example in a Google Cloud environment like Cloud Shell. Otherwise, create a user-managed service account and grant it the following roles on your project:
  - roles/dataflow.admin
  - roles/pubsublite.viewer
  - roles/pubsublite.subscriber
  - roles/logging.viewer
  Then create a service account key and point GOOGLE_APPLICATION_CREDENTIALS to your downloaded key file.
export GOOGLE_APPLICATION_CREDENTIALS=path/to/your/key/file
- Create a Cloud Storage bucket. Your bucket name needs to be globally unique.
export PROJECT_ID=$(gcloud config get-value project)
export BUCKET=your-gcs-bucket
gsutil mb gs://$BUCKET
- Create a Pub/Sub Lite topic and subscription. Set LITE_LOCATION to a Pub/Sub Lite location.
export TOPIC=your-lite-topic
export SUBSCRIPTION=your-lite-subscription
export LITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \
--zone=$LITE_LOCATION \
--partitions=1 \
--per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
--zone=$LITE_LOCATION \
--topic=$TOPIC
- Set DATAFLOW_REGION to a Dataflow region close to your Pub/Sub Lite location.
export DATAFLOW_REGION=your-dataflow-region
The following example runs a streaming pipeline. Choose DirectRunner to test it locally or DataflowRunner to run it on Dataflow. The pipeline accepts the following parameters (a sketch of a matching options interface follows the list):
- --subscription: the Pub/Sub Lite subscription to read messages from
- --output: the full filepath of the output files
- --windowSize [optional]: the window size in minutes, defaults to 1
- --runner [optional]: DataflowRunner or DirectRunner
- --project [optional]: your project ID, optional if using DirectRunner
- --region [optional]: the Dataflow region, optional if using DirectRunner
- --tempLocation: a Cloud Storage location for temporary files, optional if using DirectRunner
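On the Java side, flags like these are typically parsed by Beam's PipelineOptionsFactory into an options interface. The sample defines its own options class, so the interface below is only an illustrative sketch of how these parameters could be declared:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;

// Illustrative only: the sample's real options interface may use different names.
public interface PubsubliteToGcsOptions extends StreamingOptions {
  @Description("Pub/Sub Lite subscription to read messages from.")
  @Validation.Required
  String getSubscription();
  void setSubscription(String value);

  @Description("Path prefix of the output files.")
  @Validation.Required
  String getOutput();
  void setOutput(String value);

  @Description("Window size in minutes; defaults to 1.")
  @Default.Integer(1)
  int getWindowSize();
  void setWindowSize(int value);
}

Calling PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class) would then pick up --subscription, --output, and --windowSize, while --runner, --project, --region, and --tempLocation are standard Beam and Dataflow options.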
Gradle:
gradle execute -Dexec.args="\
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--runner=DataflowRunner \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp"
Maven:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args="\
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--runner=DataflowRunner \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp"
Publish some messages to your Lite topic. Then check for files in your Cloud Storage bucket.
gsutil ls "gs://$BUCKET/samples/output*"
With a metadata.json file, you can create a Dataflow Flex template. Custom Dataflow Flex templates can be shared, and you can run them with different input parameters.
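The metadata.json file tells the Dataflow service which parameters the template accepts. A minimal sketch for this sample's parameters could look like the following; the names, labels, and help text here are illustrative:

{
  "name": "Pub/Sub Lite to Cloud Storage",
  "description": "Streaming pipeline that reads messages from Pub/Sub Lite and writes windowed files to Cloud Storage.",
  "parameters": [
    {
      "name": "subscription",
      "label": "Pub/Sub Lite subscription",
      "helpText": "Full subscription path, for example projects/PROJECT_ID/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION."
    },
    {
      "name": "output",
      "label": "Output file prefix",
      "helpText": "Cloud Storage path prefix for the output files."
    },
    {
      "name": "windowSize",
      "label": "Window size in minutes",
      "helpText": "Size of the fixed window in minutes.",
      "isOptional": true
    }
  ]
}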
- Create a fat JAR. You should see target/pubsublite-streaming-bundled-1.0.jar as an output.
mvn clean package -DskipTests=true
ls -lh
- Provide names and locations for your template file and template container image.
export TEMPLATE_PATH="gs://$BUCKET/samples/pubsublite-to-gcs.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/pubsublite-to-gcs:latest"
- Build a custom Flex template.
gcloud dataflow flex-template build $TEMPLATE_PATH \
--image-gcr-path "$TEMPLATE_IMAGE" \
--sdk-language "JAVA" \
--flex-template-base-image JAVA11 \
--metadata-file "metadata.json" \
--jar "target/pubsublite-streaming-bundled-1.0.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
- Run a job with the custom Flex template using gcloud or the Cloud Console.
Note: Pub/Sub Lite allows only one subscriber to pull messages from a partition. If your Pub/Sub Lite topic has only one partition and you use a subscription attached to that topic in more than one Dataflow job, only one of the jobs will receive messages.
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region "$DATAFLOW_REGION"
- Stop the pipeline. If you use DirectRunner, press Ctrl+C to cancel. If you use DataflowRunner, click the job you want to stop, then choose "Cancel".
- Delete the Lite topic and subscription.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
- Delete the Cloud Storage objects:
gsutil -m rm -rf "gs://$BUCKET/samples/output*" "gs://$BUCKET/samples/template-output*"
- Delete the template container image in Container Registry and the Flex template file if you created them.
gcloud container images delete $TEMPLATE_IMAGE
gsutil rm $TEMPLATE_PATH
- Delete the Cloud Storage bucket:
gsutil rb "gs://$BUCKET"