forked from javagrowing/JGrowing
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
801abd5
commit e236a43
Showing
1 changed file
with
157 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
# 1.背景 | ||
最近在修改`Seata`线程并发的一些问题,把其中一些经验总结给大家。先简单描述一下这个问题,在`Seata`这个分布式事务框架中有个全局事务的概念,在大多数情况下,全局事务的流程基本是顺序推进不会出现并发问题,但是当一些极端的情况下,会出现多线程访问导致我们全局事务处理不正确。 | ||
如下面代码所示: | ||
在我们全局事务`commit`阶段,有一个如下代码: | ||
``` | ||
if (status == GlobalStatus.Begin) { | ||
globalSession.changeStatus(GlobalStatus.Committing); | ||
} | ||
``` | ||
代码有些省略,就是先判断status状态是否Begin状态,然后改变状态为Committing。 | ||
|
||
在我们全局事务rollback阶段,有一个如下代码: | ||
``` | ||
if (status == GlobalStatus.Begin) { | ||
globalSession.changeStatus(GlobalStatus.Rollbacking); | ||
} | ||
``` | ||
同样的也省略了部分代码,这里先判断`status`状态是否为`begin`,然后改变为`Rollbacking`。这里再`Seata`的代码中并没有做一些线程同步的手段,如果这两个逻辑同时执行(一般情况下不会,但是极端情况下可能会出现),会让我们的结果出现不可预料的错误。而我们所要做的就是解决这种极端情况下来的并发出现的问题。 | ||
|
||
|
||
# 2.悲观锁 | ||
对于这种并发出现问题我相信大家第一时间想到的肯定是加锁,在Java中我们我们一般采用下面两个手段进行加锁: | ||
- `Synchronized` | ||
- `ReentrantLock` | ||
|
||
我们可以利用`Synchronized` 或者 `ReentrantLock`进行加锁,可以将代码修改成下面的逻辑: | ||
|
||
synchronized: | ||
|
||
``` | ||
synchronized(globalSession){ | ||
if (status == GlobalStatus.Begin) { | ||
globalSession.changeStatus(GlobalStatus.Rollbacking); | ||
} | ||
} | ||
``` | ||
|
||
`ReentrantLock`进行加锁: | ||
|
||
``` | ||
reentrantLock.lock(); | ||
try { | ||
if (status == GlobalStatus.Begin) { | ||
globalSession.changeStatus(GlobalStatus.Rollbacking); | ||
} | ||
}finally { | ||
reentrantLock.unlock(); | ||
} | ||
``` | ||
对于这种加锁比较简单,在`Seata`的`Go-Server`中目前是这样实现的。但是这种实现场景忽略了我们上面所说的一种情况,就是极端情况下,也就是有可能99.9%的情况下可能不会出现并发问题,只有%0.1的情况可能导致这个并发问题。虽然我们悲观锁一次加锁的时间也比较短,但是在这种高性能的中间件中还是不够,那么就引入了我们的乐观锁。 | ||
# 3.乐观锁 | ||
一提起乐观锁,很多朋友都会想到数据库中乐观锁,想象一下上面的逻辑如果在数据库中,并且没有利用乐观锁去做,我们会有如下的伪代码逻辑: | ||
|
||
``` | ||
select * from table where id = xxx for update; | ||
if(status == begin){ | ||
//do other thing | ||
update table set status = rollbacking; | ||
} | ||
``` | ||
上述代码在我们很多的业务逻辑中都能看见,这段代码有两个小问题: | ||
1. 事务较大,由于我们一上来就对我们数据加锁,那么必定在一个事务中,我们的查询和更新之间如果穿插了一些比较耗时的逻辑那么我们的事务就会导致较大。由于我们的每一个事务都会占据一个数据库连接,那么在流量较高的时会很容易出现数据库连接池不够的情况。 | ||
2. 锁定数据时间较长,在我们整个事务中都是对这条数据加了行锁,如果有其他事务想对这个数据进行修改那么会长时间阻塞等待。 | ||
|
||
所以为了解决上面的问题,在很多如果竞争不大的场景下,我们就采用了乐观锁的方法,我们在数据库中加一个字段version代表着版本号,我们将代码修改成如下所示: | ||
|
||
``` | ||
select * from table where id = xxx ; | ||
if(status == begin){ | ||
//do other thing | ||
int result = (update table set status = rollbacking where version = xxx); | ||
if(result == 0){ | ||
throw new someException(); | ||
} | ||
} | ||
``` | ||
这里我们的查询语句不再有for update,我们的事务也只缩小到update一句,我们通过我们第一句查询出来的version来进行判断,如果我们的更新的更新的行数为0,那么就证明其他事务对他进行了修改。这里可以抛出异常或者做一些其他的事。 | ||
|
||
从这里可以看出我们使用乐观锁将事务较大,锁定较长这两个问题都解决,但是对应而来的成本就是如果更新失败我们可能就会抛出异常或者做一些其他补救的措施,而我们的悲观锁在执行业务之前都已经限制住了。所以我们这里使用乐观锁一定只能在对某条数据并发处理的情况比较小的情况下。 | ||
|
||
## 3.1 代码中的乐观锁 | ||
我们上面讲述了在数据库中的乐观锁,很多人就在问,没有数据库,在我们代码中怎么去实现乐观锁呢?熟悉`synchronized`的同学肯定知道`synchronized`在Jdk1.6之后对其进行了优化,引入了锁膨胀的一个模型: | ||
- 偏向锁:顾名思义偏向某个线程的锁,适用于某个线程能长期获取到该锁。 | ||
- 轻量级锁:如果偏向锁获取失败,那么会使用CAS自旋来完成,轻量级锁适用于线程交替进入临界区。 | ||
- 重量级锁:自旋失败之后,会采取重量级锁策略我们线程会阻塞挂起。 | ||
|
||
上面的级种锁模型中轻量级锁所适用的线程交替进入临界区很适合我们的场景,因为我们的全局事务一般来说不会是某个单线程一直在处理该事务(当然也可以优化成这个模型,只是设计会比较复杂),我们的全局事务再大多数情况下都会是不同线程交替进入处理这个事务逻辑,所以我们可以借鉴轻量级锁CAS自旋的思想,完成我们代码级别的自旋锁。这里也有朋友可能会问为什么不用synchronized呢?这里经过实测在交替进入临界区我们自己实现的CAS自旋性能是最高的,并且synchronized没有超时机制,不方便我们处理异常情况。 | ||
|
||
``` | ||
class GlobalSessionSpinLock { | ||
|
||
private AtomicBoolean globalSessionSpinLock = new AtomicBoolean(true); | ||
|
||
public void lock() throws TransactionException { | ||
boolean flag; | ||
do { | ||
flag = this.globalSessionSpinLock.compareAndSet(true, false); | ||
} | ||
while (!flag); | ||
} | ||
|
||
|
||
public void unlock() { | ||
this.globalSessionSpinLock.compareAndSet(false, true); | ||
} | ||
} | ||
// method rollback | ||
void rollback(){ | ||
globalSessionSpinLock.lock(); | ||
try { | ||
if (status == GlobalStatus.Begin) { | ||
globalSession.changeStatus(GlobalStatus.Rollbacking); | ||
} | ||
}finally { | ||
globalSessionSpinLock.unlock(); | ||
} | ||
} | ||
|
||
``` | ||
|
||
上面我们用`CAS`简单的实现了一个乐观锁,但是这个乐观锁有个小缺点就是一旦出现竞争不能膨胀为悲观锁阻塞等待,并且也没有过期超时,有可能大量占用我们的`CPU`,我们又继续进一步优化: | ||
|
||
``` | ||
public void lock() throws TransactionException { | ||
boolean flag; | ||
int times = 1; | ||
long beginTime = System.currentTimeMillis(); | ||
long restTime = GLOBAL_SESSOION_LOCK_TIME_OUT_MILLS ; | ||
do { | ||
restTime -= (System.currentTimeMillis() - beginTime); | ||
if (restTime <= 0){ | ||
throw new TransactionException(TransactionExceptionCode.FailedLockGlobalTranscation); | ||
} | ||
// Pause every PARK_TIMES_BASE times,yield the CPU | ||
if (times % PARK_TIMES_BASE == 0){ | ||
// Exponential Backoff | ||
long backOffTime = PARK_TIMES_BASE_NANOS << (times/PARK_TIMES_BASE); | ||
long parkTime = backOffTime < restTime ? backOffTime : restTime; | ||
LockSupport.parkNanos(parkTime); | ||
} | ||
flag = this.globalSessionSpinLock.compareAndSet(true, false); | ||
times++; | ||
} | ||
while (!flag); | ||
} | ||
|
||
``` | ||
上面的代码做了如下几个优化: | ||
- 引入了超时机制,一般来说一个要做好这种对临界区域加锁一定要做好超时机制,尤其是在这种对性能要求较高的中间件中。 | ||
- 引入了锁膨胀机制,这里没循环一定次数如果获取不到锁,那么会线程挂起`parkTime`时间,挂起之后又继续循环获取,如果再次获取不到,此时我们会对我们的parkTime进行指数退避形式的挂起,将我们的挂起时间逐渐增长,直到超时。 | ||
|
||
# 总结 | ||
从我们对并发控制的处理来看,想要达到一个目的,要实现它方法是有多种多样的,我们需要根据不同的场景,不同的条件,选择合适的方法,选择最高效的手段来完成我们的目的。本文没有对悲观锁的原理做太多的阐述,这里有兴趣的可以下来自行查阅资料,读完本文如果你只能记住一件事,那么请记住实现线程并发安全的时候别忘记考虑乐观锁。 | ||
|
||
> 如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O: | ||
|
||
![](https://user-gold-cdn.xitu.io/2018/7/22/164c2ad786c7cfe4?w=500&h=375&f=jpeg&s=215163) |