Skip to content

Commit

Permalink
[SRE API] query cluster migration
Browse files Browse the repository at this point in the history
  • Loading branch information
lishanglin committed Mar 19, 2024
1 parent b3a26f9 commit 7e547d8
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import com.ctrip.xpipe.api.migration.DcMapper;
import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.*;
import com.ctrip.xpipe.redis.console.migration.model.MigrationCluster;
import com.ctrip.xpipe.redis.console.migration.model.MigrationEvent;
import com.ctrip.xpipe.redis.console.migration.status.MigrationStatus;
import com.ctrip.xpipe.redis.console.model.MigrationClusterTbl;
import com.ctrip.xpipe.redis.console.service.migration.MigrationService;
import com.ctrip.xpipe.redis.console.service.migration.exception.*;
import com.ctrip.xpipe.redis.console.service.migration.impl.MigrationRequest;
Expand All @@ -17,9 +19,8 @@
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
* @author wenchao.meng
Expand All @@ -35,6 +36,9 @@ public class MigrationApi extends AbstractConsoleController {
@Autowired
private MigrationService migrationService;

@Autowired
private DcCache dcCache;

@RequestMapping(value = "/checkandprepare", method = RequestMethod.POST, produces = {MediaType.APPLICATION_JSON_UTF8_VALUE})
public CheckPrepareResponse checkAndPrepare(@RequestBody(required = true) CheckPrepareRequest checkMeta) {

Expand Down Expand Up @@ -181,6 +185,27 @@ public RollbackResponse rollback(@RequestBody(required = true) RollbackRequest r
return rollbackResponse;
}

@GetMapping(value = "history")
public Map<String, List<ClusterMigrationStatus>> getClusterMigrationHistory(@RequestBody MigrationHistoryReq req) {
logger.info("[history][{}-{}] {}", req.from, req.to, req.clusters);
long current = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (req.from < 0 || req.from >= current) return Collections.emptyMap();
if (null == req.clusters || req.clusters.isEmpty()) return Collections.emptyMap();
if (req.to < req.from) req.to = current;

List<MigrationClusterTbl> migrationClusterTbls = migrationService.fetchMigrationClusters(req.clusters,
TimeUnit.SECONDS.toMillis(req.from), TimeUnit.SECONDS.toMillis(req.to));
Map<String, List<ClusterMigrationStatus>> resp = new HashMap<>();
migrationClusterTbls.forEach(migrationClusterTbl -> {
String clusterName = migrationClusterTbl.getCluster().getClusterName();
if (!resp.containsKey(clusterName)) resp.put(clusterName, new ArrayList<>());
ClusterMigrationStatus clusterMigrationStatus = ClusterMigrationStatus.from(migrationClusterTbl, dcCache);
resp.get(clusterName).add(clusterMigrationStatus);
});

return resp;
}

@RequestMapping(value = "/migration/system/health/status", method = RequestMethod.GET)
public RetMessage getMigrationSystemHealthStatus() {
logger.info("[getMigrationSystemHealthStatus]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate.meta;

import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.migration.status.MigrationStatus;
import com.ctrip.xpipe.redis.console.model.DcTbl;
import com.ctrip.xpipe.redis.console.model.MigrationClusterTbl;

import java.util.concurrent.TimeUnit;

/**
* @author lishanglin
* date 2024/3/18
*/
public class ClusterMigrationStatus {

public Long startAt;

public Long endAt;

public String sourceDc;

public String destDc;

public String status;

public static ClusterMigrationStatus from(MigrationClusterTbl migrationClusterTbl, DcCache dcCache) {
ClusterMigrationStatus migrationStatus = new ClusterMigrationStatus();
migrationStatus.startAt = TimeUnit.MILLISECONDS.toSeconds(migrationClusterTbl.getStartTime().getTime());
if (null != migrationClusterTbl.getEndTime()) {
migrationStatus.endAt = TimeUnit.MILLISECONDS.toSeconds(migrationClusterTbl.getEndTime().getTime());
} else {
migrationStatus.endAt = null;
}

DcTbl srcDcTbl = dcCache.find(migrationClusterTbl.getSourceDcId());
DcTbl destDcTbl = dcCache.find(migrationClusterTbl.getDestinationDcId());
if (null != srcDcTbl) migrationStatus.sourceDc = srcDcTbl.getDcName();
if (null != destDcTbl) migrationStatus.destDc = destDcTbl.getDcName();
migrationStatus.status = MigrationStatus.valueOf(migrationClusterTbl.getStatus()).getType();

return migrationStatus;
}

@Override
public String toString() {
return "ClusterMigrationStatus{" +
"startAt=" + startAt +
", endAt=" + endAt +
", sourceDc='" + sourceDc + '\'' +
", destDc='" + destDc + '\'' +
", status='" + status + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate.meta;

import java.util.Set;

/**
* @author lishanglin
* date 2024/3/18
*/
public class MigrationHistoryReq {

public long from;

public long to;

public Set<String> clusters;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -262,5 +263,15 @@ public List<MigrationClusterTbl> doQuery() throws DalException {
});
}

public List<MigrationClusterTbl> findMigrationClustersByNameAndTime(Set<String> clusters, Date from, Date to) {
return queryHandler.handleQuery(new DalQuery<List<MigrationClusterTbl>>() {
@Override
public List<MigrationClusterTbl> doQuery() throws DalException {
return migrationClusterTblDao.findMigrationClustersByNameAndTime(clusters, from, to,
MigrationClusterTblEntity.READSET_MIGRATION_CLUSTER_STATUS);
}
});
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,6 @@ public interface MigrationService {
Set<String> getLatestMigrationOperators(int hours);

List<MigrationClusterTbl> getLatestMigrationClusters(int seconds);

List<MigrationClusterTbl> fetchMigrationClusters(Set<String> clusters, long from, long to);
}
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ public List<MigrationClusterTbl> getLatestMigrationClusters(int seconds) {
return migrationClusterDao.findLatestMigrationClusterWithClusterName(DateTimeUtils.getSecondBeforeDate(new Date(), seconds));
}

@Override
public List<MigrationClusterTbl> fetchMigrationClusters(Set<String> clusters, long from, long to) {
return migrationClusterDao.findMigrationClustersByNameAndTime(clusters, new Date(from), new Date(to));
}

private String clusterRelatedDcToString(List<DcTbl> clusterRelatedDc) {
return StringUtil.join(",", (dcTbl) -> dcTbl.getDcName() , clusterRelatedDc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,9 @@
<var name="offset" value-type="long" />
<var name="operator" value-type="String" />
<var name="event-ids" value-type="java.util.List&lt;Long&gt;" />
<var name="from" value-type="Date" />
<var name="to" value-type="Date" />
<var name="clusters" value-type="java.util.Set&lt;String&gt;" />

<member name="count" value-type="long" select-expr="COUNT(*) count" all="false"/>
<member name="migEventCnt" value-type="long" select-expr=" COUNT(distinct mct.migration_event_id) migEventCnt" all="false" />
Expand Down Expand Up @@ -1961,6 +1964,10 @@
<readset-ref name="NAME_AND_ORG" relation-name="cluster" />
<readset-ref name="FULL" relation-name="migration-event" />
</readset>
<readset name="MIGRATION_CLUSTER_STATUS">
<readset-ref name="FULL"/>
<readset-ref name="NAME" relation-name="cluster" />
</readset>

</readsets>

Expand All @@ -1977,6 +1984,23 @@
</statement>
</query>

<query name="find-migration-clusters-by-name-and-time" type="SELECT" multiple="true">
<param name="clusters"/>
<param name="from"/>
<param name="to"/>
<statement>
<![CDATA[
SELECT <FIELDS/>
FROM <TABLE name='cluster'/>, <TABLE/>
WHERE <FIELD name='deleted'/> = 0 AND ct.deleted = 0
AND <FIELD name='cluster-id'/> = ct.id
AND ct.cluster_name IN <IN>${clusters}</IN>
AND <FIELD name='start-time'/> >= ${from}
AND <FIELD name='start-time'/> <= ${to}
]]>
</statement>
</query>

<query name="find-latest-migration-clusters-with-cluster-name" type="SELECT" multiple="true">
<param name="data-change-last-time"/>
<statement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.ctrip.xpipe.redis.console.controller.api.data.meta.CheckPrepareRequestTest;
import com.ctrip.xpipe.redis.console.controller.api.data.meta.ClusterCreateInfoTest;
import com.ctrip.xpipe.redis.console.controller.api.data.meta.RedisInstanceInfoTest;
import com.ctrip.xpipe.redis.console.controller.api.migrate.MigrationApiIntegrationTest;
import com.ctrip.xpipe.redis.console.controller.api.migrate.MigrationApiTest;
import com.ctrip.xpipe.redis.console.controller.config.ClusterCheckInterceptorTest;
import com.ctrip.xpipe.redis.console.controller.consoleportal.RedisControllerTest;
Expand Down Expand Up @@ -202,6 +203,7 @@
BeaconMetaServiceImplTest.class,
BeaconMigrationServiceImplTest.class,
MigrationApiTest.class,
MigrationApiIntegrationTest.class,
ExclusiveThreadsForMigrationTest.class,
XPipeHandlerMethodCommandTest.class,

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate;

import com.ctrip.xpipe.redis.console.AbstractConsoleIntegrationTest;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.ClusterMigrationStatus;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationHistoryReq;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* @author lishanglin
* date 2024/3/18
*/
public class MigrationApiIntegrationTest extends AbstractConsoleIntegrationTest {

@Autowired
private MigrationApi migrationApi;

@Test
public void testGetClusterMigrationHistory() {
MigrationHistoryReq req = new MigrationHistoryReq();
req.from = 0;
req.to = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) + 600;
req.clusters = Collections.singleton("cluster2");

Map<String, List<ClusterMigrationStatus>> resp = migrationApi.getClusterMigrationHistory(req);
logger.info("[testGetClusterMigrationHistory] {}", resp);
Assert.assertTrue(resp.containsKey("cluster2"));
Assert.assertEquals(1, resp.get("cluster2").size());
Assert.assertEquals("Processing", resp.get("cluster2").get(0).status);
}

protected String prepareDatas() throws IOException {
return prepareDatasFromFile("src/test/resources/migration-test.sql");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public UserInfoHolder userInfoHolder() {

@Before
public void beforeRestTemplateFactoryTest() throws Exception {
this.port = randomPort();
restOperations = RestTemplateFactory.createCommonsHttpRestTemplate(2, 2, 1200, 60000);

springApplicationStarter = new SpringApplicationStarter(port, 1, SlowController.class, TestDependency.class, MigrationApi4Beacon.class);
Expand Down

0 comments on commit 7e547d8

Please sign in to comment.