Skip to content

Commit

Permalink
Close reader for metadata topic during initialization (apache#8637)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Nov 20, 2020
1 parent a8cd908 commit 95755fd
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
* We create a new reader
*/
public synchronized void initialize() {
try {
try (Reader reader = FunctionMetaDataTopicTailer.createReader(
workerConfig, pulsarClient.newReader(), MessageId.earliest)){
// read all existing messages
Reader reader = FunctionMetaDataTopicTailer.createReader(workerConfig, pulsarClient.newReader(), MessageId.earliest);
while (reader.hasMessageAvailable()) {
processMetaDataTopicMessage(reader.readNext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,11 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
* @return the message id of the message processed during init phase
*/
public MessageId initialize() {
try {
Reader<byte[]> reader = WorkerUtils.createReader(
workerService.getClient().newReader(),
workerConfig.getWorkerId() + "-function-assignment-initialize",
workerConfig.getFunctionAssignmentTopic(),
MessageId.earliest);
try (Reader<byte[]> reader = WorkerUtils.createReader (
workerService.getClient().newReader(),
workerConfig.getWorkerId() + "-function-assignment-initialize",
workerConfig.getFunctionAssignmentTopic(),
MessageId.earliest)) {

// start init phase
this.isInitializePhase = true;
Expand All @@ -246,8 +245,6 @@ public MessageId initialize() {
}
// init phase is done
this.isInitializePhase = false;
// close reader
reader.close();
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
Expand Down

0 comments on commit 95755fd

Please sign in to comment.