Skip to content

Latest commit

 

History

History
361 lines (249 loc) · 21.8 KB

File metadata and controls

361 lines (249 loc) · 21.8 KB

Paxos

原文

https://martinfowler.com/articles/patterns-of-distributed-systems/paxos.html

采用两阶段共识构建,即便节点断开连接,也能安全地达成共识。

2021.1.5

问题

当多个节点共享状态时,它们往往需要彼此之间就对某个特定值达成一致。采用领导者和追随者(Leader and Followers)模式,领导者会确定这个值,并将其传递给追随者。但是,如果没有领导者,这些节点就需要自己确定一个值。(即便采用了领导者和追随者,它们也需要这么做选举出一个领导者。)

通过采用两阶段提交(Two Phase Commit),领导者可以确保副本安全地获得更新,但是,如果没有领导者,我们可以让竞争的节点尝试获取 Quorum。这个过程更加复杂,因为任何节点都可能会失效或断开连接。一个节点可能会在一个值上得到 Quorum,但在将这个值传给整个集群之前,它就断开连接了。

解决方案

Paxos 算法由 Leslie Lamport 开发,发表于 1998 年的论文《The Part-Time Parliament》中。Paxos 的工作分为三个阶段,以确保即便在部分网络或节点失效的情况下,多个节点仍能对同一值达成一致。前两个阶段的工作是围绕一个值构建共识,最后一个阶段是将该共识传达给其余的副本。

  • 准备阶段:建立最新的世代时钟(Generation Clock),收集已经接受的值。
  • 接受阶段:提出该世代的值,让各副本接受。
  • 提交阶段:让所有的副本了解已经选择的这个值。

在第一阶段(称为准备阶段),提出值的节点(称为提议者)会联系集群中的所有节点(称为接受者),它会询问他们是否能承诺(promise)考虑它给出的值。一旦接受者形成一个 Quorum,都返回其承诺(promise),提议者就会进入下一个阶段。在第二个阶段中(称为接受阶段),提议者会发出提议的值,如果节点的 Quorum 接受了这个值,那这个值就被选中了。在最后一个阶段(称为提交阶段),提议者就会把这个选中的值提交到集群的所有节点上。

协议流程

Paxos 是一个难于理解的协议。我们先从一个展示协议典型流程的例子开始,然后再深入到其工作细节之中。我们想通过这个解释提供一个对协议工作原理的直观感受,而非一个全面的描述当做某个实现的基础。

下面是协议的极简概述。

提议者 接受者
世代时钟获取下一个世代号。向所有的接受者发送带有该世代值的准备请求。
如果准备请求的世代号晚于其承诺的世代变量,它会用这个比较晚的值更新其承诺的世代,并返回一个承诺应答。如果它接受了这个提议,则返回这个提议。
收到来自接受者 Quorum 的承诺时,它会查看这些应答中包含已接受的值。如果是这样,它就把自己的提议值改成返回的具有最高世代号的提议值。向所有接受者发送接受请求,并附上它的世代号以及提议值。
如果接受请求的世代号晚于其承诺的世代变量,它会将提议存储起来,作为其接受的提议,并应答说已经接受了该请求。
收到来自接受者 Quorum 的成功响应时,它将该值记录为选中,并向所有节点发送提交消息。

这些是 paxos 的基本规则,但想要理解它们是如何组成一个有效行为却是非常困难的。因此,这里会用一个说明它是如何工作的。

考虑一个有五个节点的集群:雅典(Athens)、拜占庭(Byzantium)、锡兰(Cyrene)、德尔菲(Delphi)和以弗所(Ephesus)。一个客户端联系雅典(Athens)节点,请求将名字设置为“alice”。雅典(Athens)现在需要发起一个 Paxos 交互,看是否所有节点都同意这个变化。雅典(Athens)称为提议者,因为在这个过程中,雅典(Athens)向所有其它节点建议将集群中的名字改成“alice”。集群中的所有节点(包括雅典(Athens)自身)都是“接受者”,这意味着它们能够接受提议。

在雅典(Athens)提议“alice”的同时,以弗所(Ephesus)节点也得到了一个请求,将名字设置为“elanor”。这让以弗所(Ephesus)也成为了一个提议者。

在准备阶段,提议者首先发送一些准备请求,这些请求都包括一个世代数。由于 Paxos 旨在避免单点故障,我们不会从单一的世代时钟中获取这个数字。相反,每个节点都维护着自己的世代时钟,它将生成号码与节点 ID 相结合。节点 ID 被用来打破平局,所以,[2,a] > [1,e] > [1,a]。每个接受者都记录着它到目前为止所见的最新承诺。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 1,a 1,a 0 1,e 1,e
接受的值

由于它们在此之前没有见过任何请求,所以,它们都会向调用的提议者返回一个承诺。我们将返回的值称为“承诺”,因为它表明接受者承诺不考虑任何世代时钟早于已承诺的消息。

雅典(Athens)将准备好的信息发送给锡兰(Cyrene)。当它收到一个返回的承诺时,这意味着它现在已经得到了五个节点中三个节点的承诺,这表示达成了一个 Quorum。雅典(Athens)现在就从发送准备信息切换为发送接受信息。

有可能雅典(Athens)未能收到大多数集群节点的承诺。在这种情况下,雅典(Athens)可以通过递增世代时钟的方式对准备请求进行重试。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 1,a 1,a 1,a 1,e 1,e
接受的值

雅典(Athens)现在开始发送接受信息,其中包含世代以及提议的值。雅典(Athens)和拜占庭(Byzantium)接受了该提议。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 1,a 1,a 1,a 1,e 1,e
接受的值 alice alice

以弗所(Ephesus)现在向锡兰(Cyrene)发出了一个准备信息。锡兰(Cyrene)曾向雅典(Athens)发出一次承诺,但以弗所(Ephesus)的请求有着更高的世代,所以它优先。锡兰(Cyrene)向以弗所(Ephesus)发回了一个承诺。

锡兰(Cyrene)现在接收到雅典(Athens)的接受请求,但却拒绝了它,因为其世代数已经落后于它对以弗所(Ephesus)的承诺。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 1,a 1,a 1,e 1,e 1,e
接受的值 alice alice

现在,以弗所(Ephesus)已经从它的准备消息中得到了一个 Quorum,所以,它可以继续发送接受消息了。它向自己与德尔菲(Delphi)发送了接受消息,但是,在它发送更多的接受消息之前,它崩溃了。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 1,a 1,a 1,e 1,e 1,e
接受的值 alice alice elanor elanor

与此同时,雅典(Athens)必须处理其接受请求被锡兰(Cyrene)拒绝的问题。这表明它的 Quorum 不再能够给予它承诺了,因此,其提议会失败。一个提议者像这样失去最初的 Quorum,这种情况就会发生;另一个提议者要取得 Quorum,第一个提议者的 Quorum 中至少要有一个成员叛变。

在一个简单的两阶段提交的情况下,我们会期望以弗所(Ephesus)继续执行下去,让它的值得到选择,这个模式会有问题,因为以弗所(Ephesus)已经崩溃了。如果它拥有了接受者 Quorum 的锁,它的崩溃会让整个提议过程陷入死锁。然而,Paxos 预计到这种事情会发生,因此,雅典(Athens)会再进行一次尝试,这次它会采用一个更高的世代数。

它会再次发送准备消息,但是这次的世代数会更高。同第一轮一样,它依然会得到三组承诺,但会有一个重要的区别。雅典(Athens)之前已经接受了“alice”,德尔菲(Delphi)已经接受了“elanor”。这两个接受者都返回了承诺,而且还返回了它们已经接受的值,以及它们所接受提议的世代数。在返回这个值的时候,它们会更新其承诺的世代,也就变成了[2,a],这样就可以反映它们对雅典(Athens)所做的承诺。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 2,a 1,a 2,a 2,a 1,e
接受的值 alice alice elanor elanor

拥有了 Quorum 的雅典(Athens)现在必须进入到接受阶段,但它提议拥有最高世代的已接受值,也就是“elanor”,这是德尔菲(Delphi)所接受的,其世代为[1,e],它大于雅典(Athens)接受的“alice”,其世代为[1,a]。

雅典(Athens)开始发送接受请求,但是,现在发出的是“elanor”及其当前世代。雅典(Athens)给自己发了一个接受请求,这会得到接受。这是一个关键的接受,因为现在有三个节点接受了“elanor”,也就是说,“elanor”达到了 Quorum,因此,我们可以认为“elanor”成为了选中的值。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 2,a 1,a 2,a 2,a 1,e
接受的值 elanor alice elanor elanor

但是,尽管“elanor”现已成为选中的值,但没人知道这一点。在接受阶段,雅典(Athens)只知道自己有“elanor”这个值,这不是一个 Quorum,而且以弗所(Ephesus)已经下线了。雅典(Athens)需要做的就是再接受到几个接受请求,它就可以提交了。但此时,雅典(Athens)崩溃了。

在这个时点上,雅典(Athens)和以弗所(Ephesus)此刻都已经崩溃了。但是集群仍然有一个节点的 Quorum 在运行,所以,它们应该能够继续工作,事实上,通过遵循协议,他们可以发现 “elanor”是选中的值。

锡兰(Cyrene)接收到一个请求,将名字设置为“carol”,因此,它变成了一个提议者。它看到了[2,a]这个世代,所以,它会启动[3,c]这个世代的准备阶段。虽然它希望提议用“carol”作为名字,但当前它只是发出了准备请求。

锡兰(Cyrene)向集群中的其余节点发送准备信息。与雅典(Athens)之前的准备阶段一样,锡兰(Cyrene)会得到已接受的值,所以,“carol”不会得到提议的机会。同之前一样,德尔菲(Delphi)的“elanor”比拜占庭(Byzantium)的“alice”晚,所以,锡兰(Cyrene)会用 “elanor”和[3,c]开启一个接受阶段。

节点 雅典(Athens) 拜占庭(Byzantium) 锡兰(Cyrene) 德尔菲(Delphi) 以弗所(Ephesus)
承诺的世代 2,a 3,c 3,c 3,c 1,e
接受的值 elanor alice elanor elanor

虽然我还可以继续崩溃和唤醒节点,但现在很明显,“elanor”将赢得胜利。只要有节点的 Quorum 在运行,其中至少有一个节点的值是 “elanor”,任何试图进行准备的节点都必须联系一个接受了“elanor”的节点,以便在准备阶段获得一个 Quorum。因此,我们将以 锡兰(Cyrene)发出提交结束这个讨论。

在某些时点,雅典(Athens)和以弗所(Ephesus)会重新上线,它们会发现 Quorum 的选择。

请求无需拒绝

在上面的例子中,我们看到接受者拒绝了世代较老的请求。但是,协议并不要求像这样明确地拒绝。按照规定,接受者可以直接忽略一个过期的请求。如果是这种情况,那么,协议仍然可以收敛在一个共识的值上。这是协议的一个重要特征,因为这是一个分布式系统,连接在任何时候都可能会丢失,所以,不依赖拒绝,对于确保协议安全而言,是有益的。(这里的安全意味着协议将会选择唯一的一个值,一旦选择,就不会改写)。

然而,发送拒绝书仍然是有用的,因为它可以提高性能。提议者越快地发现它们已经老了,它们就能越快开始另一轮更高的世代。

竞争的提议者可能无法选择

这个协议可能出错的一种方式是,两个(或更多)提议者进入了一个循环。

  • 雅典(Athens)和拜占庭(Byzantium)接受了 alice。
  • 所有节点都为 elanor 做了准备,这阻止 alice 获得 Quorum。
  • 德尔菲(Delphi)和以弗所(Ephesus)接受了 elanor。
  • 所有节点都为 alice 做了准备,这阻止 elanor 获得 Quorum。
  • 雅典(Athens)和拜占庭(Byzantium)接受了 alice。

...以此类推,这种情况称为活锁(livelock)。

FLP的不可能性结果(FLP Impossibility Result)显示,即使只有一个有问题的节点,这也能阻止整个集群选出一个值。

每当一个提议者需要选择一个新的世代时,它必须等待一段随机的时间,我们可以以此确保减少这种活锁发生的机会。一个提议者在另一个提议者向全部 Quorum 发起准备请求之前,就让一个 Quorum 得到接受,这种随机性就让这种情况成为了可能。

但我们永远无法杜绝活锁的发生。这是一个基本的权衡:要么确保安全,要么确保活锁,二者不能得兼。Paxos 首先确保安全。

一个样例的键值存储

这里解释的 Paxos 协议,构建的是对于单一值的共识(通常称为单一 Paxos)。大多数主流产品(如 Cosmos DBSpanner)中使用的实际实现都是对 Paxos 进行了修改,称为多重 paxos,其实现方式为 复制日志(Replicated Log)

但是,一个简单的键值存储可以使用基本的 Paxos 进行构建。cassandra 以类似的方式使用基本 Paxos 实现了其轻量级的事务。

键值存储为每个键值维护了一个 Paxos 实例。

class PaxosPerKeyStoreint serverId;
  public PaxosPerKeyStore(int serverId) {
      this.serverId = serverId;
  }

  Map<String, Acceptor> key2Acceptors = new HashMap<String, Acceptor>();
  List<PaxosPerKeyStore> peers;

Acceptor 存储了 promisedGeneration、acceptedGeneration 和 acceptedValue。

class Acceptorpublic class Acceptor {
      MonotonicId promisedGeneration = MonotonicId.empty();

      Optional<MonotonicId> acceptedGeneration = Optional.empty();
      Optional<Command> acceptedValue = Optional.empty();

      Optional<Command> committedValue = Optional.empty();
      Optional<MonotonicId> committedGeneration = Optional.empty();

      public AcceptorState state = AcceptorState.NEW;
      private BiConsumer<Acceptor, Command> kvStore;

当键值和值放到了 kv 存储时,它就运行了 Paxos 协议。

class PaxosPerKeyStoreint maxKnownPaxosRoundId = 1;
  int maxAttempts = 4;
  public void put(String key, String defaultProposal) {
      int attempts = 0;
      while(attempts <= maxAttempts) {
          attempts++;
          MonotonicId requestId = new MonotonicId(maxKnownPaxosRoundId++, serverId);
          SetValueCommand setValueCommand = new SetValueCommand(key, defaultProposal);

          if (runPaxos(key, requestId, setValueCommand)) {
              return;
          }

          Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
          logger.warn("Experienced Paxos contention. Attempting with higher generation");
      }
      throw new WriteTimeoutException(attempts);
  }

  private boolean runPaxos(String key, MonotonicId generation, Command initialValue) {
      List<Acceptor> allAcceptors = getAcceptorInstancesFor(key);
      List<PrepareResponse> prepareResponses = sendPrepare(generation, allAcceptors);
      if (isQuorumPrepared(prepareResponses)) {
          Command proposedValue = getValue(prepareResponses, initialValue);
          if (sendAccept(generation, proposedValue, allAcceptors)) {
              sendCommit(generation, proposedValue, allAcceptors);
          }
          if (proposedValue == initialValue) {
              return true;
          }
      }
      return false;
  }

  public Command getValue(List<PrepareResponse> prepareResponses, Command initialValue) {
      PrepareResponse mostRecentAcceptedValue = getMostRecentAcceptedValue(prepareResponses);
      Command proposedValue
              = mostRecentAcceptedValue.acceptedValue.isEmpty() ?
              initialValue : mostRecentAcceptedValue.acceptedValue.get();
      return proposedValue;
  }

  private PrepareResponse getMostRecentAcceptedValue(List<PrepareResponse> prepareResponses) {
      return prepareResponses.stream().max(Comparator.comparing(r -> r.acceptedGeneration.orElse(MonotonicId.empty()))).get();
  }
class Acceptorpublic PrepareResponse prepare(MonotonicId generation) {

      if (promisedGeneration.isAfter(generation)) {
          return new PrepareResponse(false, acceptedValue, acceptedGeneration, committedGeneration, committedValue);
      }
      promisedGeneration = generation;
      state = AcceptorState.PROMISED;
      return new PrepareResponse(true, acceptedValue, acceptedGeneration, committedGeneration, committedValue);

  }
class Acceptorpublic boolean accept(MonotonicId generation, Command value) {
      if (generation.equals(promisedGeneration) || generation.isAfter(promisedGeneration)) {
          this.promisedGeneration = generation;
          this.acceptedGeneration = Optional.of(generation);
          this.acceptedValue = Optional.of(value);
          return true;
      }
      state = AcceptorState.ACCEPTED;
      return false;
  }

只有当值成功地提交时,它才会存储到 kv 存储中。

class Acceptorpublic void commit(MonotonicId generation, Command value) {
      committedGeneration = Optional.of(generation);
      committedValue = Optional.of(value);
      state = AcceptorState.COMMITTED;
      kvStore.accept(this, value);
  }
class PaxosPerKeyStoreprivate void accept(Acceptor acceptor, Command command) {
      if (command instanceof SetValueCommand) {
          SetValueCommand setValueCommand = (SetValueCommand) command;
          kv.put(setValueCommand.getKey(), setValueCommand.getValue());
      }
      acceptor.resetPaxosState();
  }

Paxos 状态需要持久化。使用预写日志(Write-Ahead Log)可以轻松做到这一点。

处理多值

值得注意的是,Paxos 在处理单值上有详细的做法,而且得到了证明。因此,用单值 Paxos 协议处理多值需要在协议规范之外进行处理。一种替代方法是重置状态,单独存储提交过的值,以确保它们不会丢失。

class Acceptorpublic void resetPaxosState() {
      //This implementation has issues if committed values are not stored
      //and handled separately in the prepare phase.
      //See Cassandra implementation for details.
      //https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/SystemKeyspace.java#L1232
      promisedGeneration = MonotonicId.empty();
      acceptedGeneration = Optional.empty();
      acceptedValue = Optional.empty();
  }

[(gryadka)[https://github.com/gryadka/js]]给出了另外一种做法,它稍微修改了一下基本的 Paxos 以便设置多个值。在基本的算法之外执行一些步骤,这种需求就是在实践中首选复制日志(Replicated Log)的原因。

读取值

Paxos 依靠于准备阶段对任何未提交的值进行检测。因此,如果采用基本的 Paxos 实现如上所示的键值存储,那读取操作也需要运行完整的 Paxos 算法。

class PaxosPerKeyStorepublic String get(String key) {
      int attempts = 0;
      while(attempts <= maxAttempts) {
          attempts++;
          MonotonicId requestId = new MonotonicId(maxKnownPaxosRoundId++, serverId);
          Command getValueCommand = new NoOpCommand(key);
          if (runPaxos(key, requestId, getValueCommand)) {
              return kv.get(key);
          }

          Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
          logger.warn("Experienced Paxos contention. Attempting with higher generation");

      }
      throw new WriteTimeoutException(attempts);
  }

示例

cassandra 采用 Paxos 实现了轻量级事务。

所有的共识算法,比如 Raft,都采用了类似于基本的 Paxos 的基本概念。两阶段提交(Two Phase Commit)Quorum世代时钟(Generation Clock)的使用方式都是类似的。