Skip to content

Commit

Permalink
[hotfix]Python function protobuf missing field and broker test failed (
Browse files Browse the repository at this point in the history
…apache#6641)

### Motivation

At present, in the test, we found that due to the addition of a field `forwardSourceMessageProperty` in the proto file of function, this field was lost in the proto file generated by python and go. Due to python parsing with the following code:


```
json_format.Parse(args.function_details, function_details)
```

the following exception will be thrown. 

```
2020-03-30T13:13:25.2339031Z 13:13:24.379 [pulsar-external-listener-20-1] INFO  org.apache.pulsar.functions.runtime.process.ProcessRuntime - Started process successfully
2020-03-30T13:13:25.2339190Z Traceback (most recent call last):
2020-03-30T13:13:25.2340782Z   File "/pulsar/instances/python-instance/python_instance_main.py", line 211, in <module>
2020-03-30T13:13:25.2340944Z     main()
2020-03-30T13:13:25.2342589Z   File "/pulsar/instances/python-instance/python_instance_main.py", line 98, in main
2020-03-30T13:13:25.2342744Z     json_format.Parse(args.function_details, function_details)
2020-03-30T13:13:25.2354119Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 430, in Parse
2020-03-30T13:13:25.2354284Z     return ParseDict(js, message, ignore_unknown_fields, descriptor_pool)
2020-03-30T13:13:25.2354689Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 450, in ParseDict
2020-03-30T13:13:25.2354882Z     parser.ConvertMessage(js_dict, message)
2020-03-30T13:13:25.2355386Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 481, in ConvertMessage
2020-03-30T13:13:25.2355537Z     self._ConvertFieldValuePair(value, message)
2020-03-30T13:13:25.2356082Z   File "/usr/local/lib/python2.7/dist-packages/google/protobuf/json_format.py", line 590, in _ConvertFieldValuePair
2020-03-30T13:13:25.2356559Z     raise ParseError('Failed to parse {0} field: {1}.'.format(name, e))
2020-03-30T13:13:25.2357199Z google.protobuf.json_format.ParseError: Failed to parse sink field: Message type "proto.SinkSpec" has no field named "forwardSourceMessageProperty".
2020-03-30T13:13:25.2357634Z  Available Fields(except extensions): ['className', 'configs', 'typeClassName', 'topic', 'serDeClassName', 'builtin', 'schemaType'].
```

This pr is mainly to fix the proto file generated by python first. In order to make the test pass smoothly, I will gradually fix this problem in other languages in the next pull request.


### Modifications

* Enable build docker image to pulsar and pulsar-all.
* Add new generated protobuf file for python
* Disable go function integration test
* Add sleep for method testGetPartitionedStatsInternal

### Verifying this change

The integration process test passed AmateurEvents#22
  • Loading branch information
tuteng authored Mar 31, 2020
1 parent b5e9be6 commit c955ff9
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 140 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ci-integration-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/ci-integration-thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.validation.constraints.AssertTrue;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
Expand Down Expand Up @@ -970,7 +972,7 @@ public void partitionedTopics(String topicName) throws Exception {

@Test
public void testGetPartitionedInternalInfo() throws Exception {
String partitionedTopic = "my-topic";
String partitionedTopic = "my-topic" + UUID.randomUUID().toString();
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList());
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
admin.topics().createPartitionedTopic(partitionedTopicName, 2);
Expand Down Expand Up @@ -1001,7 +1003,7 @@ public void testGetPartitionedInternalInfo() throws Exception {

@Test
public void testGetPartitionedStatsInternal() throws Exception {
String partitionedTopic = "my-topic";
String partitionedTopic = "my-topic" + UUID.randomUUID().toString();
String subName = "my-sub";
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList());
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
Expand All @@ -1018,6 +1020,8 @@ public void testGetPartitionedStatsInternal() throws Exception {
String partitionTopic0 = partitionedTopicName + "-partition-0";
String partitionTopic1 = partitionedTopicName + "-partition-1";

Thread.sleep(1000);

PersistentTopicInternalStats internalStats0 = admin.topics().getInternalStats(partitionTopic0);
assertEquals(internalStats0.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));

Expand Down
Loading

0 comments on commit c955ff9

Please sign in to comment.