Skip to content

Commit

Permalink
Deserialize LoadManagerReport with custom-deserializer + fix broker-d…
Browse files Browse the repository at this point in the history
…iscovery (apache#908)
  • Loading branch information
rdhabalia authored and merlimat committed Nov 17, 2017
1 parent aa9d267 commit 63fe18c
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import java.util.Map;


import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

/**
* This class represents the overall load of the broker - it includes overall SystemResourceUsage and Bundle-usage
*/
@JsonDeserialize(using = LoadReportDeserializer.class)
public interface LoadManagerReport extends ServiceLookupData {

public ResourceUsage getCpu();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage.ResourceType;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.Maps;

/**
* This class represents the overall load of the broker - it includes overall {@link SystemResourceUsage} and
* {@link NamespaceUsage} for all the namespaces hosted by this broker.
*/
@JsonDeserialize(as = LoadReport.class)
public class LoadReport implements LoadManagerReport {
private String name;
private String brokerVersionString;
Expand All @@ -54,7 +56,9 @@ public class LoadReport implements LoadManagerReport {
private int numConsumers;
private int numProducers;
private int numBundles;

// This place-holder requires to identify correct LoadManagerReport type while deserializing
public static final String loadReportType = LoadReport.class.getSimpleName();

public LoadReport() {
this(null, null, null, null);
}
Expand Down Expand Up @@ -193,6 +197,10 @@ public double getMsgRateOut() {
return msgRateOut;
}

public String getLoadReportType() {
return loadReportType;
}

@Override
public int getNumTopics() {
numTopics = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.policies.data.loadbalancer;

import java.io.IOException;

import org.apache.pulsar.common.util.ObjectMapperFactory;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class LoadReportDeserializer extends JsonDeserializer<LoadManagerReport> {
@Override
public LoadManagerReport deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException, JsonProcessingException {
ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
ObjectNode root = ObjectMapperFactory.getThreadLocal().readTree(jsonParser);
if ((root.has("loadReportType") && root.get("loadReportType").asText().equals(LoadReport.loadReportType))
|| (root.has("underLoaded"))) {
return mapper.readValue(root.toString(), LoadReport.class);
} else {
return mapper.readValue(root.toString(), LocalBrokerData.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;


/**
* Contains all the data that is maintained locally on each broker.
*/
@JsonDeserialize(as = LocalBrokerData.class)
public class LocalBrokerData extends JSONWritable implements LoadManagerReport {

// URLs to satisfy contract of ServiceLookupData (used by NamespaceService).
Expand Down Expand Up @@ -74,6 +77,8 @@ public class LocalBrokerData extends JSONWritable implements LoadManagerReport {

// The version string that this broker is running, obtained from the Maven build artifact in the POM
private String brokerVersionString;
// This place-holder requires to identify correct LoadManagerReport type while deserializing
public static final String loadReportType = LocalBrokerData.class.getSimpleName();

// For JSON only.
public LocalBrokerData() {
Expand Down Expand Up @@ -197,6 +202,10 @@ public double getMaxResourceUsage() {
/ 100;
}

public String getLoadReportType() {
return loadReportType;
}

@Override
public ResourceUsage getCpu() {
return cpu;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.common.lookup.data;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.Map;

Expand All @@ -27,6 +28,11 @@
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.testng.annotations.Test;

@Test
Expand Down Expand Up @@ -67,4 +73,50 @@ void testUrlEncoder() {
assertEquals(str, Codec.decode(urlEncoded));
assertEquals(Codec.decode(urlEncoded), Codec.decode(uriEncoded));
}

@Test
public void testLoadReportSerialization() throws Exception {
final String simpleLmBrokerUrl = "simple";
final String simpleLmReportName = "simpleLoadManager";
final String modularLmBrokerUrl = "modular";
final SystemResourceUsage simpleLmSystemResourceUsage = new SystemResourceUsage();
final ResourceUsage resource = new ResourceUsage();
final double usage = 55.0;
resource.usage = usage;
simpleLmSystemResourceUsage.bandwidthIn = resource;

LoadReport simpleReport = getSimpleLoadManagerLoadReport(simpleLmBrokerUrl, simpleLmReportName,
simpleLmSystemResourceUsage);

LocalBrokerData modularReport = getModularLoadManagerLoadReport(modularLmBrokerUrl, resource);

LoadManagerReport simpleLoadReport = ObjectMapperFactory.getThreadLocal().readValue(
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(simpleReport), LoadManagerReport.class);
LoadManagerReport modularLoadReport = ObjectMapperFactory.getThreadLocal().readValue(
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(modularReport), LoadManagerReport.class);

assertEquals(simpleLoadReport.getWebServiceUrl(), simpleLmBrokerUrl);
assertTrue(simpleLoadReport instanceof LoadReport);
assertEquals(((LoadReport) simpleLoadReport).getName(), simpleLmReportName);
assertEquals(((LoadReport) simpleLoadReport).getSystemResourceUsage().bandwidthIn.usage, usage);

assertEquals(modularLoadReport.getWebServiceUrl(), modularLmBrokerUrl);
assertTrue(modularLoadReport instanceof LocalBrokerData);
assertEquals(((LocalBrokerData) modularLoadReport).getBandwidthIn().usage, usage);

}

private LoadReport getSimpleLoadManagerLoadReport(String brokerUrl, String reportName,
SystemResourceUsage systemResourceUsage) {
LoadReport report = new LoadReport(brokerUrl, null, null, null);
report.setName(reportName);
report.setSystemResourceUsage(systemResourceUsage);
return report;
}

private LocalBrokerData getModularLoadManagerLoadReport(String brokerUrl, ResourceUsage bandwidthIn) {
LocalBrokerData report = new LocalBrokerData(brokerUrl, null, null, null);
report.setBandwidthIn(bandwidthIn);
return report;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.discovery.service;

import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;

import java.io.Closeable;
Expand All @@ -35,7 +36,7 @@
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.zookeeper.KeeperException;
Expand All @@ -45,7 +46,6 @@
import com.google.common.base.Joiner;

import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
Expand Down Expand Up @@ -78,13 +78,13 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl
}

/**
* Find next broke {@link LoadReport} in round-robin fashion.
* Find next broker {@link LoadManagerReport} in round-robin fashion.
*
* @return
* @throws PulsarServerException
*/
LoadReport nextBroker() throws PulsarServerException {
List<LoadReport> availableBrokers = localZkCache.getAvailableBrokers();
LoadManagerReport nextBroker() throws PulsarServerException {
List<LoadManagerReport> availableBrokers = localZkCache.getAvailableBrokers();

if (availableBrokers.isEmpty()) {
throw new PulsarServerException("No active broker is available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -129,7 +129,7 @@ private void close() {

private void sendLookupResponse(long requestId) {
try {
LoadReport availableBroker = service.getDiscoveryProvider().nextBroker();
LoadManagerReport availableBroker = service.getDiscoveryProvider().nextBroker();
ctx.writeAndFlush(Commands.newLookupResponse(availableBroker.getPulsarServiceUrl(),
availableBroker.getPulsarServiceUrlTls(), false, Redirect, requestId, false));
} catch (PulsarServerException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -122,7 +122,7 @@ private void redirect(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

try {
LoadReport broker = nextBroker();
LoadManagerReport broker = nextBroker();

URI brokerURI;
if (request.getScheme().equals("http")) {
Expand Down Expand Up @@ -155,8 +155,8 @@ private void redirect(HttpServletRequest request, HttpServletResponse response)
*
* @return
*/
LoadReport nextBroker() {
List<LoadReport> availableBrokers = zkCache.getAvailableBrokers();
LoadManagerReport nextBroker() {
List<LoadManagerReport> availableBrokers = zkCache.getAvailableBrokers();

if (availableBrokers.isEmpty()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "No active broker is available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
Expand All @@ -48,10 +48,10 @@ public class ZookeeperCacheLoader implements Closeable {
private final ZooKeeperCache localZkCache;
private final LocalZooKeeperConnectionService localZkConnectionSvc;

private final ZooKeeperDataCache<LoadReport> brokerInfo;
private final ZooKeeperDataCache<LoadManagerReport> brokerInfo;
private final ZooKeeperChildrenCache availableBrokersCache;

private volatile List<LoadReport> availableBrokers;
private volatile List<LoadManagerReport> availableBrokers;

private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery-ordered-cache");
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8,
Expand Down Expand Up @@ -83,10 +83,10 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke
}
});

this.brokerInfo = new ZooKeeperDataCache<LoadReport>(localZkCache) {
this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, LoadReport.class);
public LoadManagerReport deserialize(String key, byte[] content) throws Exception {
return ObjectMapperFactory.getThreadLocal().readValue(content, LoadManagerReport.class);
}
};

Expand All @@ -103,7 +103,7 @@ public LoadReport deserialize(String key, byte[] content) throws Exception {
updateBrokerList(availableBrokersCache.get());
}

public List<LoadReport> getAvailableBrokers() {
public List<LoadManagerReport> getAvailableBrokers() {
return availableBrokers;
}

Expand All @@ -118,7 +118,7 @@ public void close() {
}

private void updateBrokerList(Set<String> brokerNodes) throws Exception {
List<LoadReport> availableBrokers = new ArrayList<>(brokerNodes.size());
List<LoadManagerReport> availableBrokers = new ArrayList<>(brokerNodes.size());
for (String broker : brokerNodes) {
availableBrokers.add(brokerInfo.get(LOADBALANCE_BROKERS_ROOT + '/' + broker).get());
}
Expand Down
Loading

0 comments on commit 63fe18c

Please sign in to comment.