Skip to content

Commit

Permalink
增加PushCenter消息流转时间线, 方便监控消息的各个生命周期的耗时PushClient -> GatewayClient -> G…
Browse files Browse the repository at this point in the history
…atewayServer -> PushCenter -> ConnServer -> Client
  • Loading branch information
夜色 committed Dec 29, 2016
1 parent e7e6c8c commit d0eb910
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 149 deletions.
16 changes: 8 additions & 8 deletions mpush-api/src/main/java/com/mpush/api/spi/push/PushListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,47 @@ public interface PushListener<T extends IPushMessage> {
*
* @param message 要下发的消息
*/
void onSuccess(T message);
void onSuccess(T message, Object[] timePoints);

/**
* 收到客户端ACK后回调
*
* @param message 要下发的消息
*/
void onAckSuccess(T message);
void onAckSuccess(T message, Object[] timePoints);

/**
* 广播消息推送全部结束后回调
*
* @param message 要下发的消息
*/
void onBroadcastComplete(T message);
void onBroadcastComplete(T message, Object[] timePoints);

/**
* 消息下发失败后回调
*
* @param message 要下发的消息
*/
void onFailure(T message);
void onFailure(T message, Object[] timePoints);

/**
* 推送消息发现用户不在线时回调
*
* @param message 要下发的消息
*/
void onOffline(T message);
void onOffline(T message, Object[] timePoints);

/**
* 推送消息发现用户不在当前机器时回调
*
* @param message 要下发的消息
*/
void onRedirect(T message);
void onRedirect(T message, Object[] timePoints);

/**
* 等待客户端ACK超时时回调
* 发送消息超时或等待客户端ACK超时时回调
*
* @param message 要下发的消息
*/
void onAckTimeout(T message);
void onTimeout(T message, Object[] timePoints);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public void handle(ErrorMessage message) {

Logs.PUSH.warn("receive an error gateway response, message={}", message);
if (message.code == OFFLINE.errorCode) {//用户离线
request.offline();
request.onOffline();
} else if (message.code == PUSH_CLIENT_FAILURE.errorCode) {//下发到客户端失败
request.failure();
request.onFailure();
} else if (message.code == ROUTER_CHANGE.errorCode) {//用户路由信息更改
request.redirect();
request.onRedirect();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mpush.client.push.PushRequestBus;
import com.mpush.common.handler.BaseMessageHandler;
import com.mpush.common.message.OkMessage;
import com.mpush.common.push.GatewayPushResult;
import com.mpush.tools.log.Logs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,7 +51,7 @@ public void handle(OkMessage message) {
Logs.PUSH.warn("receive a gateway response, but request has timeout. message={}", message);
return;
}
request.success();//推送成功
request.onSuccess(GatewayPushResult.fromJson(message.data));//推送成功
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private FutureTask<PushResult> send0(PushContext ctx) {
} else {
Set<RemoteRouter> remoteRouters = CachedRemoteRouterManager.I.lookupAll(ctx.getUserId());
if (remoteRouters == null || remoteRouters.isEmpty()) {
return PushRequest.build(factory, ctx).offline();
return PushRequest.build(factory, ctx).onOffline();
}
FutureTask<PushResult> task = null;
for (RemoteRouter remoteRouter : remoteRouters) {
Expand Down
52 changes: 33 additions & 19 deletions mpush-client/src/main/java/com/mpush/client/push/PushRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.mpush.api.router.ClientLocation;
import com.mpush.client.gateway.connection.GatewayConnectionFactory;
import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.common.push.GatewayPushResult;
import com.mpush.common.router.CachedRemoteRouterManager;
import com.mpush.common.router.RemoteRouter;
import com.mpush.tools.Jsons;
Expand Down Expand Up @@ -165,22 +166,6 @@ public FutureTask<PushResult> send(RemoteRouter router) {
return this;
}

public void redirect() {
timeLine.addTimePoint("redirect");
LOGGER.warn("user route has changed, userId={}, location={}", userId, location);
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
if (status.get() == Status.init) {//表示任务还没有完成,还可以重新发送
RemoteRouter remoteRouter = CachedRemoteRouterManager.I.lookup(userId, location.getClientType());
send(remoteRouter);
}
}

public FutureTask<PushResult> offline() {
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
submit(Status.offline);
return this;
}

public FutureTask<PushResult> broadcast() {
timeLine.begin();

Expand Down Expand Up @@ -220,20 +205,49 @@ public FutureTask<PushResult> broadcast() {
return this;
}

public void timeout() {
private void offline() {
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
submit(Status.offline);
}

private void timeout() {
if (PushRequestBus.I.getAndRemove(sessionId) != null) {
submit(Status.timeout);
}
}

public void success() {
private void success() {
submit(Status.success);
}

public void failure() {
private void failure() {
submit(Status.failure);
}

public void onFailure() {
failure();
}

public void onRedirect() {
timeLine.addTimePoint("redirect");
LOGGER.warn("user route has changed, userId={}, location={}", userId, location);
CachedRemoteRouterManager.I.invalidateLocalCache(userId);
if (status.get() == Status.init) {//表示任务还没有完成,还可以重新发送
RemoteRouter remoteRouter = CachedRemoteRouterManager.I.lookup(userId, location.getClientType());
send(remoteRouter);
}
}

public FutureTask<PushResult> onOffline() {
offline();
return this;
}

public void onSuccess(GatewayPushResult result) {
if (result != null) timeLine.addTimePoints(result.timePoints);
submit(Status.success);
}

public long getTimeout() {
return timeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -156,7 +157,7 @@ public BaseMessage setRecipient(InetSocketAddress recipient) {
return this;
}

public ExecutorService getExecutor() {
public ScheduledExecutorService getExecutor() {
return connection.getChannel().eventLoop();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* [email protected] (夜色)
*/

package com.mpush.common.push;

import com.mpush.common.message.gateway.GatewayPushMessage;
import com.mpush.tools.Jsons;

/**
* Created by ohun on 2016/12/29.
*
* @author [email protected] (夜色)
*/
public final class GatewayPushResult {
public String userId;
public Integer clientType;
public Object[] timePoints;

public GatewayPushResult(String userId, Integer clientType, Object[] timePoints) {
this.userId = userId;
this.timePoints = timePoints;
if (clientType > 0) this.clientType = clientType;
}

public static String toJson(GatewayPushMessage message, Object[] timePoints) {
return Jsons.toJson(new GatewayPushResult(message.userId, message.clientType, timePoints));
}

public static GatewayPushResult fromJson(String json) {
if (json == null) return null;
return Jsons.fromJson(json, GatewayPushResult.class);
}
}
14 changes: 7 additions & 7 deletions mpush-core/src/main/java/com/mpush/core/mq/MQPushListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,43 @@ public MQPushListener() {
}

@Override
public void onSuccess(MQPushMessage message) {
public void onSuccess(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[success/queue]
mqClient.publish("/mpush/push/success", message);
}

@Override
public void onAckSuccess(MQPushMessage message) {
public void onAckSuccess(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[success/queue]
mqClient.publish("/mpush/push/success", message);
}

@Override
public void onBroadcastComplete(MQPushMessage message) {
public void onBroadcastComplete(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[broadcast/finish/queue]
mqClient.publish("/mpush/push/broadcast_finish", message);
}

@Override
public void onFailure(MQPushMessage message) {
public void onFailure(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[failure/queue], client can retry
mqClient.publish("/mpush/push/failure", message);
}

@Override
public void onOffline(MQPushMessage message) {
public void onOffline(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[offline/queue], client persist offline message to db
mqClient.publish("/mpush/push/offline", message);
}

@Override
public void onRedirect(MQPushMessage message) {
public void onRedirect(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[route/change/queue], client should be try again
mqClient.publish("/mpush/push/route_change", message);
}

@Override
public void onAckTimeout(MQPushMessage message) {
public void onTimeout(MQPushMessage message, Object[] timePoints) {
//publish messageId to mq:[ack/timeout/queue], client can retry
mqClient.publish("/mpush/push/ack_timeout", message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.mpush.common.qps.OverFlowException;
import com.mpush.core.router.LocalRouter;
import com.mpush.core.router.RouterCenter;
import com.mpush.tools.common.TimeLine;
import com.mpush.tools.log.Logs;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -57,6 +58,8 @@ public final class BroadcastPushTask implements PushTask, ChannelFutureListener

private final Condition condition;

private final TimeLine timeLine = new TimeLine();

//使用Iterator, 记录任务遍历到的位置,因为有流控,一次任务可能会被分批发送,而且还有在推送过程中上/下线的用户
private final Iterator<Map.Entry<String, Map<Integer, LocalRouter>>> iterator;

Expand All @@ -65,6 +68,7 @@ public BroadcastPushTask(IPushMessage message, FlowControl flowControl) {
this.flowControl = flowControl;
this.condition = message.getCondition();
this.iterator = RouterCenter.I.getLocalRouterManager().routers().entrySet().iterator();
this.timeLine.begin("push-center-begin");
}

@Override
Expand Down Expand Up @@ -120,7 +124,7 @@ private boolean broadcast() {

private void report() {
Logs.PUSH.info("[Broadcast] task finished, cost={}, message={}", (System.currentTimeMillis() - begin), message);
PushCenter.I.getPushListener().onBroadcastComplete(message);//通知发送方,广播推送完毕
PushCenter.I.getPushListener().onBroadcastComplete(message, timeLine.end().getTimePoints());//通知发送方,广播推送完毕
}

private boolean checkCondition(Condition condition, Connection connection) {
Expand Down
Loading

0 comments on commit d0eb910

Please sign in to comment.