Skip to content

Commit

Permalink
[improve][connector] Sink support custom acknowledge type (apache#15491)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jun 2, 2022
1 parent 9b01d1c commit a75dfdf
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;

@Slf4j
@Data
Expand Down Expand Up @@ -82,6 +83,30 @@ public void ack() {
sourceRecord.ack();
}

/**
* Some sink sometimes wants to control the ack type.
*/
public void cumulativeAck() {
if (sourceRecord instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
pulsarRecord.cumulativeAck();
} else {
throw new RuntimeException("SourceRecord class type must be PulsarRecord");
}
}

/**
* Some sink sometimes wants to control the ack type.
*/
public void individualAck() {
if (sourceRecord instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
pulsarRecord.individualAck();
} else {
throw new RuntimeException("SourceRecord class type must be PulsarRecord");
}
}

@Override
public void fail() {
sourceRecord.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -44,6 +45,7 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {

private final Runnable failFunction;
private final Runnable ackFunction;
private final Consumer<Boolean> customAckFunction;

@Override
public Optional<String> getKey() {
Expand Down Expand Up @@ -93,6 +95,20 @@ public Optional<Long> getEventTime() {
}
}

/**
* Some sink sometimes wants to control the ack type.
*/
public void cumulativeAck() {
this.customAckFunction.accept(true);
}

/**
* Some sink sometimes wants to control the ack type.
*/
public void individualAck() {
this.customAckFunction.accept(false);
}

@Override
public Optional<EncryptionContext> getEncryptionCtx() {
return message.getEncryptionCtx();
Expand Down Expand Up @@ -121,4 +137,5 @@ public void fail() {
public Optional<Message<T>> getMessage() {
return Optional.of(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,21 @@ protected Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
.message(message)
.schema(schema)
.topicName(message.getTopicName())
.customAckFunction(cumulative -> {
if (cumulative) {
consumer.acknowledgeCumulativeAsync(message)
.whenComplete((unused, throwable) -> message.release());
} else {
consumer.acknowledgeAsync(message).whenComplete((unused, throwable) -> message.release());
}
})
.ackFunction(() -> {
try {
if (pulsarSourceConfig
.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
consumer.acknowledgeCumulativeAsync(message);
} else {
consumer.acknowledgeAsync(message);
}
} finally {
// don't need to check if message pooling is set
// client will automatically check
message.release();
if (pulsarSourceConfig
.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
consumer.acknowledgeCumulativeAsync(message)
.whenComplete((unused, throwable) -> message.release());
} else {
consumer.acknowledgeAsync(message).whenComplete((unused, throwable) -> message.release());
}
}).failFunction(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.junit.Assert;
import org.mockito.Mockito;
import org.testng.annotations.Test;

public class SinkRecordTest {

@Test
public void testCustomAck() {

PulsarRecord pulsarRecord = Mockito.mock(PulsarRecord.class);
SinkRecord sinkRecord = new SinkRecord<>(pulsarRecord, new Object());

sinkRecord.cumulativeAck();
Mockito.verify(pulsarRecord, Mockito.times(1)).cumulativeAck();

sinkRecord = new SinkRecord(Mockito.mock(Record.class), new Object());
try {
sinkRecord.individualAck();
Assert.fail("Should throw runtime exception");
} catch (Exception e) {
Assert.assertTrue(e instanceof RuntimeException);
Assert.assertEquals(e.getMessage(), "SourceRecord class type must be PulsarRecord");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand Down Expand Up @@ -360,4 +361,24 @@ public void testInputConsumersGetter(PulsarSourceConfig pulsarSourceConfig) thro

fail("Unknown config type");
}


@Test(dataProvider = "sourceImpls")
public void testPulsarRecordCustomAck(PulsarSourceConfig pulsarSourceConfig) throws Exception {

PulsarSource pulsarSource = getPulsarSource(pulsarSourceConfig);
Message message = Mockito.mock(Message.class);
Consumer consumer = Mockito.mock(Consumer.class);
Mockito.when(consumer.acknowledgeAsync(message)).thenReturn(CompletableFuture.completedFuture(null));
Mockito.when(consumer.acknowledgeCumulativeAsync(message)).thenReturn(CompletableFuture.completedFuture(null));

PulsarRecord record = (PulsarRecord) pulsarSource.buildRecord(consumer, message);

record.cumulativeAck();
Mockito.verify(consumer, Mockito.times(1)).acknowledgeCumulativeAsync(message);

record.individualAck();
Mockito.verify(consumer, Mockito.times(1)).acknowledgeAsync(message);
}

}

0 comments on commit a75dfdf

Please sign in to comment.