Skip to content

Commit

Permalink
[hotfix][kafka] Improve logging in FlinkKafkaProducer011
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski authored and aljoscha committed Nov 20, 2017
1 parent 7c63526 commit 34120ef
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,12 @@ static class KafkaTransactionState {

@Override
public String toString() {
return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId);
return String.format(
"%s [transactionalId=%s, producerId=%s, epoch=%s]",
this.getClass().getSimpleName(),
transactionalId,
producerId,
epoch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void flush() {
*/
public void resumeTransaction(long producerId, short epoch) {
Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);

Object transactionManager = getValue(kafkaProducer, "transactionManager");
synchronized (transactionManager) {
Expand Down

0 comments on commit 34120ef

Please sign in to comment.