Use API

Concept

kubectl createkubectl replace是命令式配置文件操作,kubectl apply是声明式API。

Practice

本地编写一个nginx-deployment.yaml文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80

使用kubectl apply命令来创建这个Deployment:

1
2
$ kubectl apply -f nginx-deployment.yaml
deployment.apps/nginx-deployment created

然后修改一下nginx-deployment.yaml里定义的镜像:

1
2
3
4
5
6
7
...
spec:
containers:
- name: nginx
image: nginx:1.21.6
ports:
- containerPort: 80

这时继续执行kubectl apply命令:

1
2
$ kubectl apply -f nginx-deployment.yaml
deployment.apps/nginx-deployment configured

这时,Kubernetes就会触发这个Deployment的滚动更新。

Problem

这样触发的更新与kubectl createkubectl replace有什么区别呢?

kubectl replace的执行过程,是使用新的YAML文件中的API对象,替换原有的API对象;而kubectl apply,则是执行了一个对原有API对象的PATCH操作。

更进一步地,这意味着kube-apiserver在响应命令式请求(比如kubectl replace)对时候,一次只能处理一个写请求,否则会有产生冲突的可能。而对于声明式请求(比如kubectl apply)的时候,一次能处理多个写操作,并且具体Merge的能力。

Meaning

Istio

下面以Istio项目为例,学习声明式API在实际使用时的重要意义。

Istio官网:https://istio.io/latest/zh/

Server Mesh:https://www.servicemesher.com/istio-handbook/concepts/basic.html

Istio是一个开源的Service Mesh实现产品,实际上就是一个基于Kubernetes项目的微服务治理框架,它的架构非常清晰,如下所示:


在上面这个架构图中,不难看到Istio项目架构的核心所在。Istio最根本的组件,是运行在每一个应用Pod里的Envoy容器。

这个Envoy项目是Lyft公司推出的一个高性能C++网络代理,也是Lyft公司对Istio项目的唯一贡献。

Istio项目,则把这个代理服务以sidecar容器的方式,运行在了每一个被治理的应用Pod中。Pod里的所有容器都共享同一个Network Namespace。所以,Envoy容器就能够通过配置Pod里面的iptables规则,把整个Pod的进出流量接管下来。

这时候,Istio的控制层(Control Plane)里的Pilot组件,就能够通过调用每个Envoy容器的API,对这个Envoy代理进行配置,从而实现微服务治理。

看个🌰:假设这个Istio架构图左边的Pod是已经在运行的应用,而右边的Pod则是我们刚刚上线的应用的新版本。这时候,Pilot通过调节这两Pod里的Enovy容器的配置,从而将90%的流量分配给旧版本的应用,将10%的流量分配给新版本应用,并且,还可以在后续的过程中随时调整。这样,一个典型的灰度发布的场景就完成了。比如,Istio可以调节这个流量从90%-10%,改到80%-20%,再到50%-50%,最后到0%-100%,就完成了这个灰度发布的过程。

更重要的是,在整个微服务治理的过程中,无论是对Envoy容器的部署,还是像上面这样对Envoy代理的配置,用户和应用都是完全“无感”的。

Istio项目使用的,是Kubernetes中一份非常重要的功能,叫做Dynamic Admission Controll

Kubernetes项目中,当一个Pod或者任何一个API对象被提交给APIServer之后,总有一些“初始化”性质的工作需要在它们被Kubernetes项目正式处理之前进行。比如,自动为所有Pod加上某些标签(Labels)。

而这个“初始化”操作的实现,借助的是一个叫作Admission的功能。它其实是Kubernetes项目里一组被称为Admission Controller的代码,可以选择性地被编译进APIServer中,在API对象创建之后会被立刻调用到。

但这就意味着,如果现在想要添加一些自己的规则到Adminssion Controller,就会比较困难。因为,这要求重新编译并重启APIServer。显然,这种使用方法对Istio来说,影响太大了。

所以,Kubernetes项目为我们提供了一种“热插拔”式的Admission机制,它就是Dynamic Admission Control,也叫作Initializer

Example

比如,现在有如下所示的一个应用Pod

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: myapp-pod
labels:
app: myapp
spec:
containers:
- name: myapp-container
image: busybox
command: ['sh', '-c', 'echo Hello Kubernetes! && sleep 3600']

可以看到,这个Pod里面只有一个用户容器,叫作:myapp-container

接下来,Istio项目要做的,就是在这个Pod的YAML被提交给Kubernetes之后,在它对应的API对象里自动加上Envoy容器的配置,使这个对象变成如下所示的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: v1
kind: Pod
metadata:
name: myapp-pod
labels:
app: myapp
spec:
containers:
- name: myapp-container
image: busybox
command: ['sh', '-c', 'echo Hello Kubernetes! && sleep 3600']
- name: envoy
image: lyft/envoy:845747b88f102c0fd262ab234308e9e22f693a1
command: ["/usr/local/bin/envoy"]
...

可以看到,被Istio处理后的这个Pod里,除了用户自己定义的myapp-container容器之外,多出了一个叫作envoy的容器,它就是IStio要使用的Envoy代理。

Basic Principle

那么,Istio又是如何在用户完全不知情的前提下完成这个操作的呢?

Istio要做的,就是编写一个用来为Pod“自动注入”Envoy容器的Initializer

首先,Istio会将这个Envoy容器本身的定义,以ConfigMap的方式保存在Kubernetes当中。这个ConfigMap(名叫:envoy-initializer)的定义如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
apiVersion: v1
kind: ConfigMap
metadata:
name: envoy-initializer
data:
config: |
containers:
- name: envoy
image: lyft/envoy:845747db88f102c0fd262ab234308e9e22f693a1
command: ["/usr/local/bin/envoy"]
args:
- "--concurrency 4"
- "--config-path /etc/envoy/envoy.json"
- "--mode serve"
ports:
- containerPort: 80
protocol: TCP
resources:
limits:
cpu: "1000m"
memory: "512Mi"
requests:
cpu: "100m"
memory: "64Mi"
volumeMounts:
- name: envoy-conf
mountPath: /etc/envoy
volumes:
- name: envoy-conf
configMap:
name: envoy

这个ConfigMapdata部分,正是一个Pod对象的一部分定义,其中,我们可以看到Envoy容器对应的containers字段,以及一个用来声明Envoy配置文件的volumes字段。

不难想到,Initializer要做的工作,就是把这部分Envoy相关的字段,自动添加到用户提交的PodAPI对象里。可是,用户提交的Pod里本来就有containers字段和volumes字段,所以Kubernetes在处理这样的更新请求时,就必须使用类似于git merge这样的操作,才能将这两部分内容结合在一起。

所以说,在Initializer更新用户的Pod对象的时候,必须使用PATCH API来完成。而这种PATCH API,正是声明式API最主要的能力。

接下来,Istio将一个编写好的Initializer,作为一个Pod部署在Kubernetes中。这个Pod的定义非常简单,如下所示:

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
labels:
app: envoy-initializer
name: envoy-initializer
spec:
containers:
- name: envoy-initializer
image: envoy-initializer:0.0.1
imagePullPolicy: Always

可以看到,这个envoy-initializer使用的envoy-initializer:0.0.1镜像,就是一个事先编写好的“自定义控制器”(Custom Controller)。

一个Kubernetes Controller,实际上就是一个死循环:它不断地获取实际状态,然后与期望状态,并以此为依据决定下一步的操作。

Initializer的控制器,不断获取到的实际状态,就是用户新创建的Pod。而它的“期望状态”,则是:这个Pod里被添加了Envoy容器的定义。

用Go语言的伪代码描述这个控制逻辑,如下所示:

1
2
3
4
5
6
7
8
9
for {
// 获取新创建的Pod
pod := client.GetLatestPod()
// Diff一下,检查是否已经初始化过
if !isInitialized(pod) {
// 没有?那就来初始化一下
doSomthing(pod)
}
}
  • 如果这个Pod里面已经添加过Envoy容器,那么就“放过”这个Pod,进入下一个检查周期。
  • 而如果还没有添加过Envoy容器的话,它就要进行Initilize操作了,即:修改该PodAPI对象(doSomething函数)。

这时候,Istio要往这个Pod里合并的字段,正是我们之前保存在envoy-initializer这个ConfigMap里的数据(即:它的data字段的值)。

所以,在Initializer控制器的工作逻辑里,它首先会从APIServer中拿到这个ConfigMap

1
2
3
func doSomthing(pod) {
cm := client.Get(Config, "envoy-initializer")
}

然后,把这个ConfigMap里存储的containersvolumes字段,直接添加进一个空的Pod对象里:

1
2
3
4
5
6
7
func doSomething(pod) {
cm := client.Get(Config, "envoy-initializer")

newPod := Pod{}
newPod.Spec.Containers = cm.Containers
newPod.Spec.Volumes = cm.Volumes
}

现在,关键来了。KubernetesAPI库,提供了一个方法,可以直接使用新旧两个Pod对象,剩菜一个TwoWayMergePatch

1
2
3
4
5
6
7
8
9
10
11
12
13
func doSomething(pod) {
cm := client.Get(Config, "envoy-initializer")

newPod := Pod{}
newPod.Spec.Containers = cm.Containers
newPod.Spec.Volumes = cm.Volumes

// 生成patch数据
patchBytes := strategicpatch.CreateTwoWayMergePatch(pod, newPod)

// 发起PATCH请求,修改这个Pod对象
client.Patch(pod.Name, patchBytes)
}

有了这个TwoWayMergePatch之后,Initializer的代码就可以使用这个patch的数据,调用KubernetesClient,发起一个PATCH请求。

这样,一个用户提交的Pod对象里,就会被自动加上Envoy容器相关的字段。

Other Method

当然,Kubernetes还允许通过配置,来制定要对什么样的资源进行这个Initialize操作,比如下面这个🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
apiVersion: admissionregistration.k8s.io/v1alpha1
kind: InitializerConfiguration
metadata:
name: envoy-config
initializers:
// 这个名字必须至少包括两个 "."
- name: envoy.initializer.kubernetes.io
rules:
- apiGroups:
- "" // 前面说过, ""就是core API Group的意思
apiVersions:
- v1
resources:
- pods

这个配置,就意味着Kubernetes要对所有的Pod进行这个Initialize操作,并且,我们指定了负责这个操作的Initializer,名叫:envoy-initializer

而一旦这个InitializerConfiguration被创建,Kubernetes就会把这个Initializer的名字,加在所有新创建的PodMetadata上,格式如下所示:

1
2
3
4
5
6
7
8
9
10
apiVersion: v1
kind: Pod
metadata:
initializers:
pending:
- name: envoy.initializer.kubernetes.io
name: myapp-pod
labels:
app: myapp
...

可以看到,每一个新创建的Pod,都会自动懈怠了metadata.initializer.pendingMetadata信息。

这个Metadata,正是接下来Initializer的控制器判断这个Pod有没有执行过自己所负责的初始化操作的重要依据(也就是前面伪代码中isInitialized()方法的含义)。

这也意味着,当你在Initializer里完成了要做的操作后,一定要记得将这个metadata.initializers.pending标志清除掉。这一点,在编写Initializer代码的时候一定要非常注意。

此外,除了上面的配置方法,还可以在具体的PodAnnotation里添加一个如下所示的字段,从而声明要使用某个Initializer

1
2
3
4
5
6
apiVersion: v1
kind: Pod
metadata
annotations:
"initializer.kubernetes.io/envoy": "true"
...

在这个Pod里,我们添加了一个Annotation,写明:initializer.kubernetes.io/envoy=true。这样,就会使用到我们前面所定义的envoy-initializer了。

Summary

Istio项目的核心,就是由无数个运行在应用Pod中的Envoy容器组成的服务代理网格。这也正是Service Mesh的含义。

而这个机制得以实现的原理,正是借助了Kubernetes能够对API对象进行在线更新的能力,这也正是Kubernetes“声明式API”的独特之处。

  • 首先,所谓“声明式”,指的就是我只需要提交一个定义好的API对象来“声明”,我所期望的状态是什么样子。
  • 其次,”声明式API“允许有多个API写端,以PATCH的方式对API对象进行修改,而无需关心本地原始YAML文件的内容。
  • 最后,也是最重要的,有了上述两个能力,Kubernetes项目才可以基于对API对象的增、删、改、查,在完全无需外界干预的情况下,完成对实际状态和期望状态的调谐(Reconcile)过程。

所以说,声明式API,才是Kubernetes项目编排能力“赖以生存”的核心所在。

无论是对sidecar容器的巧妙设计,还是对Initializer的合理利用,Istio项目的设计与实现,其实都依托于Kubernetes的声明式API和它所提供的各种编排能力。可以说,IstioKubernetes项目使用上的一位“集大成者”。

API’s secret

Design

一直好奇:当我把一个YAML文件提交给Kubernetes之后,它究竟是如何创建出一个API对象的呢?

Kubernetes中,一个API对象在Etcd里的完整资源路径,是由Group(API组)、Version(API版本)和Resource(API资源类型)三个部分组成的。

通过这样的结构,整个Kubernetes里的所有API对象,实际上就可以用如下的树形结构表示出来:


在这张图里,可以清除地看到KubernetesAPI对象的组织方式,其实是层层递进的。

比如,现在要声明创建一个CronJob对象,那么YAML文件的开始部分会这么写:

1
2
3
apiVersion: batch/v2alpha1
kind: CronJob
...

在这个YAML文件中,“CronJob”就是这个API对象的资源类型(Resource),“batch”就是它的组(Group),“v2alpha1”就是它的版本(Version)。

当我们提交了这个YAML文件之后,Kubernetes就会把这个YAML文件里描述的内容,转换成Kubernetes里的一个CronJob对象。

Problem

那么,Kubernetes是如何对ResourceGroupVersion进行解析,从而在Kubernetes项目里找到CronJob对象的定义呢?

首先,Kubernetes会匹配API对象的组。

需要明确的是,对于Kubernetes里的核心API对象,比如:PodNode等,是不需要Group的(即它们的Group是“”)。所以,对于这些API对象来说,Kubernetes会直接在/api这个层级进行下一步的匹配过程。

而对于CronJob等非核心API对象来说,Kubernetes就必须在/apis这个层级里查找它对应的Group,进而根据“batch”这个Group的名字,找到/apis/batch

不难发现,这些API Group的分类是以对象功能为依据的,比如JobCronjob就都属于“batch”(离线业务)这个Group

然后,Kubernetes会进一步匹配到API对象的版本号。

对于CronJob这个API对象来说,Kubernetesbatch这个Group下,匹配到的版本号就是v2alpha1

Kubernetes中,同一种API对象可以有多个版本,这正是Kubernetes进行API版本化管理的重要手段。这样,比如在CronJob的开发过程中,对于会影响到用户的变更就可以通过升级新版本号来处理,从而保证了向后兼容。

最后,Kubernetes会匹配API对象的资源类型。

在前面匹配到正确的版本之后,Kubernetes就知道要创建一个/apis/batch/v2alpha1下的CronJob对象。

这时候,APIServer就可以继续创建这个CronJob对象了。

流程图如下:


首先,当我们发起了创建CronJobPOST请求之后,我们编写的YAML的信息就被提交给了APIServer

APIServer的第一个功能,就是过滤这个请求,并完成一些前置性的工作,比如授权、超时处理、审计等。

然后,请求会进入MUXRoutes流程。MUXRoutesAPIServer完成URLHandler绑定的场所。而APIServerHandler要做的事情,就是按照上述匹配流程,找到对应的CronJob类型定义。

接着,APIServer最重要的职责就来了:根据这个CronJob类型定义,使用用户的YAML文件里的字段,创建一个CronJob对象。

而在这个过程中,APIServer会进行一个Convert工作,即:把用户提交的YAML文件,转换成一个叫作Super Version的对象,它正是该API资源类型所有版本的字段全集。这样用户提交的不同版本的YAML文件,就都可以用这个Super Version对象来进行处理了。

接下来,APIServer会先后进行Admission()Validation()操作。Admission ControllerInitializer,就都属于Admission的内容。

Validation,则负责验证验证这个对象里的各个字段是否合法。这个被验证过的API对象,都保存在了APIServer里一个叫作Registry的数据结构中。也就是说,只要一个API对象的定义能在Registry里查到,它就是一个有效的Kubernetes API对象。

最后,APIServer会把验证过的API对象转换成用户最初提交的版本,进行序列化操作,并调用EtcdAPI把它保存起来。

由此可见,声明式API对于Kubernetes来说非常重要。所以,APIServer这样一个在其他项目里“平淡无奇”的组件,却成了KUbernetes项目的重中之重。它不仅是Google Borg设计思想的集中体现,也是Kubernetes项目里唯一一个被Google公司和RedHat公司双重控制、其他势力根本无法参与其中的组件。

此外,由于同时要兼顾性能、API完备性、版本化、向后兼容等很多工程化指标,所以Kubernetes团队在APIServer项目里大量使用了Go语言的代码生成功能,来自动化注入ConvertDeepCopy等于API资源相关的操作。这部分自动生成的代码,曾一度占到Kubernetes项目总代码的20%~20%。

这也是为何,在过去很长一段时间里,在这样一个极其“复杂”的APIServer中,添加一个Kubernetes风格的API资源类型,是一个非常困难的工作。

CRD

不过,在Kubernetes v1.7之后,这个工作就变得轻松得多了。这,得益于一个全新的API插件机制:CRD

CRD的全称是Custom Resource Definition。顾名思义,它指的就是,允许用户在Kubernetes中添加一个跟PodNode类似的、新的API资源类型,即:自定义API资源。

举个🌰:现在要为Kubernetes添加一个名叫NetworkAPI资源类型。

它的作用是,一旦用户创建一个Network对象,那么Kubernetes就应该使用这个对象定义的网络参数,调用真实的网络插件,比如Neturon项目,为用户创建一个真正的“网络”。这样,将来用户创建的Pod,就可以声明使用这个“网络”了。

Custom Controller

🚀code:https://github.com/Khighness/highness-network-controller

Network这个自定义API对象编写一个自定义控制器(Custom Controller)。

“声明式API”并不像”命令式API“那样有着明显的执行逻辑。这就使得基于声明式API的业务功能实现,往往需要控制器模式来“监视”API对象的变化(比如,创建或者删除Network),然后以此来决定实际要执行的具体工作。

总的来说,编写自定义控制器代码的过程包括:编写main函数、编写自定义控制器的定义,以及编写控制器里的业务逻辑三个部分。

Main Function

main函数的主要工作就是,定义并出示好一个自定义控制器(Custom Controller),然后启动它。这部分的主要内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
flag.Parse()
// handler the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// 1.1 build config via the url of APIServer and the path of kubeConfig
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig)
// 1.2 create the client of kubernetes
kubeClient, err := kubernetes.NewForConfig(cfg)
// 1.3 create the client of network
networkClient, err := clientset.NewForConfig(cfg)

// 2. create a factory for network, and transfer if to controller
networkInformerFactory := informers.NewSharedInformerFactory(networkClient, 30*time.Second)
controller := NewController(kubeClient, networkClient, networkInformerFactory.Samplecrd().V1().Networks())

// 3. start the factory and the controller
go networkInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}

这个main函数主要通过三步完成了初始化并启动了一个自定义控制器的工作。

Working Principle

一个自定义控制器的工作原理,可以用下面这样一幅流程图来表示:


从这幅示意图的最左边看起。


这个控制器要做的第一件事,是从KubernetesAPIServer里获取它所关心的对象,也就是定义的Network对象。

这个操作,依靠的是一个叫做Informer(可以翻译为:通知器)的代码库完成的。InformerAPI对象是一一对应的,所以传递给自定义控制器的,正是一个Network对象的InformerNetwork Informer)。

在创建这个Informer工厂的时候,需要给它传递一个networkClient

事实上,Network Informer正是使用这个networkClient,跟APIServer建立了连接。不过,真正负责维护这个连接的,则是Informer所使用的Reflector包。

更具体地说,Reflector使用的是一种叫作ListAndWatch的方法,来获取并监听这些Network对象实例的变化。

ListAndWatch机制下,一旦APIServer端有新的Network实例被创建、删除或者更新,Reflector都会收到事件通知。这时,该事件及它对应的API对象这个组合,就被称为增量(Delta),它会被放进一个Delta FIFO Queue(增量先进先出队列)中。

而另一方面,Informer会不断地从这个Delta FIFO Queue里读去(Pop)增量。每拿到一个增量,Informer就会判断这个增量里的事件类型,然后创建或者更新本地对象的缓存。这个缓存,在Kubernetes里一般被叫作Store

比如,如果事件类型是Added(添加对象),那么Informer就会通过一个叫作Indexer的库把这个增量里的API对象保存到本地缓存中,并为它创建索引。相反,如果增量的事件类型是Deleted(删除对象),那么Informer就会从本地缓存中删除这个对象。

这个同步缓存的工作,是Informer的第一个职责,也是它最重要的职责。


Informer的第二个职责,则是根据这些事件的类型,触发事先注册好的ResourceEventHandler。这些Handler,需要在创建控制器的时候注册给它对应的Informer

接下来,编写这个这个控制器的定义,它的主要内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func NewController(
kubeClientSet kubernetes.Interface,
networkClientSet clientset.Interface,
networkInformer informers.NetworkInformer) *Controller {

runtime.Must(networkschema.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeClientSet: kubeClientSet,
networkClientSet: networkClientSet,
networkListener: networkInformer.Lister(),
networkSynced: networkInformer.Informer().HasSynced,
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
recorder: recorder,
}

glog.Info("Setting up event handlers")
networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNetwork,
UpdateFunc: func(oldObj, newObj interface{}) {
oldNetwork := oldObj.(*samplecrdv1.Network)
newNetwork := newObj.(*samplecrdv1.Network)
if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
return
}
controller.enqueueNetwork(newObj)
},
DeleteFunc: controller.enqueueNetworkForDelete,
})

return controller
}

在main函数中创建了两个clientkubeClientnetworkClient),然后在这段代码里,使用这两个client和前端创建的Informer,初始化了自定义控制器。

值得注意的是,在这个自定义控制器里,设置了一个工作队列(work queue),它正是处于示意图中间位置的WorkQueue。这个工作队列的作用是,负责同步Informer和控制循环之间的数据。

实际上,Kubernetes提供了很多个工作队列的实现,可以根据需要选择合适的库直接使用。

然后,为networkInformer注册了三个HandlerAddFuncUpdateFuncDeleteFunc),分别对应API对象的“添加”“更新”和”删除“事件。而具体的处理操作,都是将该事件对应的API对象加入到工作队列中。

需要注意的是,实际入队的并不是API对象本身,而是它们本身的Key,即:该API对象的namespace/name。

而后面即将编写的控制循环,则会不断地从这个工作队列里拿到这些Key,然后开始执行真正的控制逻辑。

综上所述,所谓Informer,其实就是一个带有本地缓存和索引机制的、可以注册EventHandlerclient。它是自定义控制器跟APIServer进行数据同步的重要组件。

更具体地说,Informer通过一种叫作ListAndWatch的方法,把APIServerAPI对象缓存在了本地,并负责更新和维护这个缓存。

其中,ListAndWatch方法的含义是:首先,通过APIServerLIST API“获取”所有最新版本的API对象;然后,再通过WATCH API来“监听”所有这些API对象的变化。

而通过监听到的事件变化,Informer就可以实时更新本地缓存,并且调用这些事件对应的EventHandler了。

此外,在这个过程中,每经过resyncPeroid指定的时间,Informer维护的本地缓存,都会使用最近一次LIST返回的结果强制更新一次,从而保证缓存的有效性。在Kubernetes中,这个缓存强制更新的操作就叫做:resync

需要注意的是,这个定时resync操作,也会触发Informer注册的“更新”事件。但此时,这个“更新”事件对应的Network对象实际上并没有发生变化,即:新、旧两个Network对象的ResourceVersion是一样的。在这种情况下,Informer就不需要对这个更新事件再做进一步的处理了。

这也是为什么上面的UpdateFunc方法里,先判断了一下新、旧两个Network对象的版本(ResourceVersion)是否发生了变化,然后才开始进行的入队操作。

以上,就是Kubernetes中的Informer库的工作原理。


接下来,我们即来到了示意图中最后面的控制循环(Control Loop)部分,也正是main函数最后调用controller.Run()启动的“控制循环”。 它的主要内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workQueue.ShutDown()

glog.Info("Starting network control loop")

glog.Info("Waiting for informers caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.networkSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

glog.Info("Starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

glog.Info("Started workers")
<-stopCh
glog.Info("Shutting down workers")

return nil
}

可以看到,启动控制循环的逻辑非常简单:

  • 首先,等待Infomer完成一次本地缓存的数据同步操作;
  • 然后,直接通过goroutine启动一个(或者并发启动多个)“无限循环”的任务。

而这个“无限循环”任务的每一个循环周期,执行的正是我们真正的业务逻辑。

接下来,就编写这个自定义控制器的业务逻辑,它的主要内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.workQueue.Done(obj)
var (
key string
ok bool
)
if key, ok = obj.(string); !ok {
c.workQueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workQueue but get %#v", obj))
return nil
}
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
c.workQueue.Forget(obj)
glog.Infof("Successfully sync '%s'", key)
return nil
}(obj)

if err != nil {
runtime.HandleError(err)
return true
}

return true
}

func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return nil
}

network, err := c.networkListener.Networks(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
namespace, name)
glog.Infof("[Neutron] Deleting network: %s/%s ...", namespace, name)
return nil
}
runtime.HandleError(fmt.Errorf("failed to list nerwork by: %s/%s", namespace, name))
return err
}

glog.Infof("[Neutron] Try to process network: %#v ...", network)
c.recorder.Event(network, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}

可以看到,在这个执行周期里(processNextWorkItem),首先从工作队列里出队(workqueue.Get)了一个成员,也就是一个KeyNetwork对象的:namespace/name)。

然后,在syncHandler方法中,使用这个Key,尝试从Informer维护的缓存中拿到了它所对应的Network对象。

可以看到,在这里,使用了networkListener来尝试获取这个Key对应的Network对象。这个操作,其实就是在访问本地缓存的索引。实际上,在Kubernetes的源码中,可以经常看到控制器从各种Listener里获取对象,比如:podListenernodeListener等等,它们使用的都是Informer和缓存机制。

而如果控制循环从缓存中拿不到这个(即:networkListener返回了isNotFound错误),那就意味着这个Network对象的Key是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个Key,但是对应的Network对象已经被删除了。

这时候就需要调用NeutronAPI,把这个KeyNeutron网络从真实的集群里删除掉。

而如果能够获取到对应的Network对象,就可与执行控制器模式里的对比“期望状态”和“实际状态”的逻辑了。

其中,自定义控制器“千辛万苦”拿到的这个Network对象,正是APIServer里保存的”期望状态”,即:用户通过YAML文件提交到APIServer里的信息。当然,在例子里,它已经被Informer缓存在了本地。

那么,“实际状态”又从哪里来呢?

当然是来自于实际的集群了,所以,控制循环需要通过Neutron API来查询实际的网络情况。

比如,可以通过Neutron来查询这个Network对象对应的真实网络是否存在。

  • 如果不存在,这就是一个典型的“期望状态”与“实际状态”不一致的情形。这时,就需要使用这个Network对象里的信息(比如:CIDRGateway),调用Neutron API来创建真实的网络。
  • 如果存在,那么,就要读取这个真实网络的信息,判断它是否跟Network对象里的信息一致,从而决定是否要通过Neutron来更新这个已经存在的真实网络。

这样,就可以通过对比“期望状态”和“实际状态”的差异,完成了一次调谐(Reconcile)的过程。

Build and Run

可以通过如下流程将项目编译成二进制文件:

1
2
3
4
$ git clone https://github.com/Khighness/highness-network-controller
$ cd highness-network-controller
$ go mod tidy
$ go build -o highness-network-controller .

尝试运行自定义控制器,如下所示:

1
2
3
4
5
6
$ ./highness-network-controller -kubeconfig=$HOME/.kube/config -alsologtostderr=true
I0617 09:21:41.741895 5389 controller.go:77] Setting up event handlers
I0617 09:21:41.746107 5389 controller.go:103] Starting network control loop
I0617 09:21:41.746139 5389 controller.go:106] Waiting for informers caches to sync
E0617 09:21:41.772859 5389 reflector.go:134] pkg/mod/k8s.io/client-go@v0.0.0-20180902073250-f06dbfd73543/tools/cache/reflector.go:95: Failed to list *v1.Network: the server could not find the requested resource (get networks.samplecrd.k8s.io)
E0617 09:21:42.775466 5389 reflector.go:134] pkg/mod/k8s.io/client-go@v0.0.0-20180902073250-f06dbfd73543/tools/cache/reflector.go:95: Failed to list *v1.Network: the server could not find the requested resource (get networks.samplecrd.k8s.io)

可以看到,一开始启动后会报错。

这是因为,此时Network对象的CRD还没有被创建出来,所以InformerAPIServer里“获取”(ListNetwork对象时,并不能找到Network这个API资源类型的定义。

接下来,创建Network对象的CRD:

1
2
$ kubectl apply -f crd/network.yaml
customresourcedefinition.apiextensions.k8s.io/networks.samplecrd.k8s.io created

这时候,可以看到控制器的日志恢复了正常,控制循环启动成功:

1
2
I0617 09:22:45.946500    5389 controller.go:111] Starting workers
I0617 09:22:45.946518 5389 controller.go:117] Started workers

接下来,创建一个Network对象:

1
2
3
4
5
6
7
8
9
10
11
$ cat example/example-network.yaml
apiVersion: samplecrd.k8s.io/v1
kind: Network
metadata:
name: example-network
spec:
cidr: "192.168.0.0/16"
gateway: "192.168.0.1"

$ kubectl apply -f example/example-network.yaml
network.samplecrd.k8s.io/example-network created

这时候,查看一下控制器的日志:

1
2
3
I0617 09:23:31.766052    5389 controller.go:189] [Neutron] Try to process network: &v1.Network{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"example-network", GenerateName:"", Namespace:"default", SelfLink:"/apis/samplecrd.k8s.io/v1/namespaces/default/networks/example-network", UID:"597cfbff-00f0-447c-a8d5-6cec45c26222", ResourceVersion:"23354", Generation:1, CreationTimestamp:time.Date(2022, time.June, 17, 9, 23, 31, 0, time.Local), DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"samplecrd.k8s.io/v1\",\"kind\":\"Network\",\"metadata\":{\"annotations\":{},\"name\":\"example-network\",\"namespace\":\"default\"},\"spec\":{\"cidr\":\"192.168.0.0/16\",\"gateway\":\"192.168.0.1\"}}\n"}, OwnerReferences:[]v1.OwnerReference(nil), Initializers:(*v1.Initializers)(nil), Finalizers:[]string(nil), ClusterName:""}, Spec:v1.NetworkSpec{Cidr:"192.168.0.0/16", Gateway:"192.168.0.1"}} ...
I0617 09:23:31.766972 5389 controller.go:155] Successfully sync 'default/example-network'
I0617 09:23:31.767115 5389 event.go:221] Event(v1.ObjectReference{Kind:"Network", Namespace:"default", Name:"example-network", UID:"597cfbff-00f0-447c-a8d5-6cec45c26222", APIVersion:"samplecrd.k8s.io/v1", ResourceVersion:"23354", FieldPath:""}): type: 'Normal' reason: 'Synced' Network synced successfully

再修改一下YAML文件的内容,并提交更新,如下所示:

1
2
3
4
5
6
7
8
9
10
11
$ cat example/example-network.yaml
apiVersion: samplecrd.k8s.io/v1
kind: Network
metadata:
name: example-network
spec:
cidr: "192.168.0.0/16"
gateway: "192.168.1.1"

$ kubectl apply -f example/example-network.yaml
network.samplecrd.k8s.io/example-network configured

这时候,查看一下控制器的日志:

1
2
3
I0617 09:32:57.516092    5389 controller.go:189] [Neutron] Try to process network: &v1.Network{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"example-network", GenerateName:"", Namespace:"default", SelfLink:"/apis/samplecrd.k8s.io/v1/namespaces/default/networks/example-network", UID:"597cfbff-00f0-447c-a8d5-6cec45c26222", ResourceVersion:"24722", Generation:4, CreationTimestamp:time.Date(2022, time.June, 17, 9, 23, 31, 0, time.Local), DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"samplecrd.k8s.io/v1\",\"kind\":\"Network\",\"metadata\":{\"annotations\":{},\"name\":\"example-network\",\"namespace\":\"default\"},\"spec\":{\"cidr\":\"192.168.0.0/16\",\"gateway\":\"192.168.1.1\"}}\n"}, OwnerReferences:[]v1.OwnerReference(nil), Initializers:(*v1.Initializers)(nil), Finalizers:[]string(nil), ClusterName:""}, Spec:v1.NetworkSpec{Cidr:"192.168.0.0/16", Gateway:"192.168.1.1"}} ...
I0617 09:32:57.516378 5389 controller.go:155] Successfully sync 'default/example-network'
I0617 09:32:57.516393 5389 event.go:221] Event(v1.ObjectReference{Kind:"Network", Namespace:"default", Name:"example-network", UID:"597cfbff-00f0-447c-a8d5-6cec45c26222", APIVersion:"samplecrd.k8s.io/v1", ResourceVersion:"24722", FieldPath:""}): type: 'Normal' reason: 'Synced' Network synced successfully

最后,删除这个对象:

1
2
$ kubectl delete -f example/example-network.yaml
network.samplecrd.k8s.io "example-network" deleted

这时候,查看一下控制器的日志:

1
2
3
W0617 09:35:56.386759   11958 controller.go:180] Network: default/example-network does not exist in local cache, will delete it from Neutron ...
I0617 09:35:56.386790 11958 controller.go:182] [Neutron] Deleting network: default/example-network ...
I0617 09:35:56.386797 11958 controller.go:155] Successfully sync 'default/example-network'

这一次,在控制器的日志里,可以看到Informer注册的“删除”事件被触发,并且控制循环“调用”Neutron API“删除”了真实环境里的网络。

Expand

实际上,这套流程不仅可以用在自定义API资源上,也完全可以用在Kubernetes原生的默认API对象上。

比如,在main函数里,除了创建一个Network Informer外,还可以初始化一个Kubernetes默认API对象的 Informer工厂,比如Deployment对象的Informer。具体做法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
...

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
networkInformerFactory.Samplecrd().V1().Networks())

go kubeInformerFactory.Start(stopCh)
...
}

在这段代码里,首先使用KubernetesclientkubeClient)创建了一个工厂;

然后,用跟Network类似的处理方法,生成了一个Deployment Informer

接着,把Deployment Informer传递给了自定义控制器;当然,还需要调用Start方法来启动这个Deployment Informer

而有了这个Deployment Informer后,这个控制器也就持有了所有Deployment对象的信息。接下来,它既可以通过deploymentInformer.Listener()来获取Etcd里所有Deployment对象,也可用为这个Deployment Informer注册具体的Handler

更重要的是,这就使得在这个自定义控制器里面,可以通过对自定义API对象和默认API对象进行协同,从而实现更加复杂的编排功能。

比如:用户每创建一个新的Deployment,这个自定义控制器,就可以为它创建一个对应的Network供它使用。

Summary

所谓的Informer,就是一个自带缓存和索引机制,可以触发Handler的客户端库。这个本地缓存在Kubernetes中一般被称为Store,索引一般被称为Index

Informer使用了Reflector包,它是一个可以通过ListAndWatch机制获取并监视API对象变化的客户端封装。

ReflectorInformer之间,用到了一个“增量先进先出队列”进行协同。而Informer与需要编写的控制循环之间,则使用了一个工作队列来进行协同。

在实际应用中,除了控制循环之外的所有代码,实际上都是Kubernetes自动生成的,即:pkg/client/{informers, listers, clientset}里的内容。

而这些自动生成的代码,就为我们提供了一个可靠而高效地获取 API 对象“期望状态”的编程库。

所以,接下来,作为开发者,只需要关注如何拿到“实际状态”,然后如何拿它去跟“期望状态”做对比,从而决定接下来要做的业务逻辑即可。