Skip to content

Commit

Permalink
Shutdown load manager executor on pulsar service close (apache#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 18, 2017
1 parent 0219829 commit 07d2daf
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class PulsarService implements AutoCloseable {
private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered");
private ScheduledExecutorService loadManagerExecutor = null;
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
Expand Down Expand Up @@ -133,7 +133,8 @@ public PulsarService(ServiceConfiguration config) {
this.brokerVersion = PulsarBrokerVersionStringUtils.getNormalizedVersionString();
this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this);
loadManagerExecutor = Executors.newSingleThreadScheduledExecutor();
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
}

/**
Expand Down Expand Up @@ -174,10 +175,7 @@ public void close() throws PulsarServerException {
this.leaderElectionService = null;
}

if (loadManagerExecutor != null) {
loadManagerExecutor.shutdownNow();
}
loadManager = null;
loadManagerExecutor.shutdown();

if (globalZkCache != null) {
globalZkCache.close();
Expand Down Expand Up @@ -205,6 +203,12 @@ public void close() throws PulsarServerException {

orderedExecutor.shutdown();
cacheExecutor.shutdown();

LoadManager loadManager = this.loadManager.get();
if (loadManager != null) {
loadManager.stop();
}

state = State.Closed;

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.netty.util.concurrent.DefaultThreadFactory;

public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);

Expand Down Expand Up @@ -163,13 +165,13 @@ public ModularLoadManagerImpl() {
loadSheddingPipeline = new ArrayList<>();
loadSheddingPipeline.add(new OverloadShedder(conf));
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newScheduledThreadPool(1);
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
}

/**
* Initialize this load manager using the given PulsarService. Should be called only once, after invoking the
* default constructor.
*
*
* @param pulsar
* The service to initialize with.
*/
Expand Down Expand Up @@ -224,7 +226,7 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception

/**
* Initialize this load manager.
*
*
* @param pulsar
* Client to construct this manager from.
*/
Expand Down Expand Up @@ -475,7 +477,7 @@ private void updateBundleData() {

/**
* As any broker, disable the broker this manager is running on.
*
*
* @throws PulsarServerException
* If ZooKeeper failed to disable the broker.
*/
Expand Down Expand Up @@ -548,7 +550,7 @@ public void onUpdate(final String path, final LocalBrokerData data, final Stat s

/**
* As the leader broker, find a suitable broker for the assignment of the given bundle.
*
*
* @param serviceUnit
* ServiceUnitId for the bundle.
* @return The name of the selected broker, as it appears on ZooKeeper.
Expand Down Expand Up @@ -610,7 +612,7 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {

/**
* As any broker, start the load manager.
*
*
* @throws PulsarServerException
* If an unexpected error prevented the load manager from being started.
*/
Expand Down Expand Up @@ -647,7 +649,7 @@ public void start() throws PulsarServerException {

/**
* As any broker, stop the load manager.
*
*
* @throws PulsarServerException
* If an unexpected error occurred when attempting to stop the load manager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.netty.util.concurrent.DefaultThreadFactory;

public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {

private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerImpl.class);
Expand Down Expand Up @@ -179,7 +181,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newScheduledThreadPool(1);
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-simple-load-manager"));
this.sortedRankings.set(new TreeMap<>());
this.currentLoadReports = new HashMap<>();
this.resourceUnitRankings = new HashMap<>();
Expand Down

0 comments on commit 07d2daf

Please sign in to comment.