Client Go四种交互模式之 DynamicClient实战案例详解
引子
Kubernetes
赢得了云原生平台之争,同时在绝大多数云原生场景中都凭借其高扩展性担任了重要角色。通过kube-apiserver
提供的开放的模块,在不需要切分一个内外部的接口情况下,让我们具备了在同一应用前提下(Controller
)与集群及其他系统交互的能力,甚至是自定义的资源描述我们特殊的操作,被称之为Operator Pattern
。
尽管我们可以使用HTTP Client
与API Server
之间进行交互,但这其实并不简单。如果我们只考虑kubernetes
的核心resources
,那么这种交互方式恐怕不是最完美的,因为许多resources
请求后响应回的数据结构和可能操作都是不一样的。作为加强,Kubernetes
本身提供了一系列更加方便整合进kubernetes
系统的方式,被称之为client-go
。
这个项目中被最广泛使用客户端是ClientSet
,它是交互客户端的类型之一。这意味着这个接口给kubernetes
的每种资源都提供了可用的扩展方法和操作,这也就是为什么但凡要实现扩展,我们首先就应该考虑这种标准操作kubernetes
内置接口的方式。
但是这也肯定不能满足全部场景,很多情况下我们要自己声明CRD
(Custom Resources Definition
),ClientSet
无法满足我们我们的需求了。这个时候,我们就需要DynamicClient
,也就是k8s.io/client-go/dynamic.Interface
接口,来让我们有能力入侵系统,并实现自定义的功能和操作。
这种方式的好处在于,首先,它避免了对kubernetes
系统的强依赖,让我们可以操作自定义resources
。如果你想通过其他操作作为构建块来建构自动化或者工作流,比如ExternalDNS
,CertManager
,Prometheus Operator
等,通常就是把这些项目,使用Go
语言数据类型,把它们注册到你的Client
实例当中,作为依赖。操作的自由度越大,管控的难度显然就会越大,你需要管理这些扩展功能的迭代升级版本,同时又要保持你已经在集群上安装的版本与go.mod
文件中声明的版本相一致。
第二,你可以协调多种或未知的resources
。当你的operator
实现了一个通用逻辑,它就可以与任何通用的Kubernetes resources
(RBAC
到Pods
)交互,custom resources
也不在话下,这个时候,dynamicClient
就是你唯一的解决的方案。DynamicClient
中有些例子对garbage collection controller
是有重度依赖的,假如你打算对项目里任意的custom resources
提供支持DynamicClient
类型扩展也必不可少,比如KubeWatch
。
因此,深入client-go
项目的组成模块和弄清楚如何使用好它就是要具备的技能。
通过DynamicClient
做基础操作
以下的代码都预设你在`kubernetes cluster`中进行使用。
许多DynamicClient
相关的操作都与TypedClient
类似,比如创建新实例,都可以通过给构造方法提供配置文件来完成。
func newClient() (dynamic.Interface, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
dynClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return dynClient, nil
}
尽管DynamicClient
对于你通过resources
声明的意图一无所知,它也没提供类似于CoreV1
().Pod
类似的方法。所以得告诉它怎么来解析和处理你提供的resources
,这时候首先要做的,你需要提供一个schema.GroupVersionResource
,它是一个提供了必要信息的golang
数据结构,用它可以来构造针对于API Server
的HTTP
请求。
举例,如果你想实现一个从MongoDB Community Operator
拉取到所有Mongodb resources
列表的功能,你可以这么做:
var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}
func ListMongoDB(ctx context.Context, client dynamic.Interface, namespace string) ([]unstructured.Unstructured, error) {
list, err := client.Resource(monboDBResource).Namespace(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
return list.Items, nil
}
这里要注意,如果你处理带有namespace
的resources
,那.Namespace
(namespace
)就是强制的,而使用空字符就是列出所有命名空间下的。
这段代码片段,我们可以看到DynamicClient
的主要组成unstructured.Unstructured
。这是一个封装任意JSON
结构的特别类型,同时也符合标准的Kubernetes interface
,比如runtime.Object
,但是最为重要的是它提供了在unstructure package
下一系列helpers
用于操作这些数据。
扩充下我们上面的例子,如果我们想按一定比例来扩容MongoDB
,我们可以像如下方式使用:
// ScaleMongoDB changes the number of members by the given proportion,
// which should be 0 =< proportion < 1.
func ScaleMongoDB(ctx context.Context, client dynamic.Interface, name string, namespace string, proportion uint) error {
if proportion > 1 {
return fmt.Errorf("proportion should be between 0 =< proportion < 1")
}
mongoDBClient := client.Resource(monboDBResource).Namespace(namespace)
mdb, err := mongoDBClient.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
members, found, err := unstructured.NestedInt64(mdb.UnstructuredContent(), "spec", "members")
if err != nil {
return err
}
if !found {
return fmt.Errorf("members field not found on MongoDB spec")
}
scaled := int(members) * (1 + int(proportion))
patch := []interface{}{
map[string]interface{}{
"op": "replace",
"path": "/spec/members",
"value": scaled,
},
}
payload, err := json.Marshal(patch)
if err != nil {
return err
}
_, err = mongoDBClient.Patch(ctx, name, types.JSONPatchType, payload, metav1.PatchOptions{})
if err != nil {
return err
}
return nil
}
这里我们利用unstructured.NestedInt64
只访问我们要操作的字段,保证我们与MongoDB CRD
间的耦合最小,同时又可以在安全类型之下来操作resource data
。
unstructured package
有很多像这种的helpers
,不只用于读取,也可以在resource
上的任意字段上进行写入。
在Kubernetes
上执行所有常规操作(get
,list
,watch
,create
,patch
,delete
等)也遵循同样规则:即提供scheme.GroupVersionResource
,同时处理unstructured.Unstructured
结果。
带有DynamicClient
的Controller
更高级和对Kubernetes client
的使用是创建一个controller
,它可对实际集群状态的变化进行响应,把它调整到期望状态。
通常,我们使用Informer
,一个由k8s.io/client-go
提供的模块,状态发生改变、创建就会在指定类型的client
运行一个handler
。幸运的是Dynamic package
提供了一个我们可以使用的Informer
模块。
举例,如果MongoDB
被检测到清除了相关PersistentVolumeClaims
,我们进行捕获,如下:
package main
import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"time"
)
const maxRetries = 3
var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}
type MongoDBController struct {
informer cache.SharedIndexInformer
stopper chan struct{}
queue workqueue.RateLimitingInterface
}
func NewMongoDBController(client dynamic.Interface) (*MongoDBController, error) {
dynInformer := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
informer := dynInformer.ForResource(monboDBResource).Informer()
stopper := make(chan struct{})
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
return &MongoDBController{
informer: informer,
queue: queue,
stopper: stopper,
}, nil
}
func (m *MongoDBController) Stop() {
close(m.stopper)
}
func (m *MongoDBController) Run() {
defer utilruntime.HandleCrash()
defer m.queue.ShutDown()
go m.informer.Run(m.stopper)
if !cache.WaitForCacheSync(m.stopper, m.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
wait.Until(m.runWorker, time.Second, m.stopper)
}
func (m *MongoDBController) runWorker() {
for {
key, quit := m.queue.Get()
if quit {
return
}
err := m.processItem(key.(string))
if err == nil {
m.queue.Forget(key)
} else if m.queue.NumRequeues(key) < maxRetries {
m.queue.AddRateLimited(key)
} else {
m.queue.Forget(key)
utilruntime.HandleError(err)
}
m.queue.Done(key)
}
}
func (m *MongoDBController) processItem(mongodb string) error {
return nil
}
这段绝大多数代码都非常标准,例如使用特定化类型client controller
的work queue
,informer event handler
,item processing
。
此外,使用由DynamicClient
所提供的解耦方式,在terms complexity
上确实让人少了好多头痛事。
用DynamicClient
进行测试
如果我们想用DynamicClient
来进行伸缩,还要让其具备像TypedClient
一样易于测试,这样才能保证整体扩展的灵活性和健壮性。
在上面给出的Controller
例子中,dynamic package
提供了一个等效的伪客户端,允许使用它做存根对象验证及断言行为执行。这里多说两句,如果对于单元测试了解不深入,可能对于我说的概念有点糊涂,模拟通过具有验证调用和交互的能力,而存根通常没有,存根通常用于提供固定数据或控制测试环境,而模拟则用于代码和外部环境间的交互。
package main
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
)
func TestDynamicClient(t *testing.T) {
mdb := &unstructured.Unstructured{}
mdb.SetUnstructuredContent(map[string]interface{}{
"apiVersion": "mongodbcommunity.mongodb.com/v1",
"kind": "MongoDBCommunity",
"metadata": map[string]interface{} {
"name": "mongodb-test",
"namespace": "default",
},
"spec": map[string]interface{}{
"members": 3,
},
})
dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), mdb)
NotifyMongoDBs(context.Background(), dynamicClient)
AssertActions(t, dynamicClient.Actions(), []ExpectedAction{
{
Verb: "list",
Namespace: "default",
Resource: "mongodbcommunity",
},
})
}
使用unstructured.Unstructured
类型,我们可以像使用YAML
文件中相同标签一样来创建Kubernetes
对象,但要带有maps
。
在执行测试逻辑之后,我们可以使用dynamicClient
.Actions()
来查看我们代码里执行的所有操作。然而,在每个测试上面手动断言这些actions
,通常会导致代码可读性下降和不够完善的断言。
与此同时,我经常使用特定的断言功能AssertActions
来校验是不是每个预期的action
都可以在可执行actions
中被查找到。关键点在于,此功能无法提供一个准确的列表匹配,举例,如果正在使用的client
执行了一个删除操作,测试无法进行中断,这里仅有的用于失败的AssertAction
条件是列表提供的operation
在预期的列表中查询不到。可以在,仅当预期的actions
被执行到时,来改变断言功能或者创建一个sibling function
来做验证。
尽管当前的实现比较麻烦,但是此功能与DynamicClient
或TypedClient
均可协调使用。
type ExpectedAction struct {
Verb string
Name string
Namespace string
Resource string
// Patch action
PatchType types.PatchType
PatchPayload []map[string]interface{}
}
func AssertActions(t *testing.T, got []kubetesting.Action, expected []ExpectedAction) {
if len(expected) > len(got) {
t.Fatalf("executed actions too short, expected %d, got %d", len(expected), len(got))
return
}
for i, expectedAction := range expected {
if !AssertExpectedAction(got, expectedAction) {
t.Fatalf("action %d does not match any of the got actions", i)
}
}
}
func AssertExpectedAction(got []kubetesting.Action, expectedAction ExpectedAction) bool {
for _, gotAction := range got {
switch expectedAction.Verb {
case "get":
getAction, ok := gotAction.(kubetesting.GetAction)
if !ok {
continue
}
if getAction.GetName() != expectedAction.Name {
continue
}
if !validateNamespaceAndResource(getAction, expectedAction) {
continue
}
return true
case "list":
listAction, ok := gotAction.(kubetesting.ListAction)
if !ok {
continue
}
if !validateNamespaceAndResource(listAction, expectedAction) {
continue
}
return true
case "watch":
watchAction, ok := gotAction.(kubetesting.WatchAction)
if !ok {
continue
}
if !validateNamespaceAndResource(watchAction, expectedAction) {
continue
}
return true
case "create":
createAction, ok := gotAction.(kubetesting.CreateAction)
if !ok {
continue
}
if !validateNamespaceAndResource(createAction, expectedAction) {
continue
}
return true
case "update":
updateAction, ok := gotAction.(kubetesting.UpdateAction)
if !ok {
continue
}
if !validateNamespaceAndResource(updateAction, expectedAction) {
continue
}
return true
case "delete":
deleteAction, ok := gotAction.(kubetesting.DeleteAction)
if !ok {
continue
}
if deleteAction.GetName() != expectedAction.Name {
continue
}
if !validateNamespaceAndResource(deleteAction, expectedAction) {
continue
}
return true
case "patch":
patchAction, ok := gotAction.(kubetesting.PatchAction)
if !ok {
continue
}
if patchAction.GetName() != expectedAction.Name {
continue
}
if !validateNamespaceAndResource(patchAction, expectedAction) {
continue
}
if patchAction.GetPatchType() != expectedAction.PatchType {
continue
}
patchBytes, err := json.Marshal(expectedAction.PatchPayload)
if err != nil {
continue
}
if !bytes.Equal(patchAction.GetPatch(), patchBytes) {
continue
}
return true
}
}
return false
}
func validateNamespaceAndResource(action kubetesting.Action, expectedAction ExpectedAction) bool {
return action.GetNamespace() == expectedAction.Namespace && action.GetResource().Resource == expectedAction.Resource
}
这个断言功能允许你随便往里加判断条件,比如验证list/watch
限制以及create/update
主体。
总结
Kubernetes
的生态系统非常丰富,我们时不时会发现这样的宝藏。我强烈大家阅读k8s.io/client-go
源码,当然仅仅读它也不够,sigs.k8s.io/controller-runtime
项目和Kubernetes Reference API
都是极好的。