Skip to content

Commit

Permalink
Refactor metrics data Composite (apache#12036)
Browse files Browse the repository at this point in the history
* tmp

* tmp

* tmp

* composite

* add licence

* remove

* style

* style

* use anoymous composite

* use anoymous composite

---------

Co-authored-by: x-shadow-man <[email protected]>
Co-authored-by: earthchen <[email protected]>
  • Loading branch information
3 people authored Apr 11, 2023
1 parent 1d28205 commit 3c518e9
Show file tree
Hide file tree
Showing 53 changed files with 692 additions and 600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.filter.MetricsFilter;
import org.apache.dubbo.metrics.model.MetricsKey;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.CounterMetricSample;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ dubbo:
address: zookeeper://127.0.0.1:2181
metadata-report:
address: zookeeper://127.0.0.1:2181

metrics:
protocol: prometheus
enable-jvm: true
enable-registry: true
aggregation:
enabled: true
prometheus:
exporter:
enabled: true
enable-metadata: true
logging:
pattern:
level: '%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ public interface MetricsConstants {
String ATTACHMENT_KEY_SERVICE = "serviceKey";
String ATTACHMENT_KEY_SIZE = "size";
String ATTACHMENT_KEY_LAST_NUM_MAP = "lastNumMap";
String ATTACHMENT_KEY_DIR_NUM = "dirNum";
String ATTACHMENT_DIRECTORY_MAP = "dirNum";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.metrics.data;

import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.MetricsSupport;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.report.MetricsExport;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class ApplicationStatComposite implements MetricsExport {

private final Map<MetricsKey, Map<String, AtomicLong>> applicationNumStats = new ConcurrentHashMap<>();

public void init(List<MetricsKey> appKeys) {
if (CollectionUtils.isEmpty(appKeys)) {
return;
}
appKeys.forEach(appKey -> applicationNumStats.put(appKey, new ConcurrentHashMap<>()));
}

public void incrementSize(MetricsKey metricsKey, String applicationName, int size) {
if (!applicationNumStats.containsKey(metricsKey)) {
return;
}
applicationNumStats.get(metricsKey).computeIfAbsent(applicationName, k -> new AtomicLong(0L)).getAndAdd(size);
}

public void setApplicationKey(MetricsKey metricsKey, String applicationName, int num) {
if (!applicationNumStats.containsKey(metricsKey)) {
return;
}
applicationNumStats.get(metricsKey).computeIfAbsent(applicationName, k -> new AtomicLong(0L)).set(num);
}


@SuppressWarnings({"rawtypes"})
public List<GaugeMetricSample> export(MetricsCategory category) {
List<GaugeMetricSample> list = new ArrayList<>();
for (MetricsKey type : applicationNumStats.keySet()) {
Map<String, AtomicLong> stringAtomicLongMap = applicationNumStats.get(type);
for (String applicationName : stringAtomicLongMap.keySet()) {
list.add(convertToSample(applicationName, type, category, stringAtomicLongMap.get(applicationName)));
}
}
return list;
}

@SuppressWarnings({"rawtypes"})
private GaugeMetricSample convertToSample(String applicationName, MetricsKey type, MetricsCategory category, AtomicLong targetNumber) {
return new GaugeMetricSample<>(type, MetricsSupport.applicationTags(applicationName), category, targetNumber, AtomicLong::get);
}

public Map<MetricsKey, Map<String, AtomicLong>> getApplicationNumStats() {
return applicationNumStats;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.metrics.data;

import org.apache.dubbo.metrics.collector.MetricsCollector;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;

import org.apache.dubbo.metrics.report.MetricsExport;

import java.util.ArrayList;
import java.util.List;


/**
* As a data aggregator, use internal data containers calculates and classifies
* the registry data collected by {@link MetricsCollector MetricsCollector}, and
* provides an {@link MetricsExport MetricsExport} interface for exporting standard output formats.
*/
public abstract class BaseStatComposite implements MetricsExport {

private final ApplicationStatComposite applicationStatComposite = new ApplicationStatComposite();
private final ServiceStatComposite serviceStatComposite = new ServiceStatComposite();
private final RtStatComposite rtStatComposite = new RtStatComposite();


public BaseStatComposite() {
init(applicationStatComposite, serviceStatComposite, rtStatComposite);
}

protected abstract void init(ApplicationStatComposite applicationStatComposite, ServiceStatComposite serviceStatComposite, RtStatComposite rtStatComposite);

public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) {
rtStatComposite.calcApplicationRt(applicationName, registryOpType, responseTime);
}

public void calcServiceKeyRt(String applicationName, String serviceKey, String registryOpType, Long responseTime) {
rtStatComposite.calcServiceKeyRt(applicationName, serviceKey, registryOpType, responseTime);
}

public void setServiceKey(MetricsKey metricsKey, String applicationName, String serviceKey, int num) {
serviceStatComposite.setServiceKey(metricsKey, applicationName, serviceKey, num);
}

public void setApplicationKey(MetricsKey metricsKey, String applicationName, int num) {
applicationStatComposite.setApplicationKey(metricsKey, applicationName, num);
}

public void incrementApp(MetricsKey metricsKey, String applicationName, int size) {
applicationStatComposite.incrementSize(metricsKey, applicationName, size);
}

public void incrementServiceKey(MetricsKey metricsKey, String applicationName, String attServiceKey, int size) {
serviceStatComposite.incrementServiceKey(metricsKey, applicationName, attServiceKey, size);
}

@Override
@SuppressWarnings({"rawtypes"})
public List<GaugeMetricSample> export(MetricsCategory category) {
List<GaugeMetricSample> list = new ArrayList<>();
list.addAll(applicationStatComposite.export(category));
list.addAll(rtStatComposite.export(category));
list.addAll(serviceStatComposite.export(category));
return list;
}

public ApplicationStatComposite getApplicationStatComposite() {
return applicationStatComposite;
}

public RtStatComposite getRtStatComposite() {
return rtStatComposite;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.metrics.data;

import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.key.MetricsKeyWrapper;
import org.apache.dubbo.metrics.model.container.AtomicLongContainer;
import org.apache.dubbo.metrics.model.container.LongAccumulatorContainer;
import org.apache.dubbo.metrics.model.container.LongContainer;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.key.MetricsPlaceType;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.report.MetricsExport;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.Collectors;

public class RtStatComposite implements MetricsExport {

private final List<LongContainer<? extends Number>> rtStats = new ArrayList<>();

public void init(MetricsPlaceType... placeValues) {
if (placeValues == null) {
return;
}
Arrays.stream(placeValues).forEach(metricsPlaceType -> rtStats.addAll(initStats(metricsPlaceType)));
}

private List<LongContainer<? extends Number>> initStats(MetricsPlaceType placeValue) {
List<LongContainer<? extends Number>> singleRtStats = new ArrayList<>();
singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(MetricsKey.METRIC_RT_LAST, placeValue)));
singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(MetricsKey.METRIC_RT_MIN, placeValue), new LongAccumulator(Long::min, Long.MAX_VALUE)));
singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(MetricsKey.METRIC_RT_MAX, placeValue), new LongAccumulator(Long::max, Long.MIN_VALUE)));
singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(MetricsKey.METRIC_RT_SUM, placeValue), (responseTime, longAccumulator) -> longAccumulator.addAndGet(responseTime)));
// AvgContainer is a special counter that stores the number of times but outputs function of sum/times
AtomicLongContainer avgContainer = new AtomicLongContainer(new MetricsKeyWrapper(MetricsKey.METRIC_RT_AVG, placeValue), (k, v) -> v.incrementAndGet());
avgContainer.setValueSupplier(applicationName -> {
LongContainer<? extends Number> totalContainer = rtStats.stream().filter(longContainer -> longContainer.isKeyWrapper(MetricsKey.METRIC_RT_SUM, placeValue.getType())).findFirst().get();
AtomicLong totalRtTimes = avgContainer.get(applicationName);
AtomicLong totalRtSum = (AtomicLong) totalContainer.get(applicationName);
return totalRtSum.get() / totalRtTimes.get();
});
singleRtStats.add(avgContainer);
return singleRtStats;
}

@SuppressWarnings({"rawtypes", "unchecked"})
public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) {
for (LongContainer container : rtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName, container.getInitFunc());
container.getConsumerFunc().accept(responseTime, current);
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
public void calcServiceKeyRt(String applicationName, String serviceKey, String registryOpType, Long responseTime) {
for (LongContainer container : rtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName + "_" + serviceKey, container.getInitFunc());
container.getConsumerFunc().accept(responseTime, current);
}
}

@SuppressWarnings({"rawtypes"})
public List<GaugeMetricSample> export(MetricsCategory category) {
List<GaugeMetricSample> list = new ArrayList<>();
for (LongContainer<? extends Number> rtContainer : rtStats) {
MetricsKeyWrapper metricsKeyWrapper = rtContainer.getMetricsKeyWrapper();
for (Map.Entry<String, ? extends Number> entry : rtContainer.entrySet()) {
list.add(new GaugeMetricSample<>(metricsKeyWrapper.targetKey(), metricsKeyWrapper.targetDesc(), metricsKeyWrapper.tagName(entry.getKey()), category, entry.getKey().intern(), value -> rtContainer.getValueSupplier().apply(value.intern())));
}
}
return list;
}

public List<LongContainer<? extends Number>> getRtStats() {
return rtStats;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dubbo.metrics.data;

import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metrics.model.MetricsCategory;
import org.apache.dubbo.metrics.model.ServiceKeyMetric;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.report.MetricsExport;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class ServiceStatComposite implements MetricsExport {

private final Map<MetricsKey, Map<ServiceKeyMetric, AtomicLong>> serviceNumStats = new ConcurrentHashMap<>();

public void init(List<MetricsKey> serviceKeys) {
if (CollectionUtils.isEmpty(serviceKeys)) {
return;
}
serviceKeys.forEach(appKey -> serviceNumStats.put(appKey, new ConcurrentHashMap<>()));
}

public void incrementServiceKey(MetricsKey metricsKey, String applicationName, String serviceKey, int size) {
if (!serviceNumStats.containsKey(metricsKey)) {
return;
}
serviceNumStats.get(metricsKey).computeIfAbsent(new ServiceKeyMetric(applicationName, serviceKey), k -> new AtomicLong(0L)).getAndAdd(size);
}

public void setServiceKey(MetricsKey type, String applicationName, String serviceKey, int num) {
if (!serviceNumStats.containsKey(type)) {
return;
}
serviceNumStats.get(type).computeIfAbsent(new ServiceKeyMetric(applicationName, serviceKey), k -> new AtomicLong(0L)).set(num);
}

@SuppressWarnings({"rawtypes"})
public List<GaugeMetricSample> export(MetricsCategory category) {
List<GaugeMetricSample> list = new ArrayList<>();
for (MetricsKey type : serviceNumStats.keySet()) {
Map<ServiceKeyMetric, AtomicLong> stringAtomicLongMap = serviceNumStats.get(type);
for (ServiceKeyMetric serviceKeyMetric : stringAtomicLongMap.keySet()) {
list.add(new GaugeMetricSample<>(type, serviceKeyMetric.getTags(), category, stringAtomicLongMap, value -> value.get(serviceKeyMetric).get()));
}
}
return list;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.dubbo.metrics.event;

import org.apache.dubbo.metrics.model.MethodMetric;
import org.apache.dubbo.metrics.model.TypeWrapper;
import org.apache.dubbo.metrics.model.key.TypeWrapper;
import org.apache.dubbo.rpc.model.ApplicationModel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dubbo.metrics.model;

import org.apache.dubbo.common.Version;
import org.apache.dubbo.metrics.model.key.MetricsKey;

import java.util.HashMap;
import java.util.Map;
Expand Down
Loading

0 comments on commit 3c518e9

Please sign in to comment.