Skip to content

Commit

Permalink
update informer.md
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-hutao committed Jun 20, 2019
1 parent 2233033 commit d3f9a0b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
2 changes: 1 addition & 1 deletion around/client-go/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# client-go

client-go 部分我步打算从头到尾一点点讲。在核心组件源码的分析过程中有用到一些 client-go 中的关键特性时咱以专题形式逐渐添加进来。
client-go 部分我不打算从头到尾一点点讲。在核心组件源码的分析过程中有用到一些 client-go 中的关键特性时咱以专题形式逐渐添加进来。

## 本章规划

Expand Down
59 changes: 40 additions & 19 deletions around/client-go/informer.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,40 @@ Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际

## 架构概览

自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。
自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。(图片来自https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md)

![1555996411720](image/informer/1555996411720.png)

我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:

![1556075198766](image/informer/1556075198766.png)

我们根据图中的9个步骤来跟源码
先关注一下第一幅图中涉及到的一些 components:

## reflector - List & Watch API Server
### client-go 相关模块

- **Reflector**: Reflector 类型定义在 cache 包中(*tools/cache/reflector.go:47*),它的作用是向 apiserver watch 特定的资源类型。这个功能通过其绑定的 ListAndWatch 方法实现。Watch 的资源可以是 in-build 的资源也可以是 custom 的资源。当 Reflector 通过 watch API 接收到存在新的资源对象实例的通知后,它使用相应的 list API 获取新创建的资源对象,然后 put 进 Delta Fifo 队列。这个步骤在 watchHandler 函数(*tools/cache/reflector.go:268*)中完成。
- **Informer**: 一个定义在 cache 包中的基础 controller(*tools/cache/controller.go:75*) (一个 informer) 从 Delta Fifo 队列中 pop 出来资源对象实例(这个功能在 processLoop 中实现(*tools/cache/controller.go:148*))。这个 base controller 做的工作是保存这个对象用于后续检索处理用的,然后触发我们自己的控制器来处理这个对象。
- **Indexer**: Indexer 提供的是 objects 之上的检索能力。Indexer 也定义在 cache 包中(*tools/cache/index.go:27*). 一个典型的检索使用方式是基于一个对象的 labels 创建索引。Indexer 可以基于各种索引函数维护索引。Indexer 使用一个线程安全的 store 来存储对象和其对应的 key. 还有一个默认函数 MetaNamespaceKeyFunc(*tools/cache/store.go:76*) 可以生成对象的 key,类似 <namespace>/<name> 格式来关联对应的对象。

### 自定义控制器相关模块

- **Informer reference**: 这是一个知道如何处理自定义资源对象的 Informer 实例的引用。自定义控制器需要创建合适的 Informer.
- **Indexer reference**: 这是一个知道如何处理自定义资源对象的 Indexer 实例的引用. 自定义控制器代码需要创建这个引用对象,然后用于检索资源对象用于后续的处理。

Base controller 提供了 NewIndexerInformer(*tools/cache/controller.go:345*) 函数来创建 Informer 和 Indexer. 在代码里我们可以直接调用这个函数或者使用工厂方法来创建 informer.

- **Resource Event Handlers**: 这是一个回调函数,在 Informer 想要分发一个对象给控制器的时候会调用这个函数。典型的用法是写一个函数来获取分发过来的对象的 key,将 key 放入队列中等待进一步的处理。
- **Work queue**: 这个队列是在自己的控制器代码中创建的,用来解耦一个对象的分发和处理过程。Resource event handler 函数会被写成提取分发来的对象的 key,然后将这个 key 添加到 work queue 里面。
- **Process Item** 这是我们在自己代码中实现的用来处理 work queue 中拿到的 items 的函数。这里可以有一个或多个函数来处理具体的过程,这个函数的典型用法是使用 Indexer 索引或者一个 Listing wrapper 来根据相应的 key 检索对象。

下面我们根据图中这几个步骤来跟源码。

## 第一步:reflector - List & Watch API Server

Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.

### Reflector 对象
### Reflector 对象定义

Reflector 的中文含义是反射器,我们先看一下类型定义:

Expand Down Expand Up @@ -91,7 +110,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// 开始 watch
w, err := r.listerWatcher.Watch(options)
// ……
// w 交给 watchHandler 处理
// w 交给 watchHandler 处理,这里的逻辑后面分析
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
Expand All @@ -102,9 +121,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
```

## watchHandler - add obj to delta fifo
## 第二步:watchHandler - add obj to delta fifo

前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中了
前面讲到 **ListAndWatch** 函数的最后一步逻辑是 **watchHandler**,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中,我们具体来看

!FILENAME tools/cache/reflector.go:287

Expand Down Expand Up @@ -150,11 +169,13 @@ loop:
}
```

## Informer (controller) - pop obj from delta fifo
## 第三、四、五步:Informer - pop obj from delta fifo、Add obj to store

先看 Controller 是什么

### Controller

一个 Informer 需要实现 Controller 接口:
Informer 会实现 Controller 接口,这个接口长这样

!FILENAME tools/cache/controller.go:82

Expand All @@ -166,7 +187,7 @@ type Controller interface {
}
```

一个基础的 Controller 实现如下
和这个 Controller 对应的有一个基础 controller 实现

!FILENAME tools/cache/controller.go:75

Expand All @@ -183,7 +204,7 @@ controller 类型结构如下:

![1556088003902](image/informer/1556088003902.png)

可以看到主要对外暴露的逻辑是 Run() 方法,我们看一下 Run() 中的逻辑:
可以看到主要对外暴露的逻辑是 Run() 方法,还有一个重点 processLoop() 其实也在 Run() 里面被调用,我们看一下 Run() 中的逻辑:

!FILENAME tools/cache/controller.go:100

Expand Down Expand Up @@ -233,7 +254,7 @@ func (c *controller) processLoop() {

这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。

PopProcessFunc 是一个类型:
PopProcessFunc 是一个类型,如下

`type PopProcessFunc func(interface{}) error`

Expand Down Expand Up @@ -274,9 +295,9 @@ Process: func(obj interface{}) error {
- clientState
- ResourceEventHandler (h)

我们一一来看
我们后面会一一分析到。

## Add obj to Indexer (Thread safe store)
### clientState

前面说到 clientState,这个变量的初始化是`clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)`

Expand Down Expand Up @@ -307,7 +328,7 @@ type Indexer interface {
}
```

顺带看一下 NewThreadSafeStore()
顺带看一下 **NewThreadSafeStore()**

!FILENAME tools/cache/thread_safe_store.go:298

Expand Down Expand Up @@ -355,13 +376,13 @@ func (c *threadSafeMap) Add(key string, obj interface{}) {
}
```

第四步和第五步的内容先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。
这块逻辑先分析到这里,后面关注 threadSafeMap 实现的时候再继续深入。

## sharedIndexInformer
## 第六步:Dispatch Event Handler functions

第六步是 Dispatch Event Handler functions(Send Object to Custom Controller)
我们先看一个接口 SharedInformer

我们先看一个接口 SharedInformer:
### sharedIndexInformer

!FILENAME tools/cache/shared_informer.go:43

Expand Down

0 comments on commit d3f9a0b

Please sign in to comment.