Skip to content

Commit

Permalink
add hints in ClusterMeta, send info gtid only when hints has APPLIER_…
Browse files Browse the repository at this point in the history
…IN_CLUSTER
  • Loading branch information
aiyueqi committed Nov 9, 2022
1 parent 4183899 commit 4d3fef0
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 7 deletions.
33 changes: 33 additions & 0 deletions core/src/main/java/com/ctrip/xpipe/cluster/Hints.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.ctrip.xpipe.cluster;

import com.ctrip.xpipe.utils.StringUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.HashSet;
import java.util.Set;

/**
* @author ayq
* <p>
* 2022/11/9 15:38
*/
public enum Hints {

APPLIER_IN_CLUSTER;

public static Hints lookup(String name) {
if (StringUtil.isEmpty(name)) throw new IllegalArgumentException("no Hints for name " + name);
return valueOf(name.toUpperCase());
}

public static Set<Hints> parse(String str) {
HashSet<Hints> result = new HashSet<>();
if (StringUtils.isEmpty(str)) {
return result;
}
for (String s : str.split("\\s*,\\s*")) {
result.add(lookup(s));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.ctrip.xpipe.api.server.Server;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.cluster.DcGroupType;
import com.ctrip.xpipe.cluster.Hints;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.command.ParallelCommandChain;
import com.ctrip.xpipe.command.RetryCommandFactory;
Expand Down Expand Up @@ -46,6 +47,8 @@ public class DcMetaBuilder extends AbstractCommand<Map<String, DcMeta>> {

private Map<Long, List<ApplierTbl>> replId2AppliersMap;

private Set<Long> shardIdWithAppliers;

private List<ReplDirectionTbl> replDirectionTblList;

private RedisMetaService redisMetaService;
Expand Down Expand Up @@ -422,7 +425,9 @@ protected void doExecute() throws Exception {
try {
List<ApplierTbl> applierTblList = applierService.findAll();
replId2AppliersMap = new HashMap<>();
shardIdWithAppliers = new HashSet<>();
for (ApplierTbl applierTbl : applierTblList) {
shardIdWithAppliers.add(applierTbl.getShardId());
List<ApplierTbl> applierTbls = MapUtils.getOrCreate(replId2AppliersMap, applierTbl.getReplDirectionId(), ArrayList::new);
applierTbls.add(applierTbl);
}
Expand Down Expand Up @@ -500,13 +505,31 @@ protected void doExecute() throws Exception {
buildHeteroMeta(dcMeta, dcId);
}
}
addClusterHints();

future().setSuccess();
} catch (Exception e) {
future().setFailure(e);
}
}

private void addClusterHints() {
for (DcMeta dcMeta: dcMetaMap.values()) {
for (ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
boolean hasApplier = false;
for (ShardMeta shardMeta : clusterMeta.getAllShards().values()) {
if (shardIdWithAppliers.contains(shardMeta.getDbId())) {
hasApplier = true;
break;
}
}
if (hasApplier) {
clusterMeta.setHints(Hints.APPLIER_IN_CLUSTER.name());
}
}
}
}

private void buildHeteroMeta(DcMeta dcMeta, Long dcId) {

List<ReplDirectionTbl> toCurrentDcOrFromCurrentDcList = replDirectionTblList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<attribute name="phase" value-type="int" />
<attribute name="lastModifiedTime" value-type="String" />
<attribute name="type" value-type="String" />
<attribute name="hints" value-type="String" />
<attribute name="admin-emails" value-type="String" />
<attribute name="dcs" value-type="String" />
<attribute name="activeRedisCheckRules" value-type="String" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.cluster.Hints;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.gtid.GtidSet;
import com.ctrip.xpipe.netty.commands.NettyClient;
Expand Down Expand Up @@ -51,12 +52,15 @@ public DefaultRedisGtidCollector(Long clusterDbId, Long shardDbId, DcMetaCache d

@Override
protected void work() {

collectCurrentDcGtidAndSids();
collectSids();
String hints = dcMetaCache.getClusterMeta(clusterDbId).getHints();
if (Hints.parse(hints).contains(Hints.APPLIER_IN_CLUSTER)) {
collectCurrentDcGtidAndSids();
collectSids();
}
}

private void collectCurrentDcGtidAndSids() {
@VisibleForTesting
protected void collectCurrentDcGtidAndSids() {

/* redis shard */
List<RedisMeta> redises = currentMetaManager.getRedises(clusterDbId, shardDbId);
Expand All @@ -71,7 +75,8 @@ private void collectCurrentDcGtidAndSids() {
}
}

private void collectSids() {
@VisibleForTesting
protected void collectSids() {

if (dcMetaCache.isCurrentShardParentCluster(clusterDbId, shardDbId) ||
dcMetaCache.getShardAppliers(clusterDbId, shardDbId) == null ||
Expand Down Expand Up @@ -125,7 +130,8 @@ private void collectGtidAndSids(RedisMeta redisMeta) {
return;
}
logger.info("[info gtid command], cluster_{}, shard_{}, ip={}, port={} gtidSet={}",
clusterDbId, shardDbId, redisMeta.getIp(), redisMeta.getPort(), gtidSet);
clusterDbId, shardDbId, redisMeta.getIp(), redisMeta.getPort(),
gtidSet.toString().substring(0, Math.min(1000, gtidSet.toString().length())));
String sids = null;
if (!gtidSet.getUUIDs().isEmpty()) {
for(String sid: gtidSet.getUUIDs()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.ctrip.xpipe.redis.meta.server.keeper.keepermaster.impl;

import com.ctrip.xpipe.cluster.Hints;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.redis.meta.server.multidc.MultiDcService;
Expand All @@ -12,6 +14,7 @@

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

/**
* @author ayq
Expand All @@ -20,11 +23,14 @@
*/
public class DefaultRedisGtidCollectorTest {

private DcMetaCache dcMetaCache;

private DefaultRedisGtidCollector defaultRedisGtidCollector;

@Before
public void setUp() throws Exception {
defaultRedisGtidCollector = new DefaultRedisGtidCollector(1L,1L, Mockito.mock(DcMetaCache.class),
dcMetaCache = Mockito.mock(DcMetaCache.class);
defaultRedisGtidCollector = new DefaultRedisGtidCollector(1L,1L, dcMetaCache,
Mockito.mock(CurrentMetaManager.class), Mockito.mock(MultiDcService.class),
Mockito.mock(ScheduledExecutorService.class), Mockito.mock(XpipeNettyClientKeyedObjectPool.class),
DefaultRedisGtidCollector.REDIS_INFO_GTID_INTERVAL_SECONDS_DR_MASTER_GROUP);
Expand All @@ -42,4 +48,29 @@ public void testSrcChanged() {
assertTrue(defaultRedisGtidCollector.sidsChanged("a,b", "a,b,c"));
}

@Test
public void testHintsApplierInCluster() {
ClusterMeta clusterMeta = new ClusterMeta();
clusterMeta.setHints(Hints.APPLIER_IN_CLUSTER.name());
when(dcMetaCache.getClusterMeta(1L)).thenReturn(clusterMeta);

DefaultRedisGtidCollector collector = spy(defaultRedisGtidCollector);

collector.work();
verify(collector,Mockito.times(1)).collectCurrentDcGtidAndSids();
verify(collector,Mockito.times(1)).collectSids();
}

@Test
public void testHintsApplierNotInCluster() {
ClusterMeta clusterMeta = new ClusterMeta();
when(dcMetaCache.getClusterMeta(1L)).thenReturn(clusterMeta);

DefaultRedisGtidCollector collector = spy(defaultRedisGtidCollector);

collector.work();
verify(collector,Mockito.times(0)).collectCurrentDcGtidAndSids();
verify(collector,Mockito.times(0)).collectSids();
}

}

0 comments on commit 4d3fef0

Please sign in to comment.