为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源
我们都知道,Faas 最主要的两个特征是事件驱动与自动扩速容(支持缩容到零),本文就带你研究 Faas 事件驱动的进阶开发:如何为你的 Faas 平台 添加自己的事件驱动源。
注意:仅适用于云原生的 Faas 平台,即基于 k8s 的 Faas 平台,因为本文的所有逻辑都是基于 k8s 的资源来进行操作的。
1. 基本原理

解释图中的几个概念
your Source CR: 这个是 k8s 的自定义资源 (CRD),CR 中主要包含事件源的相关字段,以及接收方的信息,(如果你是一个定时器的事件源,那么包含事件消息,事件的生成时间,接收方的地址等)如果你不知道 k8s 的 CRD 是什么,可以参考 k8s 官网 [https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/]
Adapter:这个是需要自己开发的,是k8s的Deployment,主要是用来生成事件,或者将其他信息源转为你的Faas平台统一的消息格式 (比如统一转为Serverless的标准格式CloudEvents),即事件生成器或者事件适配器。消息如何产生:
adapter自己生成:这种情况是没有外部输入的,adapter自己生成消息数据,转化为统一的消息格式 (如CloudEvents)基于外部信息源:如果是基于外部信息的事件源,则根据需要主动去外部轮询
pull指定接口或者被动监听相关数据,然后转化为统一的消息格式 (如CloudEvents)
消息如何发出:
adapter中需要有环境变量指定接收方的地址,图中 key 为 SinkURL 的环境变量就指定了消息接收方的地址,这里可以是k8s的service的集群内域名(yoursvc.yournamespace.svc.cluster.local) 或者knative service的url
controller:这个是需要自己开发的,k8s的自定义controller,主要作用是将 CR 资源 生成AdapterDeployment
开发流程
介绍完 基本原理之后,下面就带你研究如何一步步开发自己的事件驱动事件源。
前提条件:熟悉 kubernetes 开发,熟悉 Go 开发, 如果能利用 Ko 什么?云原生开发你竟然不知道 “Ko”,那更好不过了
代码参考 https://github.com/knative-sandbox/sample-source
1. API 定义
定义 事件源 CRD 结构体, CRD 结构体包含 事件驱动事件源的必要信息,如 duckv1.SourceSpec (包含 Sink )定义了事件接收方的地址
结构体中 Spec 主要用于定义事件源的期望状态,status 中定义事件源的实际状态,status 由 controller来同步。
这是一个定时事件源,spec 中主要定义了 定时器的时间间隔 interval ,接收方的地址duckv1.SourceSpec.Sink ;status 中主要定义了事件源 CR 的状态,(是否Ready),接收方的地址 SinkURI
其中,status 的状态由 controller 中对应资源的 Reconciler 来更改,可以通过下面的函数来对 status 进行更改
2. Controller
其中 controller 的部分与之前的文章 如何基于 Knative 开发 自定义controller 类似,想深究的小伙伴可以翻看一下那篇文章,此处不做过多的详细解析。
controller 入口
sharedmain.Main 会传入 controller 的实例化方法
3. Reconciler
接下来看一下 Reconciler 的实现, Reconcile 的作用有:
根据
SampleSource这个CR资源, 创建Deployment,Deployment中运行的进程是Receive Adapter根据
Receive Adapter这个Deployment的状态和接收方的状态,更新SampleSource的status
对于 创建 Receive Adapter 的过程,下面来看下详细的步骤
组装 Receive Adapter 的参数
2. 查看需要创建 的 Deployment 存在不存在,如果不存在需要重新创建
如果 deployment 已经存在,判断需要更新的情况下更新 Deployment
第一章中说明的 Deployment 中 的接收方的地址 SinkURI是怎么传进去的呢?别急,一步步带你研究
再来回头看下 ReconcileDeployment (sample-source/pkg/reconciler/sample/samplesource.go)这个函数
传入了 SinkBinding 对象,由 makeSinkBinding 构造完成 对象中包含 source中的 SourceSpec (这里面看之前的结构体可以知道,这里包含 接收方的地址 SinkURI)
在ReconcileDeployment 中有个 syncSink 函数,这个比较关键,这是将 SinkURI 传入 deployment 环境变量的关键一步,