Skip to content

Commit

Permalink
distribute lock renew
Browse files Browse the repository at this point in the history
  • Loading branch information
dzdx committed May 25, 2022
1 parent 3da834a commit 197bb56
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alipay.sofa.registry.net.NetUtil;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import com.alipay.sofa.registry.util.LoopRunnable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.Date;
import java.util.List;
Expand All @@ -45,6 +46,12 @@ public abstract class AbstractLeaderElector implements LeaderElector {

private final LeaderElectorTrigger leaderElectorTrigger = new LeaderElectorTrigger();

private volatile String address;

public AbstractLeaderElector() {
address = NetUtil.getLocalAddress().getHostAddress();
}

@Override
public void registerLeaderAware(LeaderAware leaderAware) {
leaderAwares.add(leaderAware);
Expand All @@ -55,6 +62,11 @@ public void init() {
ConcurrentUtils.createDaemonThread("LeaderElectorTrigger", leaderElectorTrigger).start();
}

@VisibleForTesting
public void setAddress(String address) {
this.address = address;
}

private class LeaderElectorTrigger extends LoopRunnable {

@Override
Expand Down Expand Up @@ -93,7 +105,7 @@ public void elect() {

@Override
public String myself() {
return NetUtil.getLocalAddress().getHostAddress();
return address;
}
/**
* start compete leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public class MetaJdbcLeaderElector extends AbstractLeaderElector implements Reco

public static final String lockName = "META-MASTER";

@Autowired private DistributeLockMapper distributeLockMapper;
@Autowired DistributeLockMapper distributeLockMapper;

@Autowired private MetaElectorConfig metaElectorConfig;
@Autowired MetaElectorConfig metaElectorConfig;

@Autowired private DefaultCommonConfig defaultCommonConfig;
@Autowired DefaultCommonConfig defaultCommonConfig;

/**
* start elect, return current leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#{duration},
NOW(3),
NOW(3),
1,
0,
0
)
ON DUPLICATE KEY UPDATE lock_name = #{lockName}
Expand All @@ -72,7 +72,7 @@
<update id="ownerHeartbeat" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
<![CDATA[
update distribute_lock set owner = #{owner}, gmt_modified = NOW(3), `term_duration` = (`term_duration` + 1)
where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner} and term = #{term} and `term_duration` = #{termDuration} and timestampdiff(SECOND, gmt_modified, NOW()) < #{duration}/1000
where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner}
]]>
</update>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#{duration},
NOW(3),
NOW(3),
1,
0,
0
)
ON DUPLICATE KEY UPDATE lock_name = #{lockName}
Expand All @@ -72,7 +72,7 @@
<update id="ownerHeartbeat" parameterType="com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain">
<![CDATA[
update /*+ QUERY_TIMEOUT(2000000) */ distribute_lock set owner = #{owner}, gmt_modified = NOW(3), `term_duration` = (`term_duration` + 1)
where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner} and term = #{term} and `term_duration` = #{termDuration} and timestampdiff(SECOND, gmt_modified, NOW()) < #{duration}/1000
where data_center = #{dataCenter} and lock_name = #{lockName} and owner = #{owner}
]]>
</update>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
package com.alipay.sofa.registry.jdbc.elector;

import com.alipay.sofa.registry.jdbc.AbstractH2DbTestBase;
import com.alipay.sofa.registry.jdbc.config.MetaElectorConfigBean;
import com.alipay.sofa.registry.jdbc.constant.TableEnum;
import com.alipay.sofa.registry.jdbc.domain.DistributeLockDomain;
import com.alipay.sofa.registry.jdbc.mapper.DistributeLockMapper;
import com.alipay.sofa.registry.store.api.config.DefaultCommonConfig;
import com.alipay.sofa.registry.store.api.elector.AbstractLeaderElector;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import com.alipay.sofa.registry.util.LoopRunnable;
import com.alipay.sofa.registry.util.StringFormatter;
import com.google.common.collect.Lists;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.math.RandomUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -89,4 +97,129 @@ public void testLeaderInfo() {
Assert.assertEquals(
leaderInfo.getExpireTimestamp(), lock.getGmtModified().getTime() + 1000 / 2);
}

private static class ElectLoop extends LoopRunnable {
private final MetaJdbcLeaderElector elector;

private ElectLoop(MetaJdbcLeaderElector elector) {
this.elector = elector;
}

@Override
public void runUnthrowable() {
elector.elect();
}

@Override
public void waitingUnthrowable() {
ConcurrentUtils.sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
}
}

private static class LeaderChecker extends LoopRunnable {
private final List<ElectLoop> loops;

private LeaderChecker(List<ElectLoop> loops) {
this.loops = loops;
}

@Override
public void runUnthrowable() {
int leaderCount = 0;
for (ElectLoop loop : loops) {
if (loop.elector.amILeader()) {
leaderCount++;
}
}
Assert.assertTrue(StringFormatter.format("leaderCount {}", leaderCount), leaderCount <= 1);
}

@Override
public void waitingUnthrowable() {
int sleep = (int) (RandomUtils.nextFloat() * 150 + 150);
ConcurrentUtils.sleepUninterruptibly(sleep, TimeUnit.MILLISECONDS);
}
}

@Test
public void testConcurrentComplete() {
MetaElectorConfigBean metaElectorConfigBean = new MetaElectorConfigBean();
metaElectorConfigBean.setLockExpireDuration(2000);
List<ElectLoop> loops = Lists.newArrayList();
for (int i = 0; i < 20; i++) {
MetaJdbcLeaderElector e = new MetaJdbcLeaderElector();
e.setAddress(StringFormatter.format("{}", i));
e.distributeLockMapper = distributeLockMapper;

e.defaultCommonConfig = defaultCommonConfig;
e.metaElectorConfig = metaElectorConfigBean;
ElectLoop loop = new ElectLoop(e);
loops.add(loop);
}
// 0(旧版) 抢到leader
loops.get(0).runUnthrowable();
DistributeLockDomain domain =
distributeLockMapper.queryDistLock(
defaultCommonConfig.getClusterId(TableEnum.DISTRIBUTE_LOCK.getTableName()),
MetaJdbcLeaderElector.lockName);
Assert.assertEquals("0", domain.getOwner());
// 启动多个elector开始竞争
int i = 0;
for (ElectLoop loop : loops) {
ConcurrentUtils.createDaemonThread(StringFormatter.format("testComplete-{}", i), loop)
.start();
i++;
}
LeaderChecker leaderChecker = new LeaderChecker(loops);
ConcurrentUtils.createDaemonThread("LeaderChecker", leaderChecker).start();
// 没有其他follower能抢到leader
for (int j = 0; j < 30; j++) {
ConcurrentUtils.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
domain =
distributeLockMapper.queryDistLock(
defaultCommonConfig.getClusterId(TableEnum.DISTRIBUTE_LOCK.getTableName()),
MetaJdbcLeaderElector.lockName);
Assert.assertEquals("0", domain.getOwner());
Assert.assertFalse(StringFormatter.format("{}", domain), domain.expire());
}
// 停止争抢
for (ElectLoop loop : loops) {
loop.suspend();
}
ConcurrentUtils.sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
// 1(新版)抢到leader
loops.get(1).runUnthrowable();
domain =
distributeLockMapper.queryDistLock(
defaultCommonConfig.getClusterId(TableEnum.DISTRIBUTE_LOCK.getTableName()),
MetaJdbcLeaderElector.lockName);
Assert.assertEquals("1", domain.getOwner());
// 恢复争抢
for (ElectLoop loop : loops) {
loop.resume();
}
// 没有其他follower能抢到leader
for (int j = 0; j < 30; j++) {
ConcurrentUtils.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
domain =
distributeLockMapper.queryDistLock(
defaultCommonConfig.getClusterId(TableEnum.DISTRIBUTE_LOCK.getTableName()),
MetaJdbcLeaderElector.lockName);
Assert.assertEquals("1", domain.getOwner());
Assert.assertFalse(StringFormatter.format("{}", domain), domain.expire());
}
// 1 停止续约,其他follower抢到leader
loops.get(1).suspend();
ConcurrentUtils.sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
domain =
distributeLockMapper.queryDistLock(
defaultCommonConfig.getClusterId(TableEnum.DISTRIBUTE_LOCK.getTableName()),
MetaJdbcLeaderElector.lockName);
Assert.assertNotEquals("1", domain.getOwner());

for (ElectLoop loop : loops) {
loop.close();
}
leaderChecker.close();
}
}

0 comments on commit 197bb56

Please sign in to comment.