Skip to content

Commit

Permalink
message kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
andyadc committed Nov 29, 2024
1 parent 2f02b92 commit 645c9db
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,21 @@ public void send(String topic, Message<?> message) {
sendToKafka(topic, null, null, message, true);
}

@Override
public void send(String topic, Integer partition, String key, Message<?> message) {
sendToKafka(topic, partition, key, message, true);
}

@Override
public void syncSend(String topic, Message<?> message) {
sendToKafka(topic, null, null, message, false);
}

@Override
public void syncSend(String topic, Integer partition, String key, Message<?> message) {
sendToKafka(topic, partition, key, message, false);
}

private void sendToKafka(String topic, Integer partition, String key, Message<?> message, Boolean isAsync) {
if (message.getTimestamp() == null) {
message.setTimestamp(System.currentTimeMillis());
Expand All @@ -67,7 +77,7 @@ private void sendToKafka(String topic, Integer partition, String key, Message<?>

long sendTime = System.currentTimeMillis();
if (isAsync) {
producer.send(record, new sentCallback(sendTime, message.getMessageId(), value));
producer.send(record, new KafkaSentCallback(sendTime, message.getMessageId(), value));
} else {
RecordMetadata metadata = null;
try {
Expand Down Expand Up @@ -120,14 +130,14 @@ public void setClientId(String clientId) {
this.clientId = clientId;
}

static class sentCallback implements Callback {
static class KafkaSentCallback implements Callback {

/* 开始发送消息的时间戳 */
private final long sendTime;
private final String messageId;
private final String message;

public sentCallback(long sendTime, String messageId, String message) {
public KafkaSentCallback(long sendTime, String messageId, String message) {
this.sendTime = sendTime;
this.messageId = messageId;
this.message = message;
Expand Down

0 comments on commit 645c9db

Please sign in to comment.