为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源

我们都知道,Faas 最主要的两个特征是事件驱动自动扩速容(支持缩容到零),本文就带你研究 Faas 事件驱动的进阶开发:如何为你的 Faas 平台 添加自己的事件驱动源。

注意:仅适用于云原生的 Faas 平台,即基于 k8s 的 Faas 平台,因为本文的所有逻辑都是基于 k8s 的资源来进行操作的。

1. 基本原理

解释图中的几个概念

  • your Source CR: 这个是 k8s 的自定义资源 (CRD),CR 中主要包含事件源的相关字段,以及接收方的信息,(如果你是一个定时器的事件源,那么包含事件消息,事件的生成时间,接收方的地址等)如果你不知道 k8s 的 CRD 是什么,可以参考 k8s 官网 [https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/arrow-up-right]

  • Adapter:这个是需要自己开发的,是 k8sDeployment,主要是用来生成事件,或者将其他信息源转为你的 Faas 平台统一的消息格式 (比如统一转为 Serverless 的标准格式 CloudEvents),即事件生成器或者事件适配器。

    • 消息如何产生:

      • adapter 自己生成:这种情况是没有外部输入的,adapter 自己生成消息数据,转化为统一的消息格式 (如 CloudEvents)

      • 基于外部信息源:如果是基于外部信息的事件源,则根据需要主动去外部轮询 pull 指定接口或者被动监听相关数据,然后转化为统一的消息格式 (如 CloudEvents)

    • 消息如何发出:

      • adapter 中需要有环境变量指定接收方的地址,图中 key 为 SinkURL 的环境变量就指定了消息接收方的地址,这里可以是 k8sservice 的集群内域名(yoursvc.yournamespace.svc.cluster.local) 或者 knative serviceurl

  • controller:这个是需要自己开发的, k8s 的自定义 controller,主要作用是将 CR 资源 生成 Adapter Deployment

开发流程

介绍完 基本原理之后,下面就带你研究如何一步步开发自己的事件驱动事件源。

前提条件:熟悉 kubernetes 开发,熟悉 Go 开发, 如果能利用 Ko 什么?云原生开发你竟然不知道 “Ko”arrow-up-right,那更好不过了

代码参考 https://github.com/knative-sandbox/sample-sourcearrow-up-right

1. API 定义

定义 事件源 CRD 结构体, CRD 结构体包含 事件驱动事件源的必要信息,如 duckv1.SourceSpec (包含 Sink )定义了事件接收方的地址

结构体中 Spec 主要用于定义事件源的期望状态,status 中定义事件源的实际状态,statuscontroller来同步。

这是一个定时事件源,spec 中主要定义了 定时器的时间间隔 interval ,接收方的地址duckv1.SourceSpec.Sinkstatus 中主要定义了事件源 CR 的状态,(是否Ready),接收方的地址 SinkURI

其中,status 的状态由 controller 中对应资源的 Reconciler 来更改,可以通过下面的函数来对 status 进行更改

2. Controller

其中 controller 的部分与之前的文章 如何基于 Knative 开发 自定义controllerarrow-up-right 类似,想深究的小伙伴可以翻看一下那篇文章,此处不做过多的详细解析。

controller 入口

sharedmain.Main 会传入 controller 的实例化方法

3. Reconciler

接下来看一下 Reconciler 的实现, Reconcile 的作用有:

  • 根据 SampleSource 这个 CR 资源, 创建 DeploymentDeployment 中运行的进程是 Receive Adapter

  • 根据 Receive Adapter 这个 Deployment 的状态和接收方的状态,更新 SampleSourcestatus

对于 创建 Receive Adapter 的过程,下面来看下详细的步骤

  1. 组装 Receive Adapter 的参数

2. 查看需要创建 的 Deployment 存在不存在,如果不存在需要重新创建

如果 deployment 已经存在,判断需要更新的情况下更新 Deployment

第一章中说明的 Deployment 中 的接收方的地址 SinkURI是怎么传进去的呢?别急,一步步带你研究

再来回头看下 ReconcileDeployment (sample-source/pkg/reconciler/sample/samplesource.go)这个函数

传入了 SinkBinding 对象,由 makeSinkBinding 构造完成 对象中包含 source中的 SourceSpec (这里面看之前的结构体可以知道,这里包含 接收方的地址 SinkURI

ReconcileDeployment 中有个 syncSink 函数,这个比较关键,这是将 SinkURI 传入 deployment 环境变量的关键一步,

binder.Do 中有以下逻辑,将 Sink 注入 PodEnv

4. Receive Adapter

Receive Adapter 是 用来产生事件源的组件,事件源可以由 Receive Adapter 独自产生,或者 Receive Apapter 将其他信息转换成事件源。下面解析 Receive Adapter 的实现

  1. 入口函数

adapter.Main 函数入参为 adapter.NewEnvadapter.NewAdapter 构造函数

2.下面看 adapter.NewAdapter

下面看 Adapter 结构体,Adapter 实现了 Start 方法,用于 main函数的调用

Start() 方法

Start() 方法中 调用了 Main函数中传入的 Cloudevent client 来发送消息 a.client.Send...

5. CRD 定义与 示例 CR

CRD 的生成方法参考本公众号发的这篇文章: 如何基于 Knative 开发 自定义controllerarrow-up-right

自定义事件源举例

6. Debug

详细代码可以 fork 仓库 https://github.com/knative-sandbox/sample-source

如果已经看过之前的这篇文章[什么?云原生开发你竟然不知道 “Ko”arrow-up-right],那么调试起来就非常轻松了,只需一行命令就可以:

还等什么,赶紧操练起来吧!

Last updated