diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index a5d2b7bfa7d8..42e5886ceb64 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -697,12 +697,12 @@ def examples_wordcount_streaming(argv): | 'Group' >> beam.GroupByKey() | 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1]))) - | - 'Format' >> beam.Map(lambda word_and_count: '%s: %d' % word_and_count)) + | 'Format' >> + beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8'))) # [START example_wordcount_streaming_write] # Write to Pub/Sub - output | beam.io.WriteStringsToPubSub(known_args.output_topic) + output | beam.io.WriteToPubSub(known_args.output_topic) # [END example_wordcount_streaming_write] diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 238ecc8aef59..35c60e7833fc 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -748,7 +748,7 @@ def test_examples_wordcount_debugging(self): @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed') @mock.patch('apache_beam.io.ReadFromPubSub') - @mock.patch('apache_beam.io.WriteStringsToPubSub') + @mock.patch('apache_beam.io.WriteToPubSub') def test_examples_wordcount_streaming(self, *unused_mocks): def FakeReadFromPubSub(topic=None, subscription=None, values=None): expected_topic = topic @@ -768,7 +768,7 @@ def __init__(self, matcher): def expand(self, pcoll): assert_that(pcoll, self.matcher) - def FakeWriteStringsToPubSub(topic=None, values=None): + def FakeWriteToPubSub(topic=None, values=None): expected_topic = topic def _inner(topic=None, subscription=None): @@ -785,11 +785,11 @@ def _inner(topic=None, subscription=None): TimestampedValue(b'a b c c c', 20) ] output_topic = 'projects/fake-beam-test-project/topic/outtopic' - output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3'] + output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3'] beam.io.ReadFromPubSub = ( FakeReadFromPubSub(topic=input_topic, values=input_values)) - beam.io.WriteStringsToPubSub = ( - FakeWriteStringsToPubSub(topic=output_topic, values=output_values)) + beam.io.WriteToPubSub = ( + FakeWriteToPubSub(topic=output_topic, values=output_values)) snippets.examples_wordcount_streaming([ '--input_topic', 'projects/fake-beam-test-project/topic/intopic', diff --git a/sdks/python/apache_beam/examples/sql_taxi.py b/sdks/python/apache_beam/examples/sql_taxi.py index 607dea1d3f82..32fd80aea066 100644 --- a/sdks/python/apache_beam/examples/sql_taxi.py +++ b/sdks/python/apache_beam/examples/sql_taxi.py @@ -80,7 +80,8 @@ def run(output_topic, pipeline_args): "window_end": window.end.to_rfc3339() }) | "Convert to JSON" >> beam.Map(json.dumps) - | beam.io.WriteStringsToPubSub(topic=output_topic)) + | "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8")) + | beam.io.WriteToPubSub(topic=output_topic)) if __name__ == '__main__':