GCS to BQ using serverless services (GoogleCloudPlatform#769)
# Ingesting GCS files to BigQuery using Cloud Functions and Serverless Spark

In this solution, we build an approach to ingest flat files from GCS into BigQuery using serverless technology. This solution may not be performant if small files land in GCS frequently. We use the [Daily Shelter Occupancy](https://open.toronto.ca/dataset/daily-shelter-occupancy/) data in this example. The figure below shows the overall approach: once an object is uploaded to the GCS bucket, an object notification is received by the Pub/Sub topic. The Pub/Sub topic triggers a Cloud Function, which then invokes serverless Spark. Any error during the Cloud Function invocation or the serverless Spark execution is sent to a dead-letter topic.

![](docs/gcs2bq_serverless_spark.jpg)
- **Step 1:** Create a bucket; the bucket holds the data to be ingested into GCP. Once an object is uploaded to the bucket, a notification is published to the Pub/Sub topic.
```
PROJECT_ID=<<project_id>>
GCS_BUCKET_NAME=<<Bucket name>>
gsutil mb gs://${GCS_BUCKET_NAME}
gsutil notification create \
    -t projects/${PROJECT_ID}/topics/create_notification_${GCS_BUCKET_NAME} \
    -e OBJECT_FINALIZE \
    -f json gs://${GCS_BUCKET_NAME}
```
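To confirm the notification configuration was created, you can list the bucket's notifications (a quick sanity check, not part of the original steps):
```
gsutil notification list gs://${GCS_BUCKET_NAME}
```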
- **Step 2:** Build the jar and copy it to a GCS bucket (create a GCS bucket to store the jar if you don't have one). There are a number of Dataproc templates available to [use](https://github.com/GoogleCloudPlatform/dataproc-templates).
```
GCS_ARTIFACT_REPO=<<artifact repo name>>
gsutil mb gs://${GCS_ARTIFACT_REPO}
cd gcs2bq-spark
mvn clean install
gsutil cp target/GCS2BQWithSpark-1.0-SNAPSHOT.jar gs://${GCS_ARTIFACT_REPO}/
```
- **Step 3:** [This page](https://cloud.google.com/dataproc-serverless/docs/concepts/network) describes the network configuration required to run serverless Spark.
- **Open subnet connectivity:** The subnet must allow subnet communication on all ports. The following gcloud command attaches a network firewall rule to a subnet that allows ingress communication using all protocols on all ports, provided the source and destination are tagged with `serverless-spark`.
```
gcloud compute firewall-rules create allow-internal-ingress \
    --network="default" \
    --source-tags="serverless-spark" \
    --target-tags="serverless-spark" \
    --direction="ingress" \
    --action="allow" \
    --rules="all"
```
- **Private Google Access:** The subnet must have [Private Google Access](https://cloud.google.com/vpc/docs/configure-private-google-access) enabled; an example command follows this list.
- **External network access:** Drivers and executors have internal IP addresses. You can set up [Cloud NAT](https://cloud.google.com/nat/docs/overview) to allow outbound traffic using internal IPs on your VPC network.
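A minimal sketch of enabling Private Google Access, assuming the `default` subnet in `us-central1` (adjust both to the subnet and region you actually use):
```
gcloud compute networks subnets update default \
    --region=us-central1 \
    --enable-private-ip-google-access
```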
- **Step 4:** Create the GCP resources required by Serverless Spark.
- **Create BQ Dataset:** Create a dataset to load the GCS files into.
```
DATASET_NAME=<<dataset_name>>
bq --location=US mk -d \
    ${DATASET_NAME}
```
- **Create BQ table:** Create a table using the schema in `schema/schema.json`.
```
TABLE_NAME=<<table_name>>
bq mk --table ${PROJECT_ID}:${DATASET_NAME}.${TABLE_NAME} \
    ./schema/schema.json
```
- **Create service account:** Create the service account used to run the serverless Spark job, and grant the permissions required to read from the GCS bucket, write to the BigQuery table, and publish error messages to the dead-letter topic. Because the service account runs the serverless Spark workload, it also needs the Dataproc worker role. You can verify the resulting bindings with the command shown after this block.
```
SERVICE_ACCOUNT_ID="gcs-to-bq-sa"
gcloud iam service-accounts create ${SERVICE_ACCOUNT_ID} \
    --description="GCS to BQ service account for Serverless Spark" \
    --display-name="GCS2BQ-SA"
roles=("roles/dataproc.worker" "roles/bigquery.dataEditor" "roles/bigquery.jobUser" "roles/storage.objectViewer" "roles/pubsub.publisher")
for role in "${roles[@]}"; do
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com" \
        --role="$role"
done
```
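A quick way to verify the bindings (a sanity check, not part of the original steps):
```
gcloud projects get-iam-policy ${PROJECT_ID} \
    --flatten="bindings[].members" \
    --filter="bindings.members:${SERVICE_ACCOUNT_ID}@${PROJECT_ID}.iam.gserviceaccount.com" \
    --format="table(bindings.role)"
```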
- **Create BQ temp bucket:** Loading GCS files to BigQuery requires a temporary bucket. Let's create one.
```
GCS_TEMP_BUCKET=<<temp_bucket>>
gsutil mb gs://${GCS_TEMP_BUCKET}
```
- **Create Deadletter Topic and Subscription:** Let's create a dead-letter topic and subscription.
```
ERROR_TOPIC=err_gcs2bq_${GCS_BUCKET_NAME}
gcloud pubsub topics create ${ERROR_TOPIC}
gcloud pubsub subscriptions create err_sub_${GCS_BUCKET_NAME} \
    --topic=${ERROR_TOPIC}
```
Once all resources are created, change the variable values in `trigger-serverless-spark-fxn/main.py` from line 25 to 29:
```
bq_temp_bucket = <<GCS_TEMP_BUCKET>>
gcs_artifact_rep = <<GCS_ARTIFACT_REPO>>
dataset = <<DATASET_NAME>>
bq_table = <<TABLE_NAME>>
error_topic = <<ERROR_TOPIC>>
```
- **Step 5:** The Cloud Function is triggered once an object is copied to the bucket, and it in turn triggers serverless Spark. Deploy the function (the trigger resource is the notification topic created in Step 1):
```
cd trigger-serverless-spark-fxn
gcloud functions deploy trigger-serverless-spark-fxn --entry-point \
    invoke_sreverless_spark --runtime python37 \
    --trigger-resource create_notification_${GCS_BUCKET_NAME} \
    --trigger-event google.pubsub.topic.publish
```
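For reference, a minimal sketch of what the function body in `trigger-serverless-spark-fxn/main.py` might look like; the project id, region, main class name, Spark arguments, and batch id scheme below are illustrative assumptions, not the repository's actual code:
```
import base64
import json
import uuid

from google.cloud import dataproc_v1

# Values normally set at lines 25-29 of main.py (see Step 4); project_id and
# region are added here only to make the sketch self-contained.
project_id = "<<project_id>>"
region = "us-central1"  # assumption: must match your subnet's region
bq_temp_bucket = "<<GCS_TEMP_BUCKET>>"
gcs_artifact_rep = "<<GCS_ARTIFACT_REPO>>"
dataset = "<<DATASET_NAME>>"
bq_table = "<<TABLE_NAME>>"
error_topic = "<<ERROR_TOPIC>>"


def invoke_sreverless_spark(event, context):
    """Background function triggered by the GCS notification Pub/Sub topic."""
    # The Pub/Sub message data carries the GCS object notification as JSON.
    notification = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    object_uri = f"gs://{notification['bucket']}/{notification['name']}"

    # Batches must be created against the regional Dataproc endpoint.
    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    batch = dataproc_v1.Batch(
        spark_batch=dataproc_v1.SparkBatch(
            main_class="com.example.GCS2BQWithSpark",  # hypothetical main class
            jar_file_uris=[f"gs://{gcs_artifact_rep}/GCS2BQWithSpark-1.0-SNAPSHOT.jar"],
            # Hypothetical flags; the actual jar parses its arguments with picocli.
            args=[object_uri, dataset, bq_table, bq_temp_bucket, error_topic],
        ),
        environment_config=dataproc_v1.EnvironmentConfig(
            execution_config=dataproc_v1.ExecutionConfig(
                service_account=f"gcs-to-bq-sa@{project_id}.iam.gserviceaccount.com"
            )
        ),
    )
    # Each run gets a unique batch id; create_batch returns a long-running operation.
    client.create_batch(
        parent=f"projects/{project_id}/locations/{region}",
        batch=batch,
        batch_id=f"gcs2bq-{uuid.uuid4().hex[:8]}",
    )
```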
- **Step 6:** Invoke the end-to-end pipeline. Download the [2020 Daily Center Data](https://ckan0.cf.opendata.inter.prod-toronto.ca/download_resource/800cc97f-34b3-4d4d-9bc1-6e2ce2d6f44a?format=csv) and upload it to the GCS bucket (<<GCS_BUCKET_NAME>>) created in Step 1.
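For example, assuming the download was saved locally as `daily-shelter-occupancy-2020.csv` (the local filename is an assumption):
```
gsutil cp daily-shelter-occupancy-2020.csv gs://${GCS_BUCKET_NAME}/
```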
**Debugging Pipelines**

Error messages for failed data pipelines are published to the Pub/Sub topic (ERROR_TOPIC) created in Step 4 (Create Deadletter Topic and Subscription). Errors from both the Cloud Function and Spark are forwarded to Pub/Sub, and the topic might have multiple entries for the same data-pipeline instance. Messages in the Pub/Sub topic can be filtered using the "oid" attribute. The oid attribute is unique for each pipeline run and holds the full object name with the [generation id](https://cloud.google.com/storage/docs/metadata#generation-number).
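To inspect dead-letter messages, you can pull from the error subscription created in Step 4 and check each message's `oid` attribute (the `--limit` value here is arbitrary):
```
gcloud pubsub subscriptions pull err_sub_${GCS_BUCKET_NAME} \
    --auto-ack \
    --limit=10
```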
New binary file (+36 KB): `examples/gcs-to-bq-serverless-services/docs/gcs2bq_serverless_spark.jpg` (the architecture diagram referenced above).
New file: `examples/gcs-to-bq-serverless-services/gcs2bq-spark/pom.xml` (224 additions):
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright (C) 2022 Google Inc.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>GCS2BQWithSpark</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <java.version>1.8</java.version>
    <spark.version>3.1.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <project.version>3.1</project.version>
    <junit-platform.version>5.7.2</junit-platform.version>
    <spotless-maven-plugin.version>2.1.0</spotless-maven-plugin.version>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>jul-to-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.6</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <version>${junit-platform.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.spark</groupId>
      <artifactId>spark-bigquery-with-dependencies_2.12</artifactId>
      <version>0.22.0</version>
    </dependency>
    <dependency>
      <groupId>info.picocli</groupId>
      <artifactId>picocli</artifactId>
      <version>4.6.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-storage</artifactId>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>24.3.0</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <plugins>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M5</version>
        <dependencies>
          <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${junit-platform.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/maven/**</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com.google</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <!-- <inclde>com.google.common.**</inclde>-->
                    <!-- <include>io.grpc.internal.**</include>-->
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>com.diffplug.spotless</groupId>
        <artifactId>spotless-maven-plugin</artifactId>
        <version>${spotless-maven-plugin.version}</version>
        <configuration>
          <formats>
            <!-- Avoid trailing whitespace and require ending newline. -->
            <format>
              <includes>
                <include>*.md</include>
                <include>.gitignore</include>
              </includes>
              <trimTrailingWhitespace/>
              <endWithNewline/>
            </format>
          </formats>
          <java>
            <googleJavaFormat>
              <version>1.7</version>
              <style>GOOGLE</style>
            </googleJavaFormat>
          </java>
        </configuration>
        <!-- Bind to verify. -->
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>