diff --git a/common/messaging/kafkaProducer.go b/common/messaging/kafkaProducer.go index 6bac935f6d5..11bc7434309 100644 --- a/common/messaging/kafkaProducer.go +++ b/common/messaging/kafkaProducer.go @@ -97,10 +97,16 @@ func (p *kafkaProducer) getKeyForReplicationTask(task *replicator.ReplicationTas switch task.GetTaskType() { case replicator.ReplicationTaskTypeHistory: // Use workflowID as the partition key so all replication tasks for a workflow are dispatched to the same - // Kafka partition. This will give us some ordering guarantee for workflow replication tasks atleast at + // Kafka partition. This will give us some ordering guarantee for workflow replication tasks at least at // the messaging layer perspective attributes := task.HistoryTaskAttributes return sarama.StringEncoder(attributes.GetWorkflowId()) + case replicator.ReplicationTaskTypeHistoryV2: + // Use workflowID as the partition key so all replication tasks for a workflow are dispatched to the same + // Kafka partition. This will give us some ordering guarantee for workflow replication tasks at least at + // the messaging layer perspective + attributes := task.HistoryTaskV2Attributes + return sarama.StringEncoder(attributes.GetWorkflowId()) case replicator.ReplicationTaskTypeSyncActivity: // Use workflowID as the partition key so all sync activity tasks for a workflow are dispatched to the same // Kafka partition. This will give us some ordering guarantee for workflow replication tasks atleast at