Skip to content

Commit

Permalink
Report provider and consumer to lookout. (sofastack#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored and ujjboy committed Nov 30, 2018
1 parent 67c9016 commit 232e965
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.event;

import com.alipay.sofa.rpc.config.ConsumerConfig;

/**
* @author <a href=mailto:[email protected]>leizhiyuan</a>
*/
public class ConsumerSubEvent implements Event {

private ConsumerConfig consumerConfig;

public ConsumerSubEvent(ConsumerConfig consumerConfig) {
this.consumerConfig = consumerConfig;
}

public ConsumerConfig getConsumerConfig() {
return consumerConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.event;

import com.alipay.sofa.rpc.config.ProviderConfig;

/**
* @author <a href=mailto:[email protected]>leizhiyuan</a>
*/
public class ProviderPubEvent implements Event {

private ProviderConfig providerConfig;

public ProviderPubEvent(ProviderConfig providerConfig) {
this.providerConfig = providerConfig;
}

public ProviderConfig getProviderConfig() {
return providerConfig;
}
}
7 changes: 7 additions & 0 deletions extension-impl/metrics-lookout/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-log-common-tools</artifactId>
<version>${project.parent.version}</version>
</dependency>


<dependency>
<groupId>com.alipay.sofa.lookout</groupId>
<artifactId>lookout-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ public void onEvent(Event event) {
ServerStoppedEvent serverStartedEvent = (ServerStoppedEvent) event;

rpcMetrics.removeThreadPool(serverStartedEvent.getServerConfig());
} else if (eventClass == ProviderPubEvent.class) {
ProviderPubEvent providerPubEvent = (ProviderPubEvent) event;
rpcMetrics.collectProvderPubInfo(providerPubEvent.getProviderConfig());
}
else if (eventClass == ConsumerSubEvent.class) {
ConsumerSubEvent consumerSubEvent = (ConsumerSubEvent) event;
rpcMetrics.collectConsumerSubInfo(consumerSubEvent.getConsumerConfig());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.alipay.lookout.api.composite.MixinMetric;
import com.alipay.lookout.api.info.Info;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.log.LogCodes;
import com.alipay.sofa.rpc.log.Logger;
Expand Down Expand Up @@ -102,31 +104,32 @@ public void collectThreadPool(ServerConfig serverConfig, final ThreadPoolExecuto

final ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig(coreSize, maxSize, queueSize);

Lookout.registry().info(rpcLookoutId.getServerThreadConfigId(serverConfig), new Info<ThreadPoolConfig>() {
Lookout.registry().info(rpcLookoutId.fetchServerThreadConfigId(serverConfig), new Info<ThreadPoolConfig>() {

@Override
public ThreadPoolConfig value() {
return threadPoolConfig;
}
});

Lookout.registry().gauge(rpcLookoutId.getServerThreadPoolActiveCountId(serverConfig), new Gauge<Integer>() {
Lookout.registry().gauge(rpcLookoutId.fetchServerThreadPoolActiveCountId(serverConfig),
new Gauge<Integer>() {

@Override
public Integer value() {
return threadPoolExecutor.getActiveCount();
}
});
@Override
public Integer value() {
return threadPoolExecutor.getActiveCount();
}
});

Lookout.registry().gauge(rpcLookoutId.getServerThreadPoolIdleCountId(serverConfig), new Gauge<Integer>() {
Lookout.registry().gauge(rpcLookoutId.fetchServerThreadPoolIdleCountId(serverConfig), new Gauge<Integer>() {

@Override
public Integer value() {
return threadPoolExecutor.getPoolSize() - threadPoolExecutor.getActiveCount();
}
});

Lookout.registry().gauge(rpcLookoutId.getServerThreadPoolQueueSizeId(serverConfig), new Gauge<Integer>() {
Lookout.registry().gauge(rpcLookoutId.fetchServerThreadPoolQueueSizeId(serverConfig), new Gauge<Integer>() {

@Override
public Integer value() {
Expand Down Expand Up @@ -217,7 +220,7 @@ private Id createMethodConsumerId(RpcClientLookoutModel model) {
tags.put("invoke_type", StringUtils.defaultString(model.getInvokeType()));
tags.put("target_app", StringUtils.defaultString(model.getTargetApp()));

return rpcLookoutId.getConsumerId().withTags(tags);
return rpcLookoutId.fetchConsumerStatId().withTags(tags);
}

/**
Expand All @@ -235,7 +238,47 @@ public Id createMethodProviderId(RpcServerLookoutModel model) {
tags.put("protocol", StringUtils.defaultString(model.getProtocol()));
tags.put("caller_app", StringUtils.defaultString(model.getCallerApp()));

return rpcLookoutId.getProviderId().withTags(tags);
return rpcLookoutId.fetchProviderStatId().withTags(tags);
}

/**
* Collect the RPC client information.
*
* @param providerConfig client information model
*/
public void collectProvderPubInfo(final ProviderConfig providerConfig) {

try {
Id providerConfigId = rpcLookoutId.fetchProviderPubId();
Lookout.registry().info(providerConfigId, new Info<ProviderConfig>() {
@Override
public ProviderConfig value() {
return providerConfig;
}
});
} catch (Throwable t) {
LOGGER.error(LogCodes.getLog(LogCodes.ERROR_METRIC_REPORT_ERROR), t);
}
}

/**
* Collect the RPC client information.
*
* @param consumerConfig client information model
*/
public void collectConsumerSubInfo(final ConsumerConfig consumerConfig) {

try {
Id consumerConfigId = rpcLookoutId.fetchConsumerSubId();
Lookout.registry().info(consumerConfigId, new Info<ConsumerConfig>() {
@Override
public ConsumerConfig value() {
return consumerConfig;
}
});
} catch (Throwable t) {
LOGGER.error(LogCodes.getLog(LogCodes.ERROR_METRIC_REPORT_ERROR), t);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,25 @@
public class RpcLookoutId {

private volatile Id consumerId;
private final Object consumerIdLock = new Object();
private final Object consumerIdLock = new Object();

private volatile Id providerId;
private final Object providerIdLock = new Object();
private final Object providerIdLock = new Object();

private final ConcurrentMap<String, Id> serverConfigIds = new ConcurrentHashMap<String, Id>();
private final ConcurrentMap<String, Id> serverConfigIds = new ConcurrentHashMap<String, Id>();

private volatile Id consumerConfigId;
private final Object consumerConfigIdLock = new Object();

private volatile Id providerConfigId;
private final Object providerConfigIdLock = new Object();

/**
* create consumerId
*
* @return consumerId
*/
public Id getConsumerId() {
public Id fetchConsumerStatId() {

if (consumerId == null) {
synchronized (consumerIdLock) {
Expand All @@ -59,7 +65,7 @@ public Id getConsumerId() {
*
* @return ProviderId
*/
public Id getProviderId() {
public Id fetchProviderStatId() {

if (providerId == null) {
synchronized (providerIdLock) {
Expand All @@ -72,27 +78,49 @@ public Id getProviderId() {
return providerId;
}

public synchronized Id getServerThreadConfigId(ServerConfig serverConfig) {
public Id fetchConsumerSubId() {
if (consumerConfigId == null) {
synchronized (consumerConfigIdLock) {
if (consumerConfigId == null) {
consumerConfigId = Lookout.registry().createId("rpc.consumer.info.stats");
}
}
}
return consumerConfigId;
}

public Id fetchProviderPubId() {
if (providerConfigId == null) {
synchronized (providerConfigIdLock) {
if (providerConfigId == null) {
providerConfigId = Lookout.registry().createId("rpc.provider.info.stats");
}
}
}
return providerConfigId;
}

public synchronized Id fetchServerThreadConfigId(ServerConfig serverConfig) {
String key = "rpc." + serverConfig.getProtocol() + ".threadpool.config";
return getId(key);
return fetchServerConfigId(key);
}

public Id getServerThreadPoolActiveCountId(ServerConfig serverConfig) {
public Id fetchServerThreadPoolActiveCountId(ServerConfig serverConfig) {
String key = "rpc." + serverConfig.getProtocol() + ".threadpool.active.count";
return getId(key);
return fetchServerConfigId(key);
}

public Id getServerThreadPoolIdleCountId(ServerConfig serverConfig) {
public Id fetchServerThreadPoolIdleCountId(ServerConfig serverConfig) {
String key = "rpc." + serverConfig.getProtocol() + ".threadpool.idle.count";
return getId(key);
return fetchServerConfigId(key);
}

public Id getServerThreadPoolQueueSizeId(ServerConfig serverConfig) {
public Id fetchServerThreadPoolQueueSizeId(ServerConfig serverConfig) {
String key = "rpc." + serverConfig.getProtocol() + ".threadpool.queue.size";
return getId(key);
return fetchServerConfigId(key);
}

private Id getId(String key) {
private Id fetchServerConfigId(String key) {
Id lookoutId = serverConfigIds.get(key);
if (lookoutId == null) {
synchronized (RpcLookout.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.alipay.sofa.rpc.module;

import com.alipay.sofa.rpc.event.ClientEndInvokeEvent;
import com.alipay.sofa.rpc.event.ConsumerSubEvent;
import com.alipay.sofa.rpc.event.EventBus;
import com.alipay.sofa.rpc.event.LookoutSubscriber;
import com.alipay.sofa.rpc.event.ProviderPubEvent;
import com.alipay.sofa.rpc.event.ServerSendEvent;
import com.alipay.sofa.rpc.event.ServerStartedEvent;
import com.alipay.sofa.rpc.ext.Extension;
Expand Down Expand Up @@ -48,6 +50,9 @@ public void install() {
EventBus.register(ClientEndInvokeEvent.class, subscriber);
EventBus.register(ServerSendEvent.class, subscriber);
EventBus.register(ServerStartedEvent.class, subscriber);
EventBus.register(ProviderPubEvent.class, subscriber);
EventBus.register(ConsumerSubEvent.class, subscriber);

}

@Override
Expand All @@ -56,6 +61,8 @@ public void uninstall() {
EventBus.unRegister(ClientEndInvokeEvent.class, subscriber);
EventBus.unRegister(ServerSendEvent.class, subscriber);
EventBus.unRegister(ServerStartedEvent.class, subscriber);
EventBus.unRegister(ProviderPubEvent.class, subscriber);
EventBus.unRegister(ConsumerSubEvent.class, subscriber);
}
}
}
Loading

0 comments on commit 232e965

Please sign in to comment.