Skip to content

Commit

Permalink
Add an endpoint to check whether function worker service is initializ…
Browse files Browse the repository at this point in the history
…ed (apache#7350)

* Add an endpoint to check whether function worker service is initialized

Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Jun 25, 2020
1 parent 3a3d08f commit ec0271b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class WorkerService {
private StorageAdminClient stateStoreAdminClient;
private MembershipManager membershipManager;
private SchedulerManager schedulerManager;
private boolean isInitialized = false;
private volatile boolean isInitialized = false;
private final ScheduledExecutorService statsUpdater;
private AuthenticationService authenticationService;
private AuthorizationService authorizationService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
Expand All @@ -33,6 +29,10 @@
import org.apache.pulsar.functions.worker.rest.api.v3.SourcesApiV3Resource;
import org.glassfish.jersey.media.multipart.MultiPartFeature;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public final class Resources {

private Resources() {
Expand Down Expand Up @@ -64,7 +64,8 @@ public static Set<Class<?>> getRootResources() {
return new HashSet<>(
Arrays.asList(
ConfigurationResource.class,
FunctionsMetricsResource.class
FunctionsMetricsResource.class,
WorkerReadinessResource.class
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker.rest;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.apache.pulsar.functions.worker.WorkerService;

import java.util.function.Supplier;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;

@Path("/")
public class WorkerReadinessResource implements Supplier<WorkerService> {

public static final String ATTRIBUTE_WORKER_SERVICE = "worker";

private WorkerService workerService;
@Context
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;

@Override
public synchronized WorkerService get() {
if (this.workerService == null) {
this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE);
}
return this.workerService;
}

@GET
@ApiOperation(
value = "Determines whether the worker service is initialized and ready for use",
response = Boolean.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/initialized")
public boolean isInitialized() {
if (!get().isInitialized()) {
throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand All @@ -37,11 +43,6 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

@Slf4j
@Path("/worker")
Expand Down Expand Up @@ -123,6 +124,11 @@ public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments(clientAppId());
}

@GET
@ApiOperation(
value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
response = List.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
Expand Down

0 comments on commit ec0271b

Please sign in to comment.