Skip to content

Commit

Permalink
Merge pull request apache#13615 from [BEAM-11524] Remove usages of Wr…
Browse files Browse the repository at this point in the history
…iteStringsToPubSub in examples

[BEAM-11524] Remove usages of WriteStringsToPubSub in examples
  • Loading branch information
boyuanzz authored Dec 24, 2020
2 parents 5c755ea + feb2b84 commit b0e3d36
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,12 +697,12 @@ def examples_wordcount_streaming(argv):
| 'Group' >> beam.GroupByKey()
|
'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
|
'Format' >> beam.Map(lambda word_and_count: '%s: %d' % word_and_count))
| 'Format' >>
beam.MapTuple(lambda word, count: f'{word}: {count}'.encode('utf-8')))

# [START example_wordcount_streaming_write]
# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
output | beam.io.WriteToPubSub(known_args.output_topic)
# [END example_wordcount_streaming_write]


Expand Down
10 changes: 5 additions & 5 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ def test_examples_wordcount_debugging(self):

@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('apache_beam.io.ReadFromPubSub')
@mock.patch('apache_beam.io.WriteStringsToPubSub')
@mock.patch('apache_beam.io.WriteToPubSub')
def test_examples_wordcount_streaming(self, *unused_mocks):
def FakeReadFromPubSub(topic=None, subscription=None, values=None):
expected_topic = topic
Expand All @@ -768,7 +768,7 @@ def __init__(self, matcher):
def expand(self, pcoll):
assert_that(pcoll, self.matcher)

def FakeWriteStringsToPubSub(topic=None, values=None):
def FakeWriteToPubSub(topic=None, values=None):
expected_topic = topic

def _inner(topic=None, subscription=None):
Expand All @@ -785,11 +785,11 @@ def _inner(topic=None, subscription=None):
TimestampedValue(b'a b c c c', 20)
]
output_topic = 'projects/fake-beam-test-project/topic/outtopic'
output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3']
output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3']
beam.io.ReadFromPubSub = (
FakeReadFromPubSub(topic=input_topic, values=input_values))
beam.io.WriteStringsToPubSub = (
FakeWriteStringsToPubSub(topic=output_topic, values=output_values))
beam.io.WriteToPubSub = (
FakeWriteToPubSub(topic=output_topic, values=output_values))
snippets.examples_wordcount_streaming([
'--input_topic',
'projects/fake-beam-test-project/topic/intopic',
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/examples/sql_taxi.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def run(output_topic, pipeline_args):
"window_end": window.end.to_rfc3339()
})
| "Convert to JSON" >> beam.Map(json.dumps)
| beam.io.WriteStringsToPubSub(topic=output_topic))
| "UTF-8 encode" >> beam.Map(lambda s: s.encode("utf-8"))
| beam.io.WriteToPubSub(topic=output_topic))


if __name__ == '__main__':
Expand Down

0 comments on commit b0e3d36

Please sign in to comment.