Skip to content

Commit

Permalink
[FLINK-2381] [Storm Compatibility] Failing Test: WrapperSetupHelperTest
Browse files Browse the repository at this point in the history
  • Loading branch information
mjsax committed Oct 7, 2015
1 parent 5c2c112 commit f332fa5
Showing 1 changed file with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,13 @@ public void testCreateTopologyContext() {
builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);

final int maxRetry = 3;
int counter;
for (counter = 0; counter < maxRetry; ++counter) {
.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);

int counter = 0;
while (true) {
LocalCluster cluster = new LocalCluster();
Config c = new Config();
c.setNumAckers(0);
Expand All @@ -200,7 +199,6 @@ public void testCreateTopologyContext() {
break;
}
}
Assert.assertTrue(counter < maxRetry);

TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();

Expand All @@ -209,10 +207,10 @@ public void testCreateTopologyContext() {
flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);

flinkBuilder.createTopology();
StormTopology stormTopology = flinkBuilder.getStormTopology();
Expand Down

0 comments on commit f332fa5

Please sign in to comment.