Skip to content

Commit

Permalink
[pulsar-flink]Cache Pulsar client to make it shared among tasks in a …
Browse files Browse the repository at this point in the history
…process (apache#5900)

* Cache Pulsar client to make it shared among tasks in a process

* code format & add tests

* fix style

Co-authored-by: Sijie Guo <[email protected]>
  • Loading branch information
2 people authored and wolfstudy committed Jan 3, 2020
1 parent dcaa1d3 commit 5c58ff4
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.streaming.connectors.pulsar;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A Pulsar Client cache that enables client sharing among different flink tasks in same process.
*/
public class CachedPulsarClient {
private static final Logger LOG = LoggerFactory.getLogger(CachedPulsarClient.class);

private static int cacheSize = 5;

public static void setCacheSize(int size) {
cacheSize = size;
}

private static CacheLoader<ClientConfigurationData, PulsarClientImpl> cacheLoader =
new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
@Override
public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
return createPulsarClient(key);
}
};

private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
ClientConfigurationData config = notification.getKey();
PulsarClientImpl client = notification.getValue();
LOG.debug("Evicting pulsar client %s with config %s, due to %s",
client.toString(), config.toString(), notification.getCause().toString());
close(config, client);
};

private static LoadingCache<ClientConfigurationData, PulsarClientImpl> guavaCache =
CacheBuilder.newBuilder().maximumSize(cacheSize).removalListener(removalListener).build(cacheLoader);

private static PulsarClientImpl createPulsarClient(
ClientConfigurationData clientConfig) throws PulsarClientException {
PulsarClientImpl client;
try {
client = new PulsarClientImpl(clientConfig);
LOG.debug(String.format("Created a new instance of PulsarClientImpl for clientConf = %s",
clientConfig.toString()));
} catch (PulsarClientException e) {
LOG.error(String.format("Failed to create PulsarClientImpl for clientConf = %s",
clientConfig.toString()));
throw e;
}
return client;
}

public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
return guavaCache.get(config);
}

private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
try {
LOG.info(String.format("Closing the Pulsar client with conifg %s", clientConfig.toString()));
client.close();
} catch (PulsarClientException e) {
LOG.warn(String.format("Error while closing the Pulsar client ", clientConfig.toString()), e);
}
}

static void close(ClientConfigurationData clientConfig) {
guavaCache.invalidate(clientConfig);
}

static void clear() {
LOG.info("Cleaning up guava cache.");
guavaCache.invalidateAll();
}

static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
return guavaCache.asMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtracto
}

// Creates a Pulsar producer from producerConf.
// NOTE(review): this is a rendered diff hunk whose +/- markers were stripped, so two `client`
// declarations appear. Presumably the first (direct PulsarClientImpl construction) is the removed
// line and the second (CachedPulsarClient.getOrCreate) is its replacement — confirm against the file.
private Producer<byte[]> createProducer() throws Exception {
PulsarClientImpl client = new PulsarClientImpl(clientConf);
PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
// Blocks until the asynchronous producer creation completes.
return client.createProducerAsync(producerConf).get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void open(Configuration parameters) throws Exception {
isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
}

client = createClient();
client = getClient();
consumer = createConsumer(client);

isRunning = true;
Expand Down Expand Up @@ -188,8 +189,8 @@ boolean isCheckpointingEnabled() {
return isCheckpointingEnabled;
}

PulsarClient createClient() throws PulsarClientException {
return new PulsarClientImpl(clientConfigurationData);
PulsarClient getClient() throws ExecutionException {
return CachedPulsarClient.getOrCreate(clientConfigurationData);
}

Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.streaming.connectors.pulsar;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

import java.util.concurrent.ConcurrentMap;

import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

/**
* Unit test of {@link CachedPulsarClient}.
*/
public class CachedPulsarClientTest {

private static final String SERVICE_URL = "pulsar://localhost:6650";

@BeforeTest
public void clearCache() {
CachedPulsarClient.clear();
}

@Test
public void testShouldReturnSameInstanceWithSameParam() throws Exception {
PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);

ClientConfigurationData conf1 = new ClientConfigurationData();
conf1.setServiceUrl(SERVICE_URL);

ClientConfigurationData conf2 = new ClientConfigurationData();
conf2.setServiceUrl(SERVICE_URL);

PowerMockito.whenNew(PulsarClientImpl.class)
.withArguments(conf1).thenReturn(impl1);
PowerMockito.whenNew(PulsarClientImpl.class)
.withArguments(conf2).thenReturn(impl2);

PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
PulsarClientImpl client3 = CachedPulsarClient.getOrCreate(conf1);

assertEquals(client1, client2);
assertEquals(client1, client3);

assertEquals(CachedPulsarClient.getAsMap().size(), 1);
}

@Test
public void testShouldCloseTheCorrectClient() throws Exception {
PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);

ClientConfigurationData conf1 = new ClientConfigurationData();
conf1.setServiceUrl(SERVICE_URL);

ClientConfigurationData conf2 = new ClientConfigurationData();
conf2.setServiceUrl(SERVICE_URL);
conf2.setNumIoThreads(5);

PowerMockito.whenNew(PulsarClientImpl.class)
.withArguments(conf1).thenReturn(impl1);
PowerMockito.whenNew(PulsarClientImpl.class)
.withArguments(conf2).thenReturn(impl2);

PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);

assertNotEquals(client1, client2);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
assertEquals(map1.size(), 2);

CachedPulsarClient.close(conf2);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
assertEquals(map2.size(), 1);

assertEquals(map2.values().iterator().next(), client1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public ArrayDeque<Tuple2<Long, Set<MessageId>>> getRestoredState() {
}

@Override
PulsarClient createClient() {
PulsarClient getClient() {
return mock(PulsarClient.class);
}

Expand Down

0 comments on commit 5c58ff4

Please sign in to comment.