操操操

Client Go四种交互模式之 DynamicClient实战案例详解

2024-03-02
9分钟阅读时长

引子

Kubernetes赢得了云原生平台之争,同时在绝大多数云原生场景中都凭借其高扩展性担任了重要角色。通过kube-apiserver提供的开放的模块,在不需要切分一个内外部的接口情况下,让我们具备了在同一应用前提下(Controller)与集群及其他系统交互的能力,甚至是自定义的资源描述我们特殊的操作,被称之为Operator Pattern

尽管我们可以使用HTTP ClientAPI Server之间进行交互,但这其实并不简单。如果我们只考虑kubernetes的核心resources,那么这种交互方式恐怕不是最完美的,因为许多resources请求后响应回的数据结构和可能操作都是不一样的。作为加强,Kubernetes本身提供了一系列更加方便整合进kubernetes系统的方式,被称之为client-go

这个项目中被最广泛使用客户端是ClientSet,它是交互客户端的类型之一。这意味着这个接口给kubernetes的每种资源都提供了可用的扩展方法和操作,这也就是为什么但凡要实现扩展,我们首先就应该考虑这种标准操作kubernetes内置接口的方式。

但是这也肯定不能满足全部场景,很多情况下我们要自己声明CRD(Custom Resources Definition),ClientSet无法满足我们我们的需求了。这个时候,我们就需要DynamicClient,也就是k8s.io/client-go/dynamic.Interface接口,来让我们有能力入侵系统,并实现自定义的功能和操作。

这种方式的好处在于,首先,它避免了对kubernetes系统的强依赖,让我们可以操作自定义resources。如果你想通过其他操作作为构建块来建构自动化或者工作流,比如ExternalDNS,CertManager,Prometheus Operator等,通常就是把这些项目,使用Go语言数据类型,把它们注册到你的Client实例当中,作为依赖。操作的自由度越大,管控的难度显然就会越大,你需要管理这些扩展功能的迭代升级版本,同时又要保持你已经在集群上安装的版本与go.mod文件中声明的版本相一致。

第二,你可以协调多种或未知的resources。当你的operator实现了一个通用逻辑,它就可以与任何通用的Kubernetes resources(RBACPods)交互,custom resources也不在话下,这个时候,dynamicClient就是你唯一的解决的方案。DynamicClient中有些例子对garbage collection controller是有重度依赖的,假如你打算对项目里任意的custom resources提供支持DynamicClient类型扩展也必不可少,比如KubeWatch

因此,深入client-go项目的组成模块和弄清楚如何使用好它就是要具备的技能。

通过DynamicClient做基础操作

以下的代码都预设你在`kubernetes cluster`中进行使用。

许多DynamicClient相关的操作都与TypedClient类似,比如创建新实例,都可以通过给构造方法提供配置文件来完成。

func newClient() (dynamic.Interface, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	dynClient, err := dynamic.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	return dynClient, nil
}

尽管DynamicClient对于你通过resources声明的意图一无所知,它也没提供类似于CoreV1().Pod类似的方法。所以得告诉它怎么来解析和处理你提供的resources,这时候首先要做的,你需要提供一个schema.GroupVersionResource,它是一个提供了必要信息的golang数据结构,用它可以来构造针对于API ServerHTTP请求。

举例,如果你想实现一个从MongoDB Community Operator拉取到所有Mongodb resources列表的功能,你可以这么做:

var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}

func ListMongoDB(ctx context.Context, client dynamic.Interface, namespace string) ([]unstructured.Unstructured, error)  {
	list, err := client.Resource(monboDBResource).Namespace(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	return list.Items, nil
}

这里要注意,如果你处理带有namespaceresources,那.Namespace(namespace)就是强制的,而使用空字符就是列出所有命名空间下的。

这段代码片段,我们可以看到DynamicClient的主要组成unstructured.Unstructured。这是一个封装任意JSON结构的特别类型,同时也符合标准的Kubernetes interface,比如runtime.Object,但是最为重要的是它提供了在unstructure package下一系列helpers用于操作这些数据。

扩充下我们上面的例子,如果我们想按一定比例来扩容MongoDB,我们可以像如下方式使用:

// ScaleMongoDB changes the number of members by the given proportion,
// which should be 0 =< proportion < 1.
func ScaleMongoDB(ctx context.Context, client dynamic.Interface, name string, namespace string, proportion uint) error {
	if proportion > 1 {
		return fmt.Errorf("proportion should be between 0 =< proportion < 1")
	}

	mongoDBClient := client.Resource(monboDBResource).Namespace(namespace)
	mdb, err := mongoDBClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	members, found, err := unstructured.NestedInt64(mdb.UnstructuredContent(), "spec", "members")
	if err != nil {
		return err
	}

	if !found {
		return fmt.Errorf("members field not found on MongoDB spec")
	}

	scaled := int(members) * (1 + int(proportion))

	patch := []interface{}{
		map[string]interface{}{
			"op":    "replace",
			"path":  "/spec/members",
			"value": scaled,
		},
	}

	payload, err := json.Marshal(patch)
	if err != nil {
		return err
	}

	_, err = mongoDBClient.Patch(ctx, name, types.JSONPatchType, payload, metav1.PatchOptions{})
	if err != nil {
		return err
	}

	return nil
}

这里我们利用unstructured.NestedInt64只访问我们要操作的字段,保证我们与MongoDB CRD间的耦合最小,同时又可以在安全类型之下来操作resource data

unstructured package有很多像这种的helpers,不只用于读取,也可以在resource上的任意字段上进行写入。

Kubernetes上执行所有常规操作(get,list,watch,create,patch,delete等)也遵循同样规则:即提供scheme.GroupVersionResource,同时处理unstructured.Unstructured结果。

带有DynamicClientController

更高级和对Kubernetes client的使用是创建一个controller,它可对实际集群状态的变化进行响应,把它调整到期望状态。

通常,我们使用Informer,一个由k8s.io/client-go提供的模块,状态发生改变、创建就会在指定类型的client运行一个handler。幸运的是Dynamic package提供了一个我们可以使用的Informer模块。

举例,如果MongoDB被检测到清除了相关PersistentVolumeClaims,我们进行捕获,如下:

package main

import (
	"fmt"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"time"
)

const maxRetries = 3

var monboDBResource = schema.GroupVersionResource{Group: "mongodbcommunity.mongodb.com", Version: "v1", Resource: "mongodbcommunity"}

type MongoDBController struct {
	informer cache.SharedIndexInformer
	stopper  chan struct{}
	queue    workqueue.RateLimitingInterface
}

func NewMongoDBController(client dynamic.Interface) (*MongoDBController, error) {
	dynInformer := dynamicinformer.NewDynamicSharedInformerFactory(client, 0)
	informer := dynInformer.ForResource(monboDBResource).Informer()
	stopper := make(chan struct{})

	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	})

	return &MongoDBController{
		informer: informer,
		queue: queue,
		stopper: stopper,
	}, nil
}

func (m *MongoDBController) Stop() {
	close(m.stopper)
}

func (m *MongoDBController) Run() {
	defer utilruntime.HandleCrash()

	defer m.queue.ShutDown()

	go m.informer.Run(m.stopper)

	if !cache.WaitForCacheSync(m.stopper, m.informer.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	wait.Until(m.runWorker, time.Second, m.stopper)
}

func (m *MongoDBController) runWorker() {
	for {
		key, quit := m.queue.Get()
		if quit {
			return
		}

		err := m.processItem(key.(string))

		if err == nil {
			m.queue.Forget(key)
		} else if m.queue.NumRequeues(key) < maxRetries {
			m.queue.AddRateLimited(key)
		} else {
			m.queue.Forget(key)
			utilruntime.HandleError(err)
		}

		m.queue.Done(key)
	}
}

func (m *MongoDBController) processItem(mongodb string) error {
	return nil
}

这段绝大多数代码都非常标准,例如使用特定化类型client controllerwork queue,informer event handler,item processing

此外,使用由DynamicClient所提供的解耦方式,在terms complexity上确实让人少了好多头痛事。

DynamicClient进行测试

如果我们想用DynamicClient来进行伸缩,还要让其具备像TypedClient一样易于测试,这样才能保证整体扩展的灵活性和健壮性。

在上面给出的Controller例子中,dynamic package提供了一个等效的伪客户端,允许使用它做存根对象验证及断言行为执行。这里多说两句,如果对于单元测试了解不深入,可能对于我说的概念有点糊涂,模拟通过具有验证调用和交互的能力,而存根通常没有,存根通常用于提供固定数据或控制测试环境,而模拟则用于代码和外部环境间的交互。

package main

import (
	"context"
	"k8s.io/apimachinery/pkg/runtime"
	dynamicfake "k8s.io/client-go/dynamic/fake"
)

func TestDynamicClient(t *testing.T) {
	mdb := &unstructured.Unstructured{}
	mdb.SetUnstructuredContent(map[string]interface{}{
		"apiVersion": "mongodbcommunity.mongodb.com/v1",
		"kind": "MongoDBCommunity",
		"metadata": map[string]interface{} {
			"name":      "mongodb-test",
			"namespace": "default",
		},
		"spec": map[string]interface{}{
			"members": 3,
		},
	})

	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), mdb)
  NotifyMongoDBs(context.Background(), dynamicClient)

	AssertActions(t,  dynamicClient.Actions(), []ExpectedAction{
		{
			Verb: "list",
			Namespace: "default",
			Resource: "mongodbcommunity",
		},
	})

}

使用unstructured.Unstructured类型,我们可以像使用YAML文件中相同标签一样来创建Kubernetes对象,但要带有maps

在执行测试逻辑之后,我们可以使用dynamicClient.Actions()来查看我们代码里执行的所有操作。然而,在每个测试上面手动断言这些actions,通常会导致代码可读性下降和不够完善的断言。

与此同时,我经常使用特定的断言功能AssertActions来校验是不是每个预期的action都可以在可执行actions中被查找到。关键点在于,此功能无法提供一个准确的列表匹配,举例,如果正在使用的client执行了一个删除操作,测试无法进行中断,这里仅有的用于失败的AssertAction条件是列表提供的operation在预期的列表中查询不到。可以在,仅当预期的actions被执行到时,来改变断言功能或者创建一个sibling function来做验证。

尽管当前的实现比较麻烦,但是此功能与DynamicClientTypedClient均可协调使用。

type ExpectedAction struct {
	Verb string
	Name string
	Namespace string
	Resource string

	// Patch action
	PatchType types.PatchType
	PatchPayload []map[string]interface{}
}

func AssertActions(t *testing.T, got []kubetesting.Action, expected []ExpectedAction) {
	if len(expected) > len(got) {
		t.Fatalf("executed actions too short, expected %d, got %d", len(expected), len(got))
		return
	}

	for i, expectedAction := range expected {
		if !AssertExpectedAction(got, expectedAction) {
			t.Fatalf("action %d does not match any of the got actions", i)
		}
	}
}

func AssertExpectedAction(got []kubetesting.Action, expectedAction ExpectedAction) bool {
	for _, gotAction := range got {
		switch expectedAction.Verb {
		case "get":
			getAction, ok := gotAction.(kubetesting.GetAction)
			if !ok {
				continue
			}

			if getAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(getAction, expectedAction) {
				continue
			}

			return true
		case "list":
			listAction, ok := gotAction.(kubetesting.ListAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(listAction, expectedAction) {
				continue
			}

			return true
		case "watch":
			watchAction, ok := gotAction.(kubetesting.WatchAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(watchAction, expectedAction) {
				continue
			}

			return true
		case "create":
			createAction, ok := gotAction.(kubetesting.CreateAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(createAction, expectedAction) {
				continue
			}

			return true
		case "update":
			updateAction, ok := gotAction.(kubetesting.UpdateAction)
			if !ok {
				continue
			}

			if !validateNamespaceAndResource(updateAction, expectedAction) {
				continue
			}

			return true
		case "delete":
			deleteAction, ok := gotAction.(kubetesting.DeleteAction)
			if !ok {
				continue
			}

			if deleteAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(deleteAction, expectedAction) {
				continue
			}

			return true
		case "patch":
			patchAction, ok := gotAction.(kubetesting.PatchAction)
			if !ok {
				continue
			}

			if patchAction.GetName() != expectedAction.Name {
				continue
			}

			if !validateNamespaceAndResource(patchAction, expectedAction) {
				continue
			}

			if patchAction.GetPatchType() != expectedAction.PatchType {
				continue
			}

			patchBytes, err := json.Marshal(expectedAction.PatchPayload)
			if err != nil {
				continue
			}

			if !bytes.Equal(patchAction.GetPatch(), patchBytes) {
				continue
			}

			return true
		}
	}

	return false
}

func validateNamespaceAndResource(action kubetesting.Action, expectedAction ExpectedAction) bool {
	return action.GetNamespace() == expectedAction.Namespace && action.GetResource().Resource == expectedAction.Resource
}

这个断言功能允许你随便往里加判断条件,比如验证list/watch限制以及create/update主体。

总结

Kubernetes的生态系统非常丰富,我们时不时会发现这样的宝藏。我强烈大家阅读k8s.io/client-go源码,当然仅仅读它也不够,sigs.k8s.io/controller-runtime项目和Kubernetes Reference API都是极好的。

Avatar

Aisen

Be water,my friend.
扫码关注公众号,可领取以下赠品:
《夯实基础的go语言体系建设》645页涵盖golang各大厂全部面试题,针对云原生领域更是面面俱到;
扫码加微信,可领取以下赠品:
【完整版】本人所著,原价1299元的《爱情困惑者必学的七堂课》; 50个搞定正妹完整聊天记录列表详情点这里
【完整版】时长7小时,原价699元《中国各阶层男性脱单上娶指南》;