
Commit

[BEAM-3342] Create a Cloud Bigtable Python connector Write
Juan Rael committed Jan 29, 2019
1 parent ad00c99 commit 59499f6
Showing 8 changed files with 354 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .test-infra/jenkins/dependency_check/generate_report.sh
@@ -42,6 +42,8 @@ REPORT_DESCRIPTION="
/usr/bin/virtualenv dependency/check
. dependency/check/bin/activate
pip install --upgrade google-cloud-bigquery
pip install --upgrade google-cloud-bigtable
pip install --upgrade google-cloud-core
rm -f build/dependencyUpdates/beam-dependency-check-report.txt

# Install packages and run the unit tests of the report generator and the jira manager
@@ -73,7 +73,7 @@ job(testConfiguration.jobName) {
shell('.env/bin/pip install --upgrade setuptools pip')

// Install job requirements for analysis script.
shell('.env/bin/pip install requests google.cloud.bigquery mock')
shell('.env/bin/pip install requests google.cloud.bigquery mock google.cloud.bigtable google.cloud')

// Launch verification tests before executing script.
shell('.env/bin/python ' + commonJobProperties.checkoutDir + '/.test-infra/jenkins/verify_performance_test_results_test.py')
6 changes: 6 additions & 0 deletions ownership/PYTHON_DEPENDENCY_OWNERS.yaml
@@ -42,9 +42,15 @@ deps:
google-apitools:
owners:

google-cloud-core:
owners:

google-cloud-bigquery:
owners: markflyhigh

google-cloud-bigtable:
owners:

google-cloud-pubsub:
owners: markflyhigh

194 changes: 194 additions & 0 deletions sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -0,0 +1,194 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unittest for GCP Bigtable testing."""
from __future__ import absolute_import

import datetime
import logging
import random
import string
import unittest
import uuid

import pytz

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.test_pipeline import TestPipeline

# Protect against environments where bigtable library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import UTC
from google.cloud.bigtable import row, column_family, Client
except ImportError:
Client = None
UTC = pytz.utc
_microseconds_from_datetime = lambda label_stamp: label_stamp
_datetime_from_microseconds = lambda micro: micro


EXISTING_INSTANCES = []
LABEL_KEY = u'python-bigtable-beam'
label_stamp = datetime.datetime.utcnow().replace(tzinfo=UTC)
label_stamp_micros = _microseconds_from_datetime(label_stamp)
LABELS = {LABEL_KEY: str(label_stamp_micros)}


class GenerateTestRows(beam.PTransform):
""" A transform test to run write to the Bigtable Table.
A PTransform that generate a list of `DirectRow` to write it in
Bigtable Table.
"""
def __init__(self, number, project_id=None, instance_id=None,
table_id=None):
super(GenerateTestRows, self).__init__()
self.number = number
self.rand = random.choice(string.ascii_letters + string.digits)
self.column_family_id = 'cf1'
self.beam_options = {'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id}

def _generate(self):
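# Generate self.number rows; each row has ten columns (field0..field9) in
# column family 'cf1', and every cell holds the same 100-character value.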
value = ''.join(self.rand for i in range(100))

for index in range(self.number):
key = "beam_key%s" % ('{0:07}'.format(index))
direct_row = row.DirectRow(row_key=key)
for column_id in range(10):
direct_row.set_cell(self.column_family_id,
('field%s' % column_id).encode('utf-8'),
value,
datetime.datetime.now())
yield direct_row

def expand(self, pvalue):
beam_options = self.beam_options
return (pvalue
| beam.Create(self._generate())
| WriteToBigTable(beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id']))


@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
class BigtableIOWriteTest(unittest.TestCase):
""" Bigtable Write Connector Test
"""
DEFAULT_TABLE_PREFIX = "python-test"
instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
number = 500
LOCATION_ID = "us-east1-b"

def setUp(self):
try:
from google.cloud.bigtable import enums
self.STORAGE_TYPE = enums.StorageType.HDD
self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
except ImportError:
self.STORAGE_TYPE = 2
self.INSTANCE_TYPE = 2

self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self.client = Client(project=self.project, admin=True)

self._delete_old_instances()

self.instance = self.client.instance(self.instance_id,
instance_type=self.INSTANCE_TYPE,
labels=LABELS)

if not self.instance.exists():
cluster = self.instance.cluster(self.cluster_id,
self.LOCATION_ID,
default_storage_type=self.STORAGE_TYPE)
self.instance.create(clusters=[cluster])
self.table = self.instance.table(self.table_id)

if not self.table.exists():
max_versions_rule = column_family.MaxVersionsGCRule(2)
column_family_id = 'cf1'
column_families = {column_family_id: max_versions_rule}
self.table.create(column_families=column_families)

def _delete_old_instances(self):
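# Clean up leftovers from earlier test runs: delete any instance that
# carries the LABEL_KEY label with a timestamp at least two hours old.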
instances = self.client.list_instances()
EXISTING_INSTANCES[:] = instances

def age_in_hours(micros):
return (datetime.datetime.utcnow().replace(tzinfo=UTC) - (
_datetime_from_microseconds(micros))).total_seconds() // 3600
CLEAN_INSTANCE = [i for instance in EXISTING_INSTANCES for i in instance if(
LABEL_KEY in i.labels.keys() and
(age_in_hours(int(i.labels[LABEL_KEY])) >= 2))]

if CLEAN_INSTANCE:
for instance in CLEAN_INSTANCE:
instance.delete()

def tearDown(self):
if self.instance.exists():
self.instance.delete()

def test_bigtable_write(self):
number = self.number
pipeline_args = self.test_pipeline.options_list
pipeline_options = PipelineOptions(pipeline_args)

pipeline = beam.Pipeline(options=pipeline_options)
config_data = {'project_id': self.project,
'instance_id': self.instance_id,
'table_id': self.table_id}
_ = (
pipeline
| 'Generate Direct Rows' >> GenerateTestRows(number, **config_data))

result = pipeline.run()
result.wait_until_finish()

assert result.state == PipelineState.DONE

read_rows = self.table.read_rows()
assert len([_ for _ in read_rows]) == number

if not hasattr(result, 'has_job') or result.has_job:
read_filter = MetricsFilter().with_name('Written Row')
query_result = result.metrics().query(read_filter)
if query_result['counters']:
read_counter = query_result['counters'][0]

logging.info('Number of Rows: %d', read_counter.committed)
assert read_counter.committed == number


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
143 changes: 143 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -0,0 +1,143 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""BigTable connector
This module implements writing to BigTable tables.
The default mode is to set row data to write to BigTable tables.
The syntax supported is described here:
https://cloud.google.com/bigtable/docs/quickstart-cbt
The BigTable connector can be used as a main output. A main output
(common case) is expected to be massive and will be split into
manageable chunks and processed in parallel. In the example below we
create a list of `DirectRow` objects and pass them to the WriteToBigTable
transform, which uses the _BigTableWriteFn DoFn to insert the generated
rows into the table:
main_table = (p
| beam.Create(self._generate())
| WriteToBigTable(project_id,
instance_id,
table_id))
"""
from __future__ import absolute_import

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.transforms.display import DisplayDataItem

try:
from google.cloud.bigtable import Client
except ImportError:
pass

__all__ = ['WriteToBigTable']


class _BigTableWriteFn(beam.DoFn):
""" Creates the connector can call and add_row to the batcher using each
row in beam pipe line
Args:
project_id(str): GCP Project ID
instance_id(str): GCP Instance ID
table_id(str): GCP Table ID
"""

def __init__(self, project_id, instance_id, table_id):
""" Constructor of the Write connector of Bigtable
Args:
project_id(str): GCP Project of to write the Rows
instance_id(str): GCP Instance to write the Rows
table_id(str): GCP Table to write the `DirectRows`
"""
super(_BigTableWriteFn, self).__init__()
self.beam_options = {'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id}
self.table = None
self.batcher = None
self.written = Metrics.counter(self.__class__, 'Written Row')

def __getstate__(self):
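# Only the beam_options dict is serialized; the Bigtable client, table and
# batcher are not picklable and are re-created lazily in start_bundle.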
return self.beam_options

def __setstate__(self, options):
self.beam_options = options
self.table = None
self.batcher = None
self.written = Metrics.counter(self.__class__, 'Written Row')

def start_bundle(self):
if self.table is None:
client = Client(project=self.beam_options['project_id'])
instance = client.instance(self.beam_options['instance_id'])
self.table = instance.table(self.beam_options['table_id'])
self.batcher = self.table.mutations_batcher()

def process(self, row):
self.written.inc()
# The timestamp of the cells in this row object needs to be set explicitly;
# when a retry mutates the same row object, the explicit timestamp ensures
# the cells are still written with the intended values.
# Example:
# direct_row.set_cell('cf1',
# 'field1',
# 'value1',
# timestamp=datetime.datetime.now())
self.batcher.mutate(row)

def finish_bundle(self):
self.batcher.flush()
self.batcher = None

def display_data(self):
return {'projectId': DisplayDataItem(self.beam_options['project_id'],
label='Bigtable Project Id'),
'instanceId': DisplayDataItem(self.beam_options['instance_id'],
label='Bigtable Instance Id'),
'tableId': DisplayDataItem(self.beam_options['table_id'],
label='Bigtable Table Id')
}


class WriteToBigTable(beam.PTransform):
""" A transform to write to the Bigtable Table.
A PTransform that writes a list of `DirectRow` objects into a Bigtable table.
"""
def __init__(self, project_id=None, instance_id=None,
table_id=None):
""" The PTransform to access the Bigtable Write connector
Args:
project_id(str): GCP Project to write the Rows to
instance_id(str): GCP Instance to write the Rows to
table_id(str): GCP Table to write the `DirectRows` to
"""
super(WriteToBigTable, self).__init__()
self.beam_options = {'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id}

def expand(self, pvalue):
beam_options = self.beam_options
return (pvalue
| beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))
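
For reference, a minimal usage sketch of the new transform, following the module docstring's example. The project, instance, and table names below are placeholders, and the target table with column family 'cf1' is assumed to already exist.

import datetime

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row


def build_test_rows(count=10):
  # Build DirectRow objects with a single cell each; keys follow the same
  # zero-padded pattern used by the integration test.
  for index in range(count):
    direct_row = row.DirectRow(row_key='key{0:07}'.format(index))
    direct_row.set_cell('cf1', b'field1', b'value1',
                        timestamp=datetime.datetime.now())
    yield direct_row


with beam.Pipeline() as pipeline:
  _ = (pipeline
       | beam.Create(build_test_rows())
       | WriteToBigTable(project_id='my-project',
                         instance_id='my-instance',
                         table_id='my-table'))
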
2 changes: 2 additions & 0 deletions sdks/python/container/base_image_requirements.txt
@@ -50,6 +50,8 @@ googledatastore==7.0.1
google-cloud-pubsub==0.39.0
google-cloud-bigquery==1.6.0
proto-google-cloud-datastore-v1==0.90.4
google-cloud-bigtable==0.31.1
google-cloud-core==0.28.1

# Optional packages
cython==0.28.1
2 changes: 2 additions & 0 deletions sdks/python/setup.py
@@ -142,6 +142,8 @@ def get_version():
'google-cloud-pubsub==0.39.0',
# GCP packages required by tests
'google-cloud-bigquery>=1.6.0,<1.7.0',
'google-cloud-core==0.28.1',
'google-cloud-bigtable==0.31.1',
]


