Skip to content

Commit

Permalink
[BEAM-11524] Change WriteStringsToPubsub to use WriteToPubsub. (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
boyuanzz authored Dec 28, 2020
1 parent b8f273e commit 8df07b6
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 10 deletions.
8 changes: 2 additions & 6 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,12 @@ def __init__(self, topic):
topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
"""
super(_WriteStringsToPubSub, self).__init__()
self.with_attributes = False
self.id_label = None
self.timestamp_attribute = None
self.project, self.topic_name = parse_topic(topic)
self._sink = _PubSubSink(topic, id_label=None, timestamp_attribute=None)
self.topic = topic

def expand(self, pcoll):
pcoll = pcoll | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
pcoll.element_type = bytes
return pcoll | Write(self._sink)
return pcoll | WriteToPubSub(self.topic)


class WriteToPubSub(PTransform):
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ def test_write_messages_success(self, mock_pubsub):

def test_write_messages_deprecated(self, mock_pubsub):
data = 'data'
data_bytes = b'data'
payloads = [data]

options = PipelineOptions([])
Expand All @@ -839,7 +840,7 @@ def test_write_messages_deprecated(self, mock_pubsub):
| Create(payloads)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
mock_pubsub.return_value.publish.assert_has_calls(
[mock.call(mock.ANY, data)])
[mock.call(mock.ANY, data_bytes)])

def test_write_messages_with_attributes_success(self, mock_pubsub):
data = b'data'
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,7 @@ def get_replacement_transform_for_applied_ptransform(

class WriteToPubSubOverride(PTransformOverride):
def matches(self, applied_ptransform):
return isinstance(
applied_ptransform.transform,
(beam_pubsub.WriteToPubSub, beam_pubsub._WriteStringsToPubSub))
return isinstance(applied_ptransform.transform, beam_pubsub.WriteToPubSub)

def get_replacement_transform_for_applied_ptransform(
self, applied_ptransform):
Expand Down

0 comments on commit 8df07b6

Please sign in to comment.