note
  • 个人笔记
  • 理解 Cilium 系列文章
    • (1) 初识 Cilium
    • (2) 理解网络数据包的流转过程
    • (3) ebpf
    • (4) ebpf Map
    • (5) ebpf 辅助函数
    • (6) XDP
    • (7) cilium 原理解析
  • linux
    • 内存
    • network
      • Linux虚拟网络之 tun/tap、ipvlan/macvlan、ipvtap/macvtap
      • socket
        • image
    • 文件系统
    • 详解Cgroup V2
    • ebpf
  • go
    • map
  • 平台
    • 关于函数冷启动
    • 镜像预热
    • 网关
    • Serverless平台遇到的问题⭐️⭐️ ⭐️⭐️⭐️
  • 工具合集
    • 压测 hey
    • 调试 pod
  • kubernetes
    • oci
      • OCI 和 runc
    • 容器网络
      • iptables ipvs
      • 如何实现一个 Kubernetes 网络插件
      • 性能提升40%: 腾讯 TKE 用 eBPF 绕过 conntrack 优化 K8s Service
    • CRI
    • 调度
    • kubectl top
    • 容器中的单进程和 k8s 中的 Pause 容器
    • Docker entrypoint 和 kubernetes 容器启动命令
  • 开发相关
    • 如何利用 Google 开源工具 Ko 在 kubernetes 上构建并部署 Go 应用
  • serverless
    • serverless 趋势解读
    • 有状态函数
  • 公众号内容
    • 题材列表
    • 内容
      • 为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源
      • 容器镜像按需加载
      • 【图解 Knative】解密 Eventing Broker-Trigger实现原理
      • 图解 kubectl 命令
      • Serverless 2021 最新调查报告
      • 典型的 Serverless 架构是怎样的
  • 容器技术
    • Docker 存储驱动
    • Docker overlay2 分层原理1
    • Docker overlay2 分层原理2
  • 操作系统
    • 第二章 中断,异常,系统调用
    • 第三章 计算机体系结构和 内存层次结构
  • 虚拟化
    • kvm与 qemu关系
    • kvm
  • 负载均衡
    • 负载均衡相关文章
Powered by GitBook
On this page
  • 1. 基本原理
  • 开发流程
  • 1. API 定义
  • 2. Controller
  • 3. Reconciler
  • 4. Receive Adapter
  • 5. CRD 定义与 示例 CR
  • 6. Debug

Was this helpful?

  1. 公众号内容
  2. 内容

为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源

Previous内容Next容器镜像按需加载

Last updated 4 years ago

Was this helpful?

我们都知道,Faas 最主要的两个特征是事件驱动与自动扩速容(支持缩容到零),本文就带你研究 Faas 事件驱动的进阶开发:如何为你的 Faas 平台 添加自己的事件驱动源。

注意:仅适用于云原生的 Faas 平台,即基于 k8s 的 Faas 平台,因为本文的所有逻辑都是基于 k8s 的资源来进行操作的。

1. 基本原理

解释图中的几个概念

  • Adapter:这个是需要自己开发的,是 k8s 的 Deployment,主要是用来生成事件,或者将其他信息源转为你的 Faas 平台统一的消息格式 (比如统一转为 Serverless 的标准格式 CloudEvents),即事件生成器或者事件适配器。

    • 消息如何产生:

      • adapter 自己生成:这种情况是没有外部输入的,adapter 自己生成消息数据,转化为统一的消息格式 (如 CloudEvents)

      • 基于外部信息源:如果是基于外部信息的事件源,则根据需要主动去外部轮询 pull 指定接口或者被动监听相关数据,然后转化为统一的消息格式 (如 CloudEvents)

    • 消息如何发出:

      • adapter 中需要有环境变量指定接收方的地址,图中 key 为 SinkURL 的环境变量就指定了消息接收方的地址,这里可以是 k8s 的 service 的集群内域名(yoursvc.yournamespace.svc.cluster.local) 或者 knative service 的 url

  • controller:这个是需要自己开发的, k8s 的自定义 controller,主要作用是将 CR 资源 生成 Adapter Deployment

开发流程

介绍完 基本原理之后,下面就带你研究如何一步步开发自己的事件驱动事件源。

1. API 定义

定义 事件源 CRD 结构体, CRD 结构体包含 事件驱动事件源的必要信息,如 duckv1.SourceSpec (包含 Sink )定义了事件接收方的地址

# pkg/apis/samples/v1alpha1/samplesource_types.go

// +genclient
// +genreconciler
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
type SampleSource struct {
	metav1.TypeMeta `json:",inline"`
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec holds the desired state of the SampleSource (from the client).
	Spec SampleSourceSpec `json:"spec"`

	// Status communicates the observed state of the SampleSource (from the controller).
	// +optional
	Status SampleSourceStatus `json:"status,omitempty"`
}

// SampleSourceSpec holds the desired state of the SampleSource (from the client).
type SampleSourceSpec struct {
	// inherits duck/v1 SourceSpec, which currently provides:
    // * Sink - a reference to an object that will resolve to a domain name or
    //   a URI directly to use as the sink.
    // * CloudEventOverrides - defines overrides to control the output format
    //   and modifications of the event sent to the sink.
    duckv1.SourceSpec `json:",inline"`

    // Interval is the time interval between events.
    //
    // The string format is a sequence of decimal numbers, each with optional
    // fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
    // units are "ns", "us" (or "µs"), "ms", "s", "m", "h". If unspecified
    // this will default to "10s".
    Interval string `json:"interval"`
}

// SampleSourceStatus communicates the observed state of the SampleSource (from the controller).
type SampleSourceStatus struct {
	duckv1.Status `json:",inline"`

	// SinkURI is the current active sink URI that has been configured
	// for the SampleSource.
	// +optional
	SinkURI *apis.URL `json:"sinkUri,omitempty"`
}

结构体中 Spec 主要用于定义事件源的期望状态,status 中定义事件源的实际状态,status 由 controller来同步。

这是一个定时事件源,spec 中主要定义了 定时器的时间间隔 interval ,接收方的地址duckv1.SourceSpec.Sink ;status 中主要定义了事件源 CR 的状态,(是否Ready),接收方的地址 SinkURI

其中,status 的状态由 controller 中对应资源的 Reconciler 来更改,可以通过下面的函数来对 status 进行更改

# pkg/apis/samples/VERSION/samplesource_lifecycle.go
// InitializeConditions sets relevant unset conditions to Unknown state.
func (s *SampleSourceStatus) InitializeConditions() {
	SampleCondSet.Manage(s).InitializeConditions()
}

...

// MarkSink sets the condition that the source has a sink configured.
func (s *SampleSourceStatus) MarkSink(uri *apis.URL) {
	s.SinkURI = uri
	if len(uri.String()) > 0 {
		SampleCondSet.Manage(s).MarkTrue(SampleConditionSinkProvided)
	} else {
		SampleCondSet.Manage(s).MarkUnknown(SampleConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
	}
}

// MarkNoSink sets the condition that the source does not have a sink configured.
func (s *SampleSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) {
	SampleCondSet.Manage(s).MarkFalse(SampleConditionSinkProvided, reason, messageFormat, messageA...)
}

2. Controller

controller 入口

#cmd/controller
import (
	// The set of controllers this controller process runs.
	"knative.dev/sample-source/pkg/reconciler/sample"

	// This defines the shared main for injected controllers.
	"knative.dev/pkg/injection/sharedmain"
)

func main() {
	sharedmain.Main("sample-source-controller", sample.NewController)
}

sharedmain.Main 会传入 controller 的实例化方法

#pkg/reconciler/sample/controller.go

func NewController(
	ctx context.Context,
	cmw configmap.Watcher,
) *controller.Impl {

  // 借助 injection 从 context 中获取 informer 
	deploymentInformer := deploymentinformer.Get(ctx)
	sampleSourceInformer := samplesourceinformer.Get(ctx)

  // 实例化 Reconciler
	r := &Reconciler{
		dr: &reconciler.DeploymentReconciler{KubeClientSet: kubeclient.Get(ctx)},
		// Config accessor takes care of tracing/config/logging config propagation to the receive adapter
		configAccessor: reconcilersource.WatchConfigurations(ctx, "sample-source", cmw),
	}
	if err := envconfig.Process("", r); err != nil {
		logging.FromContext(ctx).Panicf("required environment variable is not defined: %v", err)
	}

  // 实例化 controller.impl 返回 供 controller 框架调用
	impl := samplesource.NewImpl(ctx, r)

	r.sinkResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)

	logging.FromContext(ctx).Info("Setting up event handlers")

  // 添加 informer 的hander 函数
	sampleSourceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

	deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: controller.FilterControllerGK(v1alpha1.Kind("SampleSource")),
		Handler:    controller.HandleAll(impl.EnqueueControllerOf),
	})

	return impl
}

3. Reconciler

接下来看一下 Reconciler 的实现, Reconcile 的作用有:

  • 根据 SampleSource 这个 CR 资源, 创建 Deployment,Deployment 中运行的进程是 Receive Adapter

  • 根据 Receive Adapter 这个 Deployment 的状态和接收方的状态,更新 SampleSource 的 status

对于 创建 Receive Adapter 的过程,下面来看下详细的步骤

  1. 组装 Receive Adapter 的参数

# pkg/reconciler/sample/samplesource.go

//1. 组装 Receive Adapter 的参数
raArgs := resources.ReceiveAdapterArgs{
		EventSource:    src.Namespace + "/" + src.Name,
        Image:          r.ReceiveAdapterImage,
        Source:         src,
        Labels:         resources.Labels(src.Name),
        AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
	}

2. 查看需要创建 的 Deployment 存在不存在,如果不存在需要重新创建

# pkg/reconciler/deployment.go
namespace := owner.GetObjectMeta().GetNamespace()
	ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(ctx, expected.Name, metav1.GetOptions{})

如果 deployment 已经存在,判断需要更新的情况下更新 Deployment

} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
    ra.Spec.Template.Spec = expected.Spec.Template.Spec
    if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
        return ra, err
    }

第一章中说明的 Deployment 中 的接收方的地址 SinkURI是怎么传进去的呢?别急,一步步带你研究

再来回头看下 ReconcileDeployment (sample-source/pkg/reconciler/sample/samplesource.go)这个函数

	ra, sb, event := r.dr.ReconcileDeployment(ctx, src, makeSinkBinding(src),
		resources.MakeReceiveAdapter(&resources.ReceiveAdapterArgs{
			EventSource:    src.Namespace + "/" + src.Name,
			Image:          r.ReceiveAdapterImage,
			Source:         src,
			Labels:         resources.Labels(src.Name),
			AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
		}),
	)

传入了 SinkBinding 对象,由 makeSinkBinding 构造完成 对象中包含 source中的 SourceSpec (这里面看之前的结构体可以知道,这里包含 接收方的地址 SinkURI)

func makeSinkBinding(src *v1alpha1.SampleSource) *sourcesv1.SinkBinding {
	return &sourcesv1.SinkBinding{
		ObjectMeta: metav1.ObjectMeta{
			// this is necessary to track the change of sink reference.
			Name:      src.GetName(),
			Namespace: src.GetNamespace(),
		},
		Spec: sourcesv1.SinkBindingSpec{
			SourceSpec: src.Spec.SourceSpec,
		},
	}
}

在ReconcileDeployment 中有个 syncSink 函数,这个比较关键,这是将 SinkURI 传入 deployment 环境变量的关键一步,

syncSink(ctx, binder, expected.Spec.Template.Spec)


func syncSink(ctx context.Context, binder *sourcesv1.SinkBinding, now corev1.PodSpec) {
	// call Do() to project sink information.
	ps := &duckv1.WithPod{}
	ps.Spec.Template.Spec = now

	binder.Do(ctx, ps)
}

binder.Do 中有以下逻辑,将 Sink 注入 Pod 的 Env 中

		spec.InitContainers[i].Env = append(spec.InitContainers[i].Env, corev1.EnvVar{
			Name:  "K_SINK",
			Value: uri.String(),
		})

4. Receive Adapter

Receive Adapter 是 用来产生事件源的组件,事件源可以由 Receive Adapter 独自产生,或者 Receive Apapter 将其他信息转换成事件源。下面解析 Receive Adapter 的实现

  1. 入口函数

# sample-source/cmd/receive_adapter/main.go
package main

import (
	"knative.dev/eventing/pkg/adapter"
	myadapter "knative.dev/sample-source/pkg/adapter"
)

func main() {
	adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
}

adapter.Main 函数入参为 adapter.NewEnv 和 adapter.NewAdapter 构造函数

  • 另外还是根据环境变量构造 CloudEvents client,传递给 adapter.NewAdapter

	#knative.dev/eventing/pkg/adapter/v2/cloudevents.go#48
	
	// 1. 获取到事件接受方的地址
	if target == "" && env != nil {
		target = env.GetSink()
	}
	
	#knative.dev/eventing/pkg/adapter/v2/cloudevents.go#54
	
	// 2. 构造 eventclient的 target 参数
		if len(target) > 0 {
		pOpts = append(pOpts, cloudevents.WithTarget(target))
	}

#knative.dev/eventing/pkg/adapter/v2/main.go#166

	// 3. 调用adapter.NewAdapter 构造函数,实例化 Adapter 结构体
	adapter := ctor(ctx, env, eventsClient)


#knative.dev/eventing/pkg/adapter/v2/main.go#196

  // 4. 调用 Adapter Start 函数
	if err := adapter.Start(ctx); err != nil {
		logger.Fatalw("Start returned an error", zap.Error(err))
	}

2.下面看 adapter.NewAdapter

下面看 Adapter 结构体,Adapter 实现了 Start 方法,用于 main函数的调用

// Adapter generates events at a regular interval.
type Adapter struct {
	logger   *zap.Logger
	interval time.Duration
	nextID   int
	client   cloudevents.Client
}
...

Start() 方法

Start() 方法中 调用了 Main函数中传入的 Cloudevent client 来发送消息 a.client.Send...

func (a *Adapter) Start(ctx context.Context) error {
	a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String()))
	for {
		select {
		case <-time.After(a.interval):
			event := a.newEvent()
			a.logger.Infow("Sending new event", zap.String("event", event.String()))
			if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) {
				a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result))
				// We got an error but it could be transient, try again next interval.
				continue
			}
		case <-ctx.Done():
			a.logger.Info("Shutting down...")
			return nil
		}
	}
}

5. CRD 定义与 示例 CR

# sample-source/config/300-samplesource.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  labels:
    samples.knative.dev/release: devel
    eventing.knative.dev/source: "true"
    knative.dev/crd-install: "true"
  annotations:
    registry.knative.dev/eventTypes: |
      [
        { "type": "dev.knative.sample" }
      ]
  name: samplesources.samples.knative.dev
spec:
  group: samples.knative.dev
  versions:
    - &version
      name: v1alpha1
      served: true
      storage: true
      subresources:
        status: {}
      schema:
        openAPIV3Schema:
          type: object
          # this is a work around so we don't need to flesh out the
          # schema for each version at this time
          #
          # see issue: https://github.com/knative/serving/issues/912
          x-kubernetes-preserve-unknown-fields: true
      additionalPrinterColumns:
        - name: Ready
          type: string
          jsonPath: ".status.conditions[?(@.type=='Ready')].status"
        - name: Reason
          type: string
          jsonPath: ".status.conditions[?(@.type=='Ready')].reason"
        - name: Sink
          type: string
          jsonPath: .status.sinkUri
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
  names:
    categories:
    - all
    - knative
    - eventing
    - sources
    kind: SampleSource
    plural: samplesources
  scope: Namespaced

自定义事件源举例

apiVersion: samples.knative.dev/v1alpha1
kind: SampleSource
metadata:
  name: sample-source
  namespace: knative-samples
spec:
  interval: "10s"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

6. Debug

详细代码可以 fork 仓库 https://github.com/knative-sandbox/sample-source

ko apply -f config/*

还等什么,赶紧操练起来吧!

your Source CR: 这个是 k8s 的自定义资源 (CRD),CR 中主要包含事件源的相关字段,以及接收方的信息,(如果你是一个定时器的事件源,那么包含事件消息,事件的生成时间,接收方的地址等)如果你不知道 k8s 的 CRD 是什么,可以参考 k8s 官网 []

前提条件:熟悉 kubernetes 开发,熟悉 Go 开发, 如果能利用 Ko ,那更好不过了

代码参考

其中 controller 的部分与之前的文章 类似,想深究的小伙伴可以翻看一下那篇文章,此处不做过多的详细解析。

adapter.Main 会解析运行时的环境变量并加载到 adapter.NewEnv 中定义的变量中,并调用 Adapter 结构提的 Start() 方法 knative.dev/eventing/pkg/adapter/v2/main.go#197 (``),

CRD 的生成方法参考本公众号发的这篇文章:

如果已经看过之前的这篇文章[],那么调试起来就非常轻松了,只需一行命令就可以:

https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/
什么?云原生开发你竟然不知道 “Ko”
https://github.com/knative-sandbox/sample-source
如何基于 Knative 开发 自定义controller
https://github.com/knative/eventing/blob/main/pkg/adapter/v2/main.go
如何基于 Knative 开发 自定义controller
什么?云原生开发你竟然不知道 “Ko”