Skip to content

Commit

Permalink
Reduce the probability of cache inconsistencies (apache#11423)
Browse files Browse the repository at this point in the history
### Motivation
Now when updating the function metadata, the cache is updated first and then send message to the topic. 
There may be a situation where the local cache updated successfully but the message sending fails.

### Modifications
Send the message first, then update the local cache
  • Loading branch information
315157973 authored Jul 23, 2021
1 parent cba4ea9 commit 5819242
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -212,12 +211,9 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
if (exclusiveLeaderProducer == null) {
throw new IllegalStateException("Not the leader");
}
// Check first to avoid local cache update failure
checkRequestOutDated(functionMetaData, delete);

if (delete) {
needsScheduling = proccessDeregister(functionMetaData);
} else {
needsScheduling = processUpdate(functionMetaData);
}
byte[] toWrite;
if (workerConfig.getUseCompactedMetadataTopic()) {
if (delete) {
Expand All @@ -243,6 +239,11 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
}
lastMessageSeen = builder.send();
if (delete) {
needsScheduling = proccessDeregister(functionMetaData);
} else {
needsScheduling = processUpdate(functionMetaData);
}
} catch (Exception e) {
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
Expand All @@ -253,6 +254,22 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
}
}

private void checkRequestOutDated(FunctionMetaData functionMetaData, boolean delete) {
Function.FunctionDetails details = functionMetaData.getFunctionDetails();
if (isRequestOutdated(details.getTenant(), details.getNamespace(),
details.getName(), functionMetaData.getVersion())) {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", details.getTenant(), details.getNamespace(),
details.getName(), functionMetaData.getVersion());
}
if (delete) {
throw new IllegalArgumentException(
"Delete request ignored because it is out of date. Please try again.");
}
throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
}
}

/**
* Acquires a exclusive producer. This method cannot return null. It can only return a valid exclusive producer
* or throw NotLeaderAnymore exception.
Expand Down Expand Up @@ -455,6 +472,10 @@ private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
}

private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
// avoid NPE
if(!containsFunctionMetaData(tenant, namespace, functionName)){
return false;
}
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
.get(namespace).get(functionName);
return currentFunctionMetaData.getVersion() >= version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -121,6 +122,50 @@ public void testListFunctions() throws PulsarClientException {
"tenant-1", "namespace-2").contains(f3));
}

@Test
public void testSendMsgFailWithCompaction() throws Exception {
testSendMsgFail(true);
}

@Test
public void testSendMsgFailWithoutCompaction() throws Exception {
testSendMsgFail(false);
}

private void testSendMsgFail(boolean compact) throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setVersion(1)
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();

// become leader
Producer<byte[]> exclusiveProducer = spy(functionMetaDataManager.acquireExclusiveWrite(() -> true));
// make sure send msg fail
functionMetaDataManager.acquireLeadership(exclusiveProducer);
exclusiveProducer.close();
when(exclusiveProducer.newMessage()).thenThrow(new RuntimeException("should failed"));
try {
functionMetaDataManager.updateFunctionOnLeader(m1, false);
fail("should failed");
} catch (Exception e) {
assertTrue(e.getCause().getMessage().contains("should failed"));
}
assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0);
try {
functionMetaDataManager.updateFunctionOnLeader(m1, true);
fail("should failed");
} catch (Exception e) {
assertTrue(e.getCause().getMessage().contains("should failed"));
}
assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0);
}

@Test
public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception {
testUpdateIfLeaderFunction(false);
Expand Down

0 comments on commit 5819242

Please sign in to comment.