Skip to content

Commit

Permalink
Fix bug that WebSocket proxy returns empty metrics (apache#1567)
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam authored and merlimat committed Apr 14, 2018
1 parent 31dae05 commit aa9e306
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,15 @@ private void verifyProxyMetrics(Client client, String baseUrl) {
Response response = (Response) invocationBuilder.get();
String responseStr = response.readEntity(String.class);
final Gson gson = new Gson();
System.err.println("REQ: " + statUrl);
System.err.println("RESPONSE: " + responseStr);
final List<Metrics> data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
List<Metrics> data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
}.getType());
Assert.assertFalse(data.isEmpty());
// re-generate metrics
service.getProxyStats().generate();
invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
response = (Response) invocationBuilder.get();
responseStr = response.readEntity(String.class);
data = gson.fromJson(responseStr, new TypeToken<List<Metrics>>() {
}.getType());
Assert.assertFalse(data.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* It periodically generates stats metrics of proxy service,
*
Expand All @@ -41,15 +44,13 @@ public class ProxyStats {
private final WebSocketService service;
private final JvmMetrics jvmMetrics;
private ConcurrentOpenHashMap<String, ProxyNamespaceStats> topicStats;
private List<Metrics> tempMetricsCollection;
private List<Metrics> metricsCollection;

public ProxyStats(WebSocketService service) {
super();
this.service = service;
this.jvmMetrics = new JvmMetrics(service);
this.topicStats = new ConcurrentOpenHashMap<>();
this.tempMetricsCollection = Lists.newArrayList();
this.metricsCollection = Lists.newArrayList();
// schedule stat generation task every 1 minute
service.getExecutor().scheduleAtFixedRate(() -> generate(), 120, 60, TimeUnit.SECONDS);
Expand All @@ -59,25 +60,32 @@ public ProxyStats(WebSocketService service) {
* generates stats-metrics of proxy service and updates metricsCollection cache with latest stats.
*/
public synchronized void generate() {
if (log.isDebugEnabled()) {
log.debug("Start generating proxy metrics");
}

topicStats.clear();

service.getProducers().forEach((topic, handlers) -> {
if (log.isDebugEnabled()) {
log.debug("Collect stats from {} producer handlers for topic {}", handlers.size(), topic);
}

final String namespaceName = TopicName.get(topic).getNamespace();
ProxyNamespaceStats nsStat = topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats());
handlers.forEach(handler -> {
nsStat.numberOfMsgPublished += handler.getAndResetNumMsgsSent();
nsStat.numberOfBytesPublished += handler.getAndResetNumBytesSent();
nsStat.numberOfPublishFailure += handler.getAndResetNumMsgsFailed();
if (nsStat.publishMsgLatency == null) {
nsStat.publishMsgLatency = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
}
handler.getPublishLatencyStatsUSec().refresh();
nsStat.publishMsgLatency.addAll(handler.getPublishLatencyStatsUSec());
System.out.println(nsStat.publishMsgLatency);
});
});
service.getConsumers().forEach((topic, handlers) -> {
if (log.isDebugEnabled()) {
log.debug("Collect stats from {} consumer handlers for topic {}", handlers.size(), topic);
}

final String namespaceName = TopicName.get(topic).getNamespace();
ProxyNamespaceStats nsStat = topicStats.computeIfAbsent(namespaceName, ns -> new ProxyNamespaceStats());
handlers.forEach(handler -> {
Expand All @@ -87,16 +95,28 @@ public synchronized void generate() {
});
});

tempMetricsCollection.clear();
List<Metrics> tempMetricsCollection = Lists.newArrayList();
topicStats.forEach((namespace, stats) -> {
if (log.isDebugEnabled()) {
log.debug("Add ns-stats of namespace {} to metrics", namespace);
}
tempMetricsCollection.add(stats.add(namespace));
});

// add jvm-metrics
if (log.isDebugEnabled()) {
log.debug("Add jvm-stats to metrics");
}
tempMetricsCollection.add(jvmMetrics.generate());

// swap tempmetrics to stat-metrics
List<Metrics> tempRef = metricsCollection;
metricsCollection = tempMetricsCollection;
tempRef.clear();

if (log.isDebugEnabled()) {
log.debug("Complete generating proxy metrics");
}
}

public synchronized List<Metrics> getMetrics() {
Expand All @@ -114,6 +134,10 @@ private static class ProxyNamespaceStats {
public long numberOfBytesDelivered;
public long numberOfMsgsAcked;

public ProxyNamespaceStats() {
this.publishMsgLatency = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
}

public Metrics add(String namespace) {

publishMsgLatency.refresh();
Expand All @@ -137,4 +161,6 @@ public Metrics add(String namespace) {
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyStats.class);

}

0 comments on commit aa9e306

Please sign in to comment.