Skip to content

Commit 8f22095

Browse files
koo-taejinemeroad
authored andcommitted
[#github_4182] Makes each PinpointServer has its own ServerMessageListener
1. change setMessageListener -> setMessageListenerFactory 2. change test code
1 parent cbd1a91 commit 8f22095

31 files changed

+987
-615
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2018 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.collector.cluster.connection;
18+
19+
import com.navercorp.pinpoint.rpc.MessageListener;
20+
import com.navercorp.pinpoint.rpc.PinpointSocket;
21+
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
22+
import com.navercorp.pinpoint.rpc.packet.PingPayloadPacket;
23+
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
24+
import com.navercorp.pinpoint.rpc.packet.SendPacket;
25+
import com.navercorp.pinpoint.rpc.server.PinpointServer;
26+
import com.navercorp.pinpoint.rpc.server.ServerMessageListener;
27+
import com.navercorp.pinpoint.rpc.server.ServerMessageListenerFactory;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.util.Map;
32+
33+
/**
34+
* @author Taejin Koo
35+
*/
36+
class ClusterServerMessageListenerFactory implements ServerMessageListenerFactory {
37+
38+
private final String clusterId;
39+
private final MessageListener routeMessageListener;
40+
41+
public ClusterServerMessageListenerFactory(String clusterId, MessageListener routeMessageListener) {
42+
this.clusterId = clusterId;
43+
this.routeMessageListener = routeMessageListener;
44+
}
45+
46+
@Override
47+
public ServerMessageListener create() {
48+
return new ClusterServerMessageListener(clusterId, routeMessageListener);
49+
}
50+
51+
52+
private static class ClusterServerMessageListener implements ServerMessageListener {
53+
54+
private final Logger logger = LoggerFactory.getLogger(this.getClass());
55+
56+
private final String clusterId;
57+
private final MessageListener routeMessageListener;
58+
59+
public ClusterServerMessageListener(String clusterId, MessageListener routeMessageListener) {
60+
this.clusterId = clusterId;
61+
this.routeMessageListener = routeMessageListener;
62+
}
63+
64+
@Override
65+
public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) {
66+
logger.info("handleSend packet:{}, remote:{}", sendPacket, pinpointSocket.getRemoteAddress());
67+
}
68+
69+
@Override
70+
public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) {
71+
logger.info("handleRequest packet:{}, remote:{}", requestPacket, pinpointSocket.getRemoteAddress());
72+
73+
// TODO : need handle control message (looks like getClusterId, ..)
74+
routeMessageListener.handleRequest(requestPacket, pinpointSocket);
75+
}
76+
77+
@Override
78+
public HandshakeResponseCode handleHandshake(Map properties) {
79+
logger.info("handle handShake {}", properties);
80+
return HandshakeResponseCode.DUPLEX_COMMUNICATION;
81+
}
82+
83+
@Override
84+
public void handlePing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) {
85+
logger.info("ping received packet:{}, remote:{}", pingPacket, pinpointServer);
86+
}
87+
88+
}
89+
90+
}

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/connection/CollectorClusterAcceptor.java

+1-45
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,19 @@
2020
import com.navercorp.pinpoint.collector.util.Address;
2121
import com.navercorp.pinpoint.collector.util.DefaultAddress;
2222
import com.navercorp.pinpoint.common.util.Assert;
23-
import com.navercorp.pinpoint.rpc.MessageListener;
24-
import com.navercorp.pinpoint.rpc.PinpointSocket;
2523
import com.navercorp.pinpoint.rpc.cluster.ClusterOption;
2624
import com.navercorp.pinpoint.rpc.cluster.Role;
2725
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
28-
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
29-
import com.navercorp.pinpoint.rpc.packet.PingPayloadPacket;
30-
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
31-
import com.navercorp.pinpoint.rpc.packet.SendPacket;
3226
import com.navercorp.pinpoint.rpc.server.ChannelFilter;
3327
import com.navercorp.pinpoint.rpc.server.PinpointServer;
3428
import com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor;
35-
import com.navercorp.pinpoint.rpc.server.ServerMessageListener;
3629
import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler;
3730
import com.navercorp.pinpoint.rpc.util.ClassUtils;
3831
import org.slf4j.Logger;
3932
import org.slf4j.LoggerFactory;
4033

4134
import java.net.InetSocketAddress;
4235
import java.net.SocketAddress;
43-
import java.util.Map;
4436

4537
/**
4638
* @author Taejin Koo
@@ -71,7 +63,7 @@ public void start() {
7163
ClusterOption clusterOption = new ClusterOption(true, option.getClusterId(), Role.ROUTER);
7264

7365
PinpointServerAcceptor serverAcceptor = new PinpointServerAcceptor(clusterOption, ChannelFilter.BYPASS);
74-
serverAcceptor.setMessageListener(new ClusterServerMessageListener(option.getClusterId(), option.getRouteMessageHandler()));
66+
serverAcceptor.setMessageListenerFactory(new ClusterServerMessageListenerFactory(option.getClusterId(), option.getRouteMessageHandler()));
7567
serverAcceptor.setServerStreamChannelMessageListener(option.getRouteStreamMessageHandler());
7668
serverAcceptor.addStateChangeEventHandler(new WebClusterServerChannelStateChangeHandler());
7769
serverAcceptor.bind(bindAddress);
@@ -92,42 +84,6 @@ public void stop() {
9284
logger.info("{} destroying completed.", name);
9385
}
9486

95-
class ClusterServerMessageListener implements ServerMessageListener {
96-
97-
private final String clusterId;
98-
private final MessageListener routeMessageListener;
99-
100-
public ClusterServerMessageListener(String clusterId, MessageListener routeMessageListener) {
101-
this.clusterId = clusterId;
102-
this.routeMessageListener = routeMessageListener;
103-
}
104-
105-
@Override
106-
public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) {
107-
logger.info("handleSend packet:{}, remote:{}", sendPacket, pinpointSocket.getRemoteAddress());
108-
}
109-
110-
@Override
111-
public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) {
112-
logger.info("handleRequest packet:{}, remote:{}", requestPacket, pinpointSocket.getRemoteAddress());
113-
114-
// TODO : need handle control message (looks like getClusterId, ..)
115-
routeMessageListener.handleRequest(requestPacket, pinpointSocket);
116-
}
117-
118-
@Override
119-
public HandshakeResponseCode handleHandshake(Map properties) {
120-
logger.info("handle handShake {}", properties);
121-
return HandshakeResponseCode.DUPLEX_COMMUNICATION;
122-
}
123-
124-
@Override
125-
public void handlePing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) {
126-
logger.info("ping received packet:{}, remote:{}", pingPacket, pinpointServer);
127-
}
128-
129-
}
130-
13187
class WebClusterServerChannelStateChangeHandler implements ServerStateChangeEventHandler {
13288

13389
@Override

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/tcp/AgentBaseDataReceiver.java

+1-82
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,9 @@
2121
import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
2222
import com.navercorp.pinpoint.collector.rpc.handler.AgentLifeCycleHandler;
2323
import com.navercorp.pinpoint.collector.service.AgentEventService;
24-
import com.navercorp.pinpoint.common.server.util.AgentEventType;
25-
import com.navercorp.pinpoint.common.server.util.AgentLifeCycleState;
2624
import com.navercorp.pinpoint.common.util.Assert;
27-
import com.navercorp.pinpoint.rpc.PinpointSocket;
28-
import com.navercorp.pinpoint.rpc.packet.HandshakePropertyType;
29-
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseCode;
30-
import com.navercorp.pinpoint.rpc.packet.HandshakeResponseType;
31-
import com.navercorp.pinpoint.rpc.packet.PingPayloadPacket;
32-
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
33-
import com.navercorp.pinpoint.rpc.packet.SendPacket;
34-
import com.navercorp.pinpoint.rpc.server.PinpointServer;
3525
import com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor;
36-
import com.navercorp.pinpoint.rpc.server.ServerMessageListener;
3726
import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler;
38-
import com.navercorp.pinpoint.rpc.util.MapUtils;
3927
import org.slf4j.Logger;
4028
import org.slf4j.LoggerFactory;
4129

@@ -44,7 +32,6 @@
4432
import javax.annotation.Resource;
4533
import java.util.Collections;
4634
import java.util.List;
47-
import java.util.Map;
4835
import java.util.Objects;
4936
import java.util.concurrent.Executor;
5037

@@ -111,42 +98,7 @@ public void start() {
11198

11299
// take care when attaching message handlers as events are generated from the IO thread.
113100
// pass them to a separate queue and handle them in a different thread.
114-
acceptor.setMessageListener(new ServerMessageListener() {
115-
116-
@Override
117-
public HandshakeResponseCode handleHandshake(Map properties) {
118-
if (properties == null) {
119-
return HandshakeResponseType.ProtocolError.PROTOCOL_ERROR;
120-
}
121-
122-
boolean hasRequiredKeys = HandshakePropertyType.hasRequiredKeys(properties);
123-
if (!hasRequiredKeys) {
124-
return HandshakeResponseType.PropertyError.PROPERTY_ERROR;
125-
}
126-
127-
boolean supportServer = MapUtils.getBoolean(properties, HandshakePropertyType.SUPPORT_SERVER.getName(), true);
128-
if (supportServer) {
129-
return HandshakeResponseType.Success.DUPLEX_COMMUNICATION;
130-
} else {
131-
return HandshakeResponseType.Success.SIMPLEX_COMMUNICATION;
132-
}
133-
}
134-
135-
@Override
136-
public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) {
137-
receive(sendPacket, pinpointSocket);
138-
}
139-
140-
@Override
141-
public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) {
142-
requestResponse(requestPacket, pinpointSocket);
143-
}
144-
145-
@Override
146-
public void handlePing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) {
147-
recordPing(pingPacket, pinpointServer);
148-
}
149-
});
101+
acceptor.setMessageListenerFactory(new AgentBaseDataReceiverServerMessageListenerFactory(executor, tcpPacketHandler, agentEventService, agentLifeCycleHandler));
150102
acceptor.bind(configuration.getBindIp(), configuration.getBindPort());
151103

152104
if (logger.isInfoEnabled()) {
@@ -164,37 +116,6 @@ private void prepare(PinpointServerAcceptor acceptor) {
164116
}
165117
}
166118

167-
private void receive(SendPacket sendPacket, PinpointSocket pinpointSocket) {
168-
executor.execute(new Runnable() {
169-
@Override
170-
public void run() {
171-
tcpPacketHandler.handleSend(sendPacket, pinpointSocket);
172-
}
173-
});
174-
}
175-
176-
private void requestResponse(RequestPacket requestPacket, PinpointSocket pinpointSocket) {
177-
executor.execute(new Runnable() {
178-
@Override
179-
public void run() {
180-
tcpPacketHandler.handleRequest(requestPacket, pinpointSocket);
181-
}
182-
});
183-
}
184-
185-
private void recordPing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) {
186-
final int eventCounter = pingPacket.getPingId();
187-
long pingTimestamp = System.currentTimeMillis();
188-
try {
189-
if (!(eventCounter < 0)) {
190-
agentLifeCycleHandler.handleLifeCycleEvent(pinpointServer, pingTimestamp, AgentLifeCycleState.RUNNING, eventCounter);
191-
}
192-
agentEventService.handleEvent(pinpointServer, pingTimestamp, AgentEventType.AGENT_PING);
193-
} catch (Exception e) {
194-
logger.warn("Error handling ping event", e);
195-
}
196-
}
197-
198119
@PreDestroy
199120
public void stop() {
200121
if (logger.isInfoEnabled()) {
@@ -210,6 +131,4 @@ public void stop() {
210131
}
211132
}
212133

213-
214-
215134
}

0 commit comments

Comments
 (0)