Skip to content

Commit

Permalink
update report migrate result to noc (ctripcorp#712)
Browse files Browse the repository at this point in the history
* update report migrate result to noc

* get all one way cluster nums

* add todo to consider az group after hetero online
  • Loading branch information
songyuyuyu authored Jul 20, 2023
1 parent ab43803 commit ba3b487
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

boolean isMigrationProcessReportOpen();

String getBreakDownDc();

String getKeyMigrationProcessReportUrl();

long getMigrationProcessReportIntervalMill();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class DefaultConsoleConfig extends AbstractCoreConfig implements ConsoleC
private static final String KEY_MIGRATION_PROCESS_REPORT_INTERVAL_MILLI = "migration.process.report.interval.milli";
private static final String KEY_MIGRATION_PROCESS_REPORT_OPEN = "migration.process.report.open";
private static final String KEY_MIGRATION_PROCESS_REPORT_URL = "migration.process.report.url";
private static final String KEY_MIGRATION_BREAK_DOWN_DC= "migration.break.down.dc";

private String defaultRouteChooseStrategyType = RouteChooseStrategyFactory.RouteStrategyType.CRC32_HASH.name();

Expand Down Expand Up @@ -714,6 +715,11 @@ public String getKeyMigrationProcessReportUrl() {
return getProperty(KEY_MIGRATION_PROCESS_REPORT_URL, "127.0.0.1:8080");
}

@Override
public String getBreakDownDc() {
return getProperty(KEY_MIGRATION_BREAK_DOWN_DC, "jq");
}

@Override
public long getMigrationProcessReportIntervalMill() {
return getLongProperty(KEY_MIGRATION_PROCESS_REPORT_INTERVAL_MILLI, 10000L);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
package com.ctrip.xpipe.redis.console.reporter;

import com.ctrip.xpipe.api.monitor.EventMonitor;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.console.AbstractCrossDcIntervalAction;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationProgress;
import com.ctrip.xpipe.redis.console.service.migration.MigrationService;
import com.ctrip.xpipe.redis.console.AbstractSiteLeaderIntervalAction;
import com.ctrip.xpipe.redis.console.service.ClusterService;
import com.ctrip.xpipe.redis.console.service.DcService;
import com.ctrip.xpipe.spring.AbstractProfile;
import com.ctrip.xpipe.utils.DateTimeUtils;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Profile(AbstractProfile.PROFILE_NAME_PRODUCTION)
public class DefaultMigrationProcessReporter extends AbstractCrossDcIntervalAction implements MigrationReporter{
public class DefaultMigrationProcessReporter extends AbstractSiteLeaderIntervalAction implements MigrationReporter{

@Autowired
private MigrationService migrationService;
private ClusterService clusterService;

@Autowired
private DcService dcService;

private static final int DEFAULT_HOURS = 1;

Expand All @@ -33,31 +38,35 @@ public class DefaultMigrationProcessReporter extends AbstractCrossDcIntervalActi

private DefaultHttpService httpService = new DefaultHttpService();

private long totalClusters = 0;

@Override
protected void doAction() {
EventMonitor.DEFAULT.logEvent(REPORT_EVENT, "begin");
MigrationProcessReportModel model = new MigrationProcessReportModel();
MigrationProgress migrationProgress = migrationService.buildMigrationProgress(DEFAULT_HOURS);
if (migrationProgress.getTotal() > 0) {
if (migrationProgress.getSuccess() > migrationProgress.getTotal()) {
EventMonitor.DEFAULT.logEvent(REPORT_EVENT, "wrong migrationProgress");
logger.warn("[DefaultMigrationReporter] build migration progress fail: success {} is greater than total {}",
migrationProgress.getSuccess(), migrationProgress.getTotal());
return;
}
model.setObjectCount(migrationProgress.getTotal())
.setProcess((100 * migrationProgress.getSuccess() / migrationProgress.getTotal()));
} else {
model.setProcess(0).setObjectCount(0);

// TODO AzGroup need to be considered after hetero cluster type online
Long nonMigrateClustersNum = clusterService.getCountByActiveDcAndClusterType(dcService.find(consoleConfig.getBreakDownDc()).getId(), ClusterType.ONE_WAY.name());
if (totalClusters == 0 || nonMigrateClustersNum > totalClusters) {
totalClusters = nonMigrateClustersNum;
}

model.setObjectCount((int)totalClusters).setProcess((int)((100 * (totalClusters - nonMigrateClustersNum) / totalClusters)));

model.setService(DEFAULT_SERVICE).setTimestamp(DateTimeUtils.currentTimeAsString(DEFAULT_TIME_FORMAT)).setOperator(DEFAULT_OPERATOR);
httpService.getRestTemplate().postForEntity(consoleConfig.getKeyMigrationProcessReportUrl(), model, MigrationProcessReportResponseModel.class);
logger.info("[DefaultMigrationReporter] send migration report model: {},migration clusters:{}", model, totalClusters - nonMigrateClustersNum);

ResponseEntity<MigrationProcessReportResponseModel> responseEntity
= httpService.getRestTemplate().postForEntity(consoleConfig.getKeyMigrationProcessReportUrl(), model, MigrationProcessReportResponseModel.class);
if (responseEntity != null && responseEntity.getBody() != null && responseEntity.getBody().getCode() != 200) {
logger.warn("[DefaultMigrationReporter] send migration report fail! migration model: {}, result:{}", model, responseEntity.getBody());
}
}

@Override
protected boolean shouldDoAction() {
logger.debug("[DefaultMigrationReporter]get switch {}", consoleConfig.isMigrationProcessReportOpen());
if (!consoleConfig.isMigrationProcessReportOpen()) totalClusters = 0;
return consoleConfig.isMigrationProcessReportOpen() && super.shouldDoAction();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public interface ClusterService {
Long getCountByActiveDc(long activeDcId);
Map<String, Long> getAllCountByActiveDc();
Map<String, Long> getMigratableClustersCountByActiveDc();
Long getAllCount();
Long getCountByActiveDcAndClusterType(long activeDc, String clusterType);
Long getAllCount();
ClusterTbl createCluster(ClusterModel clusterModel);
void updateCluster(String clusterName, ClusterModel cluster);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ public Long doQuery() throws DalException {
});
}

@Override
public Long getCountByActiveDcAndClusterType(long activeDc, String clusterType) {
return queryHandler.handleQuery(new DalQuery<Long>() {
@Override
public Long doQuery() throws DalException {
return dao.countByActiveDcAndClusterType(activeDc, clusterType, ClusterTblEntity.READSET_COUNT).getCount();
}
});
}

@Override
public Long getAllCount() {
return queryHandler.handleQuery(new DalQuery<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@
]]>
</statement>
</query>
<query name="count-by-active-dc-and-cluster-type" type="SELECT" multiple="false">
<param name='activedc-id'/>
<param name="cluster-type" />
<statement>
<![CDATA[
SELECT <FIELDS/>
FROM <TABLE/>
WHERE <FIELD name='activedc-id'/> = ${activedc-id}
AND <FIELD name='cluster-type' /> = ${cluster-type}
AND <FIELD name='deleted'/> = 0
]]>
</statement>
</query>
<query name="atomic-set-status" type="UPDATE" batch='false'>
<param name="id" />
<param name="origin-status" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import com.ctrip.xpipe.api.cluster.CrossDcClusterServer;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationProgress;
import com.ctrip.xpipe.redis.console.service.migration.MigrationService;
import com.ctrip.xpipe.redis.console.model.DcTbl;
import com.ctrip.xpipe.redis.console.service.ClusterService;
import com.ctrip.xpipe.redis.console.service.DcService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.*;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestOperations;


Expand All @@ -20,7 +23,10 @@ public class DefaultMigrationProcessReporterTest {
DefaultMigrationProcessReporter migrationReporter;

@Mock
MigrationService migrationService;
ClusterService clusterService;

@Mock
DcService dcService;

@Captor
ArgumentCaptor<MigrationProcessReportModel> migrationProcessReportModelArgumentCaptor;
Expand All @@ -44,27 +50,42 @@ public class DefaultMigrationProcessReporterTest {
@Before
public void before() {
Mockito.when(consoleConfig.getKeyMigrationProcessReportUrl()).thenReturn("127.0.0.1:8080");
Mockito.when(consoleConfig.getBreakDownDc()).thenReturn("jq");
Mockito.when(httpService.getRestTemplate()).thenReturn(restTemplate);
Mockito.when(restTemplate.postForEntity(Mockito.anyString(),
migrationProcessReportModelArgumentCaptor.capture(), Mockito.eq(MigrationProcessReportResponseModel.class)))
.thenReturn(null);
.thenReturn(new ResponseEntity<MigrationProcessReportResponseModel>(new MigrationProcessReportResponseModel().setCode(200), HttpStatus.OK));
Mockito.when(dcService.find(Mockito.anyString())).thenReturn(new DcTbl());
}

@Test
public void testReportSuccess() {
MigrationProgress migrationProgress = new MigrationProgress();
migrationProgress.setSuccess(11);
migrationProgress.setTotal(20);

Mockito.when(migrationService.buildMigrationProgress(1)).thenReturn(migrationProgress);

Mockito.when(clusterService.getCountByActiveDcAndClusterType(Mockito.anyLong(), Mockito.anyString())).thenReturn(1000L);
migrationReporter.doAction();
Mockito.verify(restTemplate, Mockito.times(1))
.postForEntity(Mockito.anyString(),
migrationProcessReportModelArgumentCaptor.capture(), Mockito.eq(MigrationProcessReportResponseModel.class));
MigrationProcessReportModel value = migrationProcessReportModelArgumentCaptor.getValue();
Assert.assertEquals(55, value.getProcess());
Assert.assertEquals(20, value.getObjectCount());
Assert.assertEquals(0, value.getProcess());
Assert.assertEquals(1000, value.getObjectCount());
Assert.assertEquals("redis", value.getService());

Mockito.when(clusterService.getCountByActiveDcAndClusterType(Mockito.anyLong(), Mockito.anyString())).thenReturn(1001L);
migrationReporter.doAction();
Mockito.verify(restTemplate, Mockito.times(2))
.postForEntity(Mockito.anyString(),
migrationProcessReportModelArgumentCaptor.capture(), Mockito.eq(MigrationProcessReportResponseModel.class));
value = migrationProcessReportModelArgumentCaptor.getValue();
Assert.assertEquals(0, value.getProcess());
Assert.assertEquals(1001, value.getObjectCount());

Mockito.when(clusterService.getCountByActiveDcAndClusterType(Mockito.anyLong(), Mockito.anyString())).thenReturn(400L);
migrationReporter.doAction();
Mockito.verify(restTemplate, Mockito.times(3))
.postForEntity(Mockito.anyString(),
migrationProcessReportModelArgumentCaptor.capture(), Mockito.eq(MigrationProcessReportResponseModel.class));
value = migrationProcessReportModelArgumentCaptor.getValue();
Assert.assertEquals(60, value.getProcess());
Assert.assertEquals(1001, value.getObjectCount());
}
}

0 comments on commit ba3b487

Please sign in to comment.