为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源
我们都知道,Faas 最主要的两个特征是事件驱动
与自动扩速容(支持缩容到零)
,本文就带你研究 Faas 事件驱动的进阶开发:如何为你的 Faas 平台 添加自己的事件驱动源。
注意:仅适用于云原生的 Faas 平台,即基于 k8s 的 Faas 平台,因为本文的所有逻辑都是基于 k8s 的资源来进行操作的。
1. 基本原理

解释图中的几个概念
your Source CR: 这个是 k8s 的自定义资源 (CRD),CR 中主要包含事件源的相关字段,以及接收方的信息,(如果你是一个定时器的事件源,那么包含事件消息,事件的生成时间,接收方的地址等)如果你不知道 k8s 的 CRD 是什么,可以参考 k8s 官网 [https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/]
Adapter
:这个是需要自己开发的,是k8s
的Deployment
,主要是用来生成事件,或者将其他信息源转为你的Faas
平台统一的消息格式 (比如统一转为Serverless
的标准格式CloudEvents
),即事件生成器或者事件适配器。消息如何产生:
adapter
自己生成:这种情况是没有外部输入的,adapter
自己生成消息数据,转化为统一的消息格式 (如CloudEvents
)基于外部信息源:如果是基于外部信息的事件源,则根据需要主动去外部轮询
pull
指定接口或者被动监听相关数据,然后转化为统一的消息格式 (如CloudEvents
)
消息如何发出:
adapter
中需要有环境变量指定接收方的地址,图中 key 为 SinkURL 的环境变量就指定了消息接收方的地址,这里可以是k8s
的service
的集群内域名(yoursvc.yournamespace.svc.cluster.local
) 或者knative service
的url
controller
:这个是需要自己开发的,k8s
的自定义controller
,主要作用是将 CR 资源 生成Adapter
Deployment
开发流程
介绍完 基本原理之后,下面就带你研究如何一步步开发自己的事件驱动事件源。
前提条件:熟悉 kubernetes 开发,熟悉 Go 开发, 如果能利用 Ko 什么?云原生开发你竟然不知道 “Ko”,那更好不过了
代码参考 https://github.com/knative-sandbox/sample-source
1. API 定义
定义 事件源 CRD 结构体, CRD 结构体包含 事件驱动事件源的必要信息,如 duckv1.SourceSpec
(包含 Sink )定义了事件接收方的地址
# pkg/apis/samples/v1alpha1/samplesource_types.go
// +genclient
// +genreconciler
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +k8s:openapi-gen=true
type SampleSource struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// Spec holds the desired state of the SampleSource (from the client).
Spec SampleSourceSpec `json:"spec"`
// Status communicates the observed state of the SampleSource (from the controller).
// +optional
Status SampleSourceStatus `json:"status,omitempty"`
}
// SampleSourceSpec holds the desired state of the SampleSource (from the client).
type SampleSourceSpec struct {
// inherits duck/v1 SourceSpec, which currently provides:
// * Sink - a reference to an object that will resolve to a domain name or
// a URI directly to use as the sink.
// * CloudEventOverrides - defines overrides to control the output format
// and modifications of the event sent to the sink.
duckv1.SourceSpec `json:",inline"`
// Interval is the time interval between events.
//
// The string format is a sequence of decimal numbers, each with optional
// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
// units are "ns", "us" (or "µs"), "ms", "s", "m", "h". If unspecified
// this will default to "10s".
Interval string `json:"interval"`
}
// SampleSourceStatus communicates the observed state of the SampleSource (from the controller).
type SampleSourceStatus struct {
duckv1.Status `json:",inline"`
// SinkURI is the current active sink URI that has been configured
// for the SampleSource.
// +optional
SinkURI *apis.URL `json:"sinkUri,omitempty"`
}
结构体中 Spec
主要用于定义事件源的期望状态,status
中定义事件源的实际状态,status
由 controller
来同步。
这是一个定时事件源,spec
中主要定义了 定时器的时间间隔 interval
,接收方的地址duckv1.SourceSpec.Sink
;status
中主要定义了事件源 CR 的状态,(是否Ready),接收方的地址 SinkURI
其中,status
的状态由 controller
中对应资源的 Reconciler
来更改,可以通过下面的函数来对 status
进行更改
# pkg/apis/samples/VERSION/samplesource_lifecycle.go
// InitializeConditions sets relevant unset conditions to Unknown state.
func (s *SampleSourceStatus) InitializeConditions() {
SampleCondSet.Manage(s).InitializeConditions()
}
...
// MarkSink sets the condition that the source has a sink configured.
func (s *SampleSourceStatus) MarkSink(uri *apis.URL) {
s.SinkURI = uri
if len(uri.String()) > 0 {
SampleCondSet.Manage(s).MarkTrue(SampleConditionSinkProvided)
} else {
SampleCondSet.Manage(s).MarkUnknown(SampleConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
}
}
// MarkNoSink sets the condition that the source does not have a sink configured.
func (s *SampleSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) {
SampleCondSet.Manage(s).MarkFalse(SampleConditionSinkProvided, reason, messageFormat, messageA...)
}
2. Controller
其中 controller
的部分与之前的文章 如何基于 Knative 开发 自定义controller 类似,想深究的小伙伴可以翻看一下那篇文章,此处不做过多的详细解析。
controller 入口
#cmd/controller
import (
// The set of controllers this controller process runs.
"knative.dev/sample-source/pkg/reconciler/sample"
// This defines the shared main for injected controllers.
"knative.dev/pkg/injection/sharedmain"
)
func main() {
sharedmain.Main("sample-source-controller", sample.NewController)
}
sharedmain.Main
会传入 controller
的实例化方法
#pkg/reconciler/sample/controller.go
func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
// 借助 injection 从 context 中获取 informer
deploymentInformer := deploymentinformer.Get(ctx)
sampleSourceInformer := samplesourceinformer.Get(ctx)
// 实例化 Reconciler
r := &Reconciler{
dr: &reconciler.DeploymentReconciler{KubeClientSet: kubeclient.Get(ctx)},
// Config accessor takes care of tracing/config/logging config propagation to the receive adapter
configAccessor: reconcilersource.WatchConfigurations(ctx, "sample-source", cmw),
}
if err := envconfig.Process("", r); err != nil {
logging.FromContext(ctx).Panicf("required environment variable is not defined: %v", err)
}
// 实例化 controller.impl 返回 供 controller 框架调用
impl := samplesource.NewImpl(ctx, r)
r.sinkResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)
logging.FromContext(ctx).Info("Setting up event handlers")
// 添加 informer 的hander 函数
sampleSourceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterControllerGK(v1alpha1.Kind("SampleSource")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
return impl
}
3. Reconciler
接下来看一下 Reconciler
的实现, Reconcile
的作用有:
根据
SampleSource
这个CR
资源, 创建Deployment
,Deployment
中运行的进程是Receive Adapter
根据
Receive Adapter
这个Deployment
的状态和接收方的状态,更新SampleSource
的status
对于 创建 Receive Adapter 的过程,下面来看下详细的步骤
组装 Receive Adapter 的参数
# pkg/reconciler/sample/samplesource.go
//1. 组装 Receive Adapter 的参数
raArgs := resources.ReceiveAdapterArgs{
EventSource: src.Namespace + "/" + src.Name,
Image: r.ReceiveAdapterImage,
Source: src,
Labels: resources.Labels(src.Name),
AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
}
2. 查看需要创建 的 Deployment 存在不存在,如果不存在需要重新创建
# pkg/reconciler/deployment.go
namespace := owner.GetObjectMeta().GetNamespace()
ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(ctx, expected.Name, metav1.GetOptions{})
如果 deployment
已经存在,判断需要更新的情况下更新 Deployment
} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
return ra, err
}
第一章中说明的 Deployment 中 的接收方的地址 SinkURI
是怎么传进去的呢?别急,一步步带你研究
再来回头看下 ReconcileDeployment (sample-source/pkg/reconciler/sample/samplesource.go)
这个函数
ra, sb, event := r.dr.ReconcileDeployment(ctx, src, makeSinkBinding(src),
resources.MakeReceiveAdapter(&resources.ReceiveAdapterArgs{
EventSource: src.Namespace + "/" + src.Name,
Image: r.ReceiveAdapterImage,
Source: src,
Labels: resources.Labels(src.Name),
AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
}),
)
传入了 SinkBinding
对象,由 makeSinkBinding
构造完成 对象中包含 source
中的 SourceSpec
(这里面看之前的结构体可以知道,这里包含 接收方的地址 SinkURI
)
func makeSinkBinding(src *v1alpha1.SampleSource) *sourcesv1.SinkBinding {
return &sourcesv1.SinkBinding{
ObjectMeta: metav1.ObjectMeta{
// this is necessary to track the change of sink reference.
Name: src.GetName(),
Namespace: src.GetNamespace(),
},
Spec: sourcesv1.SinkBindingSpec{
SourceSpec: src.Spec.SourceSpec,
},
}
}
在ReconcileDeployment 中有个
syncSink
函数,这个比较关键,这是将 SinkURI
传入 deployment
环境变量的关键一步,
syncSink(ctx, binder, expected.Spec.Template.Spec)
func syncSink(ctx context.Context, binder *sourcesv1.SinkBinding, now corev1.PodSpec) {
// call Do() to project sink information.
ps := &duckv1.WithPod{}
ps.Spec.Template.Spec = now
binder.Do(ctx, ps)
}