Skip to content

Commit

Permalink
merge develop
Browse files Browse the repository at this point in the history
  • Loading branch information
qingliang.ql committed Jan 11, 2019
2 parents eb2c33a + 21b7f96 commit 55c8f03
Show file tree
Hide file tree
Showing 50 changed files with 3,777 additions and 3,437 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class Constants {

public static final String WORD_SEPARATOR = Character.toString((char)2);

public static final String LONGPULLING_LINE_SEPARATOR = "\r\n";
public static final String LONGPOLLING_LINE_SEPARATOR = "\r\n";

public static final String CLIENT_APPNAME_HEADER = "Client-AppName";
public static final String CLIENT_REQUEST_TS_HEADER = "Client-RequestTS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class NacosConfigService implements ConfigService {
*/
private HttpAgent agent;
/**
* longpulling
* longpolling
*/
private ClientWorker worker;
private String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import static com.alibaba.nacos.api.common.Constants.WORD_SEPARATOR;

/**
* Longpulling
* Longpolling
*
* @author Nacos
*/
Expand Down Expand Up @@ -300,7 +300,7 @@ public void checkConfigInfo() {
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPullingRunnable(i));
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
Expand Down Expand Up @@ -435,7 +435,7 @@ public Thread newThread(Runnable r) {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPulling" + agent.getName());
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
Expand All @@ -452,10 +452,10 @@ public void run() {
}, 1L, 10L, TimeUnit.MILLISECONDS);
}

class LongPullingRunnable implements Runnable {
class LongPollingRunnable implements Runnable {
private int taskId;

public LongPullingRunnable(int taskId) {
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}

Expand Down Expand Up @@ -510,7 +510,7 @@ public void run() {
}
inInitializingCacheList.clear();
} catch (Throwable e) {
log.error("500", "longPulling error", e);
log.error("500", "longPolling error", e);
} finally {
executorService.execute(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class HostReactor {

private Map<String, Object> updatingMap;

private PushRecver pushRecver;
private PushReceiver pushReceiver;

private EventDispatcher eventDispatcher;

Expand All @@ -69,7 +69,7 @@ public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, Str

this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushRecver = new PushRecver(this);
this.pushReceiver = new PushReceiver(this);
}

private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
Expand Down Expand Up @@ -287,7 +287,7 @@ public void updateService4AllIPNow(String serviceName, String clusters, String e
Map<String, String> params = new HashMap<String, String>(8);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("udpPort", String.valueOf(pushReceiver.getUDPPort()));

ServiceInfo oldService = getSerivceInfo0(serviceName, clusters, env, true);
if (oldService != null) {
Expand Down Expand Up @@ -319,7 +319,7 @@ public void updateServiceNow(String serviceName, String clusters, String env) {
Map<String, String> params = new HashMap<String, String>(8);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("udpPort", String.valueOf(pushReceiver.getUDPPort()));
params.put("env", env);
params.put("clientIP", NetUtils.localIP());

Expand Down Expand Up @@ -361,7 +361,7 @@ public void refreshOnly(String serviceName, String clusters, String env, boolean
Map<String, String> params = new HashMap<String, String>(16);
params.put("dom", serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(pushRecver.getUDPPort()));
params.put("udpPort", String.valueOf(pushReceiver.getUDPPort()));
params.put("unit", env);
params.put("clientIP", NetUtils.localIP());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* @author xuanyin
*/
public class PushRecver implements Runnable {
public class PushReceiver implements Runnable {

private ScheduledExecutorService executorService;

Expand All @@ -40,7 +40,7 @@ public class PushRecver implements Runnable {

private HostReactor hostReactor;

public PushRecver(HostReactor hostReactor) {
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
udpSocket = new DatagramSocket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ConfigServletInner {

private static final int TRY_GET_LOCK_TIMES = 9;

private static final int START_LONGPULLING_VERSION_NUM = 204;
private static final int START_LONGPOLLING_VERSION_NUM = 204;

/**
* 轮询接口
Expand All @@ -70,8 +70,8 @@ public String doPollingConfig(HttpServletRequest request, HttpServletResponse re
throws IOException, ServletException {

// 长轮询
if (LongPollingService.isSupportLongPulling(request)) {
longPollingService.addLongPullingClient(request, response, clientMd5Map, probeRequestSize);
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}

Expand All @@ -91,7 +91,7 @@ public String doPollingConfig(HttpServletRequest request, HttpServletResponse re
/**
* 2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONGPULLING_VERSION_NUM) {
if (versionNum < START_LONGPOLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.Map;

/**
* Config longpulling
* Config longpolling
*
* @author Nacos
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public boolean isClientLongPolling(String clientIp) {
}

public Map<String, String> getClientSubConfigInfo(String clientIp) {
ClientLongPulling record = getClientPollingRecord(clientIp);
ClientLongPolling record = getClientPollingRecord(clientIp);

if (record == null) {
return Collections.<String, String>emptyMap();
Expand All @@ -80,9 +80,9 @@ public SampleResult getSubscribleInfo(String dataId, String group, String tenant
SampleResult sampleResult = new SampleResult();
Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);

for (ClientLongPulling clientLongPulling : allSubs) {
if (clientLongPulling.clientMd5Map.containsKey(groupKey)) {
lisentersGroupkeyStatus.put(clientLongPulling.ip, clientLongPulling.clientMd5Map.get(groupKey));
for (ClientLongPolling clientLongPolling : allSubs) {
if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {
lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));
}
}
sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);
Expand All @@ -93,11 +93,11 @@ public SampleResult getSubscribleInfoByIp(String clientIp) {
SampleResult sampleResult = new SampleResult();
Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);

for (ClientLongPulling clientLongPulling : allSubs) {
if (clientLongPulling.ip.equals(clientIp)) {
for (ClientLongPolling clientLongPolling : allSubs) {
if (clientLongPolling.ip.equals(clientIp)) {
// 一个ip可能有多个监听
if (!lisentersGroupkeyStatus.equals(clientLongPulling.clientMd5Map)) {
lisentersGroupkeyStatus.putAll(clientLongPulling.clientMd5Map);
if (!lisentersGroupkeyStatus.equals(clientLongPolling.clientMd5Map)) {
lisentersGroupkeyStatus.putAll(clientLongPolling.clientMd5Map);
}
}
}
Expand Down Expand Up @@ -129,18 +129,18 @@ public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {
return null;
}
HashMap<String, Set<String>> app2Groupkeys = new HashMap<String, Set<String>>(50);
for (ClientLongPulling clientLongPulling : allSubs) {
if (StringUtils.isEmpty(clientLongPulling.appName) || "unknown".equalsIgnoreCase(
clientLongPulling.appName)) {
for (ClientLongPolling clientLongPolling : allSubs) {
if (StringUtils.isEmpty(clientLongPolling.appName) || "unknown".equalsIgnoreCase(
clientLongPolling.appName)) {
continue;
}
Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPulling.appName);
Set<String> clientSubscribeConfigs = clientLongPulling.clientMd5Map.keySet();
Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPolling.appName);
Set<String> clientSubscribeConfigs = clientLongPolling.clientMd5Map.keySet();
if (appSubscribeConfigs == null) {
appSubscribeConfigs = new HashSet<String>(clientSubscribeConfigs.size());
}
appSubscribeConfigs.addAll(clientSubscribeConfigs);
app2Groupkeys.put(clientLongPulling.appName, appSubscribeConfigs);
app2Groupkeys.put(clientLongPolling.appName, appSubscribeConfigs);
}

return app2Groupkeys;
Expand Down Expand Up @@ -188,27 +188,27 @@ public SampleResult getCollectSubscribleInfoByIp(String ip) {
return sampleResult;
}

private ClientLongPulling getClientPollingRecord(String clientIp) {
private ClientLongPolling getClientPollingRecord(String clientIp) {
if (allSubs == null) {
return null;
}

for (ClientLongPulling clientLongPulling : allSubs) {
HttpServletRequest request = (HttpServletRequest)clientLongPulling.asyncContext.getRequest();
for (ClientLongPolling clientLongPolling : allSubs) {
HttpServletRequest request = (HttpServletRequest) clientLongPolling.asyncContext.getRequest();

if (clientIp.equals(RequestUtil.getRemoteIp(request))) {
return clientLongPulling;
return clientLongPolling;
}
}

return null;
}

public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {

String str = req.getHeader(LongPollingService.LONG_PULLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_PULLING_NO_HANG_UP_HEADER);
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
Expand Down Expand Up @@ -242,7 +242,7 @@ public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp
asyncContext.setTimeout(0L);

scheduler.execute(
new ClientLongPulling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

@Override
Expand All @@ -264,19 +264,20 @@ public void onEvent(Event event) {
}
}

static public boolean isSupportLongPulling(HttpServletRequest req) {
return null != req.getHeader(LONG_PULLING_HEADER);
static public boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}

@SuppressWarnings("PMD.ThreadPoolCreationRule")
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPulling>();
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.LongPulling");
t.setName("com.alibaba.nacos.LongPolling");
return t;
}
});
Expand All @@ -285,15 +286,15 @@ public Thread newThread(Runnable r) {

// =================

static public final String LONG_PULLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_PULLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";
static public final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";
static public final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";

final ScheduledExecutorService scheduler;

/**
* 长轮询订阅关系
*/
final Queue<ClientLongPulling> allSubs;
final Queue<ClientLongPolling> allSubs;

// =================

Expand All @@ -302,8 +303,8 @@ class DataChangeTask implements Runnable {
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPulling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPulling clientSub = iter.next();
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
if (isBeta && !betaIps.contains(clientSub.ip)) {
Expand Down Expand Up @@ -365,18 +366,18 @@ public void run() {

// =================

class ClientLongPulling implements Runnable {
class ClientLongPolling implements Runnable {

@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
public void run() {
try {
getRetainIps().put(ClientLongPulling.this.ip, System.currentTimeMillis());
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
allSubs.remove(ClientLongPulling.this);
allSubs.remove(ClientLongPolling.this);

if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
Expand All @@ -401,7 +402,7 @@ public void run() {
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long pulling error:" + t.getMessage(), t.getCause());
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}

}
Expand Down Expand Up @@ -447,7 +448,7 @@ void generateResponse(List<String> changedGroups) {
}
}

ClientLongPulling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,
ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,
long timeoutTime, String appName, String tag) {
this.asyncContext = ac;
this.clientMd5Map = clientMd5Map;
Expand Down
Loading

0 comments on commit 55c8f03

Please sign in to comment.