Skip to content

Commit

Permalink
Pubsub2Inbox: Add GCS copy and logger outputs and example configurati…
Browse files Browse the repository at this point in the history
…on, and refactored (GoogleCloudPlatform#663)

Terraform code to use archive provider.
  • Loading branch information
rosmo authored Jun 30, 2021
1 parent f63982e commit 70c4223
Show file tree
Hide file tree
Showing 17 changed files with 380 additions and 34 deletions.
4 changes: 4 additions & 0 deletions tools/pubsub2inbox/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Out of the box, you'll have the following functionality:
- From [Recommender API](https://cloud.google.com/recommender/docs/overview).
- Also see [example with attached spreadsheet](examples/recommendations-example-2.yaml).
- [Cloud Monitoring alerts](examples/monitoring-config.yaml)
- [Cloud Storage copier](examples/gcscopy-example.yaml)
- Copies objects between two buckets, useful for backing up.
- Any JSON
- [See the example of generic JSON processing](examples/generic-config.yaml)

Expand Down Expand Up @@ -89,6 +91,8 @@ Available output processors are:
- [gcs.py](output/gcs.py): can create objects on GCS from any inputs.
- [webhook.py](output/webhook.py): can send arbitrary HTTP requests, optionally
with added OAuth2 bearer token from GCP.
- [gcscopy.py](output/gcscopy.py): copies files between buckets.
- [logger.py](output/logger.py): Logs message in Cloud Logging.

Please note that the output processors have some IAM requirements to be able to
pull information from GCP:
Expand Down
13 changes: 13 additions & 0 deletions tools/pubsub2inbox/examples/bigquery-example.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

retryPeriod: 3 days ago

Expand Down
14 changes: 14 additions & 0 deletions tools/pubsub2inbox/examples/budget-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Input processors to use
processors:
- budget
Expand Down
35 changes: 35 additions & 0 deletions tools/pubsub2inbox/examples/gcscopy-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copies files from one bucket to another based on Storage Notifications API.
# You may want to increase function timeout for large files, or use Cloud Run for longer timeouts.
# (copying 6 GB might take from 7 minutes to 2 second depending on source/destination region,
# storage class, etc.)
retryPeriod: 3 day ago

processors:
- genericjson

processIf: "{% if event.attributes.eventType == 'OBJECT_FINALIZE' %}1{% endif %}"

outputs:
- type: logger
message: 'Backing up object from source bucket.'
variables:
mime_type: "{{ ('gs://' ~ data.bucket ~ '/' ~ data.name)|read_gcs_object(0, 1024)|filemagic }}"
- type: gcscopy
sourceBucket: "{{ data.bucket }}"
sourceObject: "{{ data.name }}"
destinationBucket: "my-gcscopy-testing-bucket"
destinationObject: "copy-{{ data.name }}"
13 changes: 13 additions & 0 deletions tools/pubsub2inbox/examples/generic-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
processors:
- genericjson

Expand Down
14 changes: 14 additions & 0 deletions tools/pubsub2inbox/examples/monitoring-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Generates alert emails from Cloud Monitoring alert Pub/Sub messages
#
processors:
Expand Down
13 changes: 13 additions & 0 deletions tools/pubsub2inbox/examples/recommendations-example.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
retryPeriod: 10 years ago

processors:
Expand Down
13 changes: 13 additions & 0 deletions tools/pubsub2inbox/examples/scc-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
processors:
- scc

Expand Down
15 changes: 14 additions & 1 deletion tools/pubsub2inbox/examples/storage-example.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
retryPeriod: 3 day ago

processors:
Expand Down
6 changes: 4 additions & 2 deletions tools/pubsub2inbox/filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from .regex import regex_replace
from .lists import split, index
from .strings import add_links, urlencode, generate_signed_url, json_encode, csv_encode, html_table_to_xlsx, make_list
from .strings import add_links, urlencode, generate_signed_url, json_encode, csv_encode, html_table_to_xlsx, make_list, read_gcs_object, filemagic
from .date import strftime, recurring_date
from .gcp import format_cost, get_cost

Expand All @@ -33,5 +33,7 @@ def get_jinja_filters():
'format_cost': format_cost,
'get_cost': get_cost,
'recurring_date': recurring_date,
'make_list': make_list
'make_list': make_list,
'read_gcs_object': read_gcs_object,
'filemagic': filemagic
}
32 changes: 32 additions & 0 deletions tools/pubsub2inbox/filters/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io
from tablepyxl import tablepyxl
import base64
import magic


def make_list(s):
Expand Down Expand Up @@ -58,6 +59,37 @@ def html_table_to_xlsx(s):
return base64.encodebytes(output.getvalue()).decode('utf-8')


def read_gcs_object(url, start=None, end=None):
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme != 'gs':
raise InvalidSchemeURLException(
'Invalid scheme for read_gcs_object(%s): %s' %
(url, parsed_url.scheme))
client = storage.Client()

bucket = client.bucket(parsed_url.netloc)
blob = bucket.get_blob(parsed_url.path[1:])
if not blob:
raise ObjectNotFoundException(
'Failed to download object %s from bucket %s' %
(parsed_url.netloc, parsed_url.path[1:]))
contents = blob.download_as_bytes(start=start, end=end)
return base64.encodebytes(contents).decode('utf-8')


def filemagic(contents):
with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
return m.id_buffer(base64.b64decode(contents))


class ObjectNotFoundException(Exception):
pass


class InvalidSchemeURLException(Exception):
pass


class InvalidSchemeSignedURLException(Exception):
pass

Expand Down
37 changes: 28 additions & 9 deletions tools/pubsub2inbox/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ def load_configuration(file_name):
if os.getenv('CONFIG'):
logger = logging.getLogger('pubsub2inbox')
secret_manager_url = os.getenv('CONFIG')
logger.debug('Loading configuration from Secret Manager: %s' %
(secret_manager_url))
client_info = grpc_client_info.ClientInfo(
user_agent='google-pso-tool/pubsub2inbox/1.1.0')
client = secretmanager.SecretManagerServiceClient(
client_info=client_info)
response = client.access_secret_version(name=secret_manager_url)
configuration = response.payload.data.decode('UTF-8')
if secret_manager_url.startswith('projects/'):
logger.debug('Loading configuration from Secret Manager: %s' %
(secret_manager_url))
client_info = grpc_client_info.ClientInfo(
user_agent='google-pso-tool/pubsub2inbox/1.1.0')
client = secretmanager.SecretManagerServiceClient(
client_info=client_info)
response = client.access_secret_version(name=secret_manager_url)
configuration = response.payload.data.decode('UTF-8')
else:
logger.debug('Loading configuration from bundled file: %s' %
(secret_manager_url))
with open(secret_manager_url) as config_file:
configuration = config_file.read()
else:
with open(file_name) as config_file:
configuration = config_file.read()
Expand Down Expand Up @@ -225,6 +231,18 @@ def process_message(config, data, event, context):
if 'type' not in output_config:
raise NoTypeConfiguredException(
'No type configured for output!')

if 'processIf' in output_config:
processif_template = jinja_environment.from_string(
output_config['processIf'])
processif_template.name = 'processif'
processif_contents = processif_template.render()
if processif_contents.strip() == '':
logger.info(
'Will not use output processor %s because processIf evaluated to empty.'
% output_config['type'])
continue

logger.debug('Processing message using output processor: %s' %
output_config['type'])

Expand Down Expand Up @@ -266,7 +284,8 @@ def decode_and_process(logger, config, event, context):
logger.debug('Starting Pub/Sub message processing...',
extra={
'event_id': context.event_id,
'data': data
'data': data,
'attributes': event['attributes']
})
process_message(config, data, event, context)
logger.debug('Pub/Sub message processing finished.',
Expand Down
47 changes: 25 additions & 22 deletions tools/pubsub2inbox/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ terraform {
required_version = ">= 0.13.0"

required_providers {
google = ">= 3.40.0"
google = ">= 3.40.0"
archive = ">= 2.2.0"
}
}

Expand All @@ -25,6 +26,9 @@ provider "google" {
region = var.region
}

provider "archive" {
}

resource "google_secret_manager_secret" "config-secret" {
secret_id = var.secret_id != "" ? var.secret_id : var.function_name

Expand Down Expand Up @@ -61,34 +65,34 @@ resource "random_id" "bucket-suffix" {
}

resource "google_storage_bucket" "function-bucket" {
name = format("%s-%s", var.bucket_name, random_id.bucket-suffix.hex)
name = format("%s-%s", var.bucket_name, random_id.bucket-suffix.hex)
uniform_bucket_level_access = true
}

resource "null_resource" "function-zip" {
triggers = {
always_run = timestamp()
}

provisioner "local-exec" {
command = format("zip %s/index.zip main.py requirements.txt filters/*.py output/*.py processors/*.py", path.root)
}
locals {
function_files = ["main.py", "requirements.txt", "filters/*.py", "output/*.py", "processors/*.py"]
all_function_files = setunion([for glob in local.function_files : fileset(path.module, glob)]...)
function_file_hashes = [for file_path in local.all_function_files : filemd5(format("%s/%s", path.module, file_path))]
}

resource "random_string" "function-zip-name" {
length = 8
special = false
upper = false
keepers = {
md5 = filemd5(format("%s/index.zip", path.root))
data "archive_file" "function-zip" {
type = "zip"
output_path = "${path.module}/index.zip"
dynamic "source" {
for_each = local.all_function_files
content {
content = file(format("%s/%s", path.module, source.value))
filename = source.value
}
}
}

resource "google_storage_bucket_object" "function-archive" {
name = format("index-%s.zip", random_string.function-zip-name.result)
name = format("index-%s.zip", md5(join(",", local.function_file_hashes)))
bucket = google_storage_bucket.function-bucket.name
source = format("%s/index.zip", path.root)
depends_on = [
null_resource.function-zip
data.archive_file.function-zip
]
}

Expand All @@ -108,6 +112,7 @@ resource "google_cloudfunctions_function" "function" {
source_archive_bucket = google_storage_bucket.function-bucket.name
source_archive_object = google_storage_bucket_object.function-archive.name
entry_point = "process_pubsub"
timeout = var.function_timeout

event_trigger {
event_type = "google.pubsub.topic.publish"
Expand All @@ -118,12 +123,10 @@ resource "google_cloudfunctions_function" "function" {
}

environment_variables = {
# You could also specify latest secret version here, in case you don't want to redeploy
# and are fine with the function picking up the new config on subsequent runs.
CONFIG = google_secret_manager_secret_version.config-secret-version.name
LOG_LEVEL = 10
SERVICE_ACCOUNT = google_service_account.service-account.email
}

depends_on = [
null_resource.function-zip
]
}
Loading

0 comments on commit 70c4223

Please sign in to comment.