Skip to content

Commit

Permalink
Events v2 conflict resolver bug (cadence-workflow#1294)
Browse files Browse the repository at this point in the history
* fix another bug

* add purge tool

add sample
  • Loading branch information
longquanzheng authored Nov 30, 2018
1 parent 1e64b68 commit e218544
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
5 changes: 3 additions & 2 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func (r *conflictResolverImpl) reset(prevRunID string, requestID string, replayE
firstEvent.GetVersion(),
)

resetMutableStateBuilder.executionInfo.EventStoreVersion = info.EventStoreVersion
resetMutableStateBuilder.executionInfo.BranchToken = info.BranchToken
resetMutableStateBuilder.executionInfo.EventStoreVersion = eventStoreVersion
sBuilder = newStateBuilder(r.shard, resetMutableStateBuilder, r.logger)
}

Expand All @@ -120,6 +119,8 @@ func (r *conflictResolverImpl) reset(prevRunID string, requestID string, replayE
resetMutableStateBuilder.IncrementHistorySize(size)
}

// reset branchToken to the original one(they will be set to a wrong version in applyEvents for startEvent
resetMutableStateBuilder.executionInfo.BranchToken = branchToken
// Applying events to mutableState does not move the nextEventID. Explicitly set nextEventID to new value
resetMutableStateBuilder.executionInfo.SetNextEventID(replayNextEventID)
resetMutableStateBuilder.executionInfo.StartTimestamp = startTime
Expand Down
31 changes: 31 additions & 0 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,37 @@ func newAdminKafkaCommands() []cli.Command {
AdminKafkaParse(c)
},
},
{
Name: "purgeTopic",
Aliases: []string{"purge"},
Usage: "purge Kafka topic by consumer group",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagCluster,
Usage: "Name of the Kafka cluster to publish replicationTasks",
},
cli.StringFlag{
Name: FlagTopic,
Usage: "Topic to publish replication task",
},
cli.StringFlag{
Name: FlagGroup,
Usage: "Group to read DLQ",
},
cli.StringFlag{
Name: FlagHostFile,
Usage: "Kafka host config file in format of: " + `
clusters:
localKafka:
brokers:
- 127.0.0.1
- 127.0.0.2`,
},
},
Action: func(c *cli.Context) {
AdminPurgeTopic(c)
},
},
{
Name: "mergeDLQ",
Aliases: []string{"mgdlq"},
Expand Down
28 changes: 28 additions & 0 deletions tools/cli/adminKafkaCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,34 @@ func newKafkaProducer(c *cli.Context) messaging.Producer {
return producer
}

func AdminPurgeTopic(c *cli.Context) {
hostFile := getRequiredOption(c, FlagHostFile)
topic := getRequiredOption(c, FlagTopic)
cluster := getRequiredOption(c, FlagCluster)
group := getRequiredOption(c, FlagGroup)
brokers, err := loadBrokers(hostFile, cluster)

consumer := createConsumerAndWaitForReady(brokers, group, topic)

highWaterMarks, ok := consumer.HighWaterMarks()[topic]
if !ok {
ErrorAndExit("", fmt.Errorf("cannot find high watermark"))
}
fmt.Printf("Topic high watermark %v.\n", highWaterMarks)
for partition, hi := range highWaterMarks {
consumer.MarkPartitionOffset(topic, partition, hi-1, "")
fmt.Printf("set partition offset %v:%v \n", partition, hi)
}
err = consumer.CommitOffsets()
if err != nil {
ErrorAndExit("fail to commit offset", err)
}

consumer = createConsumerAndWaitForReady(brokers, group, topic)
msg, ok := <-consumer.Messages()
fmt.Printf("current offset sample: %v: %v \n", msg.Partition, msg.Offset)
}

// AdminMergeDLQ publish replication tasks from DLQ or JSON file
func AdminMergeDLQ(c *cli.Context) {
hostFile := getRequiredOption(c, FlagHostFile)
Expand Down

0 comments on commit e218544

Please sign in to comment.