【图解 Knative】解密 Eventing Broker-Trigger实现原理
1. 背景介绍
我们都知道,Knative 有两个主要的子项目:Serving 和 Eventing。其中 关于 Serviing 可以查看之前的一篇公众号文章 【超详细】深入探究 Knative 扩缩容的奥秘。Eventing 将系统中的服务以事件驱动的方式松耦合的绑定在一起:即事件发送者不关注谁来消费事件,事件消费者也不关注事件是由谁产生的。Eventing 中有多种事件组合的方式,比如:
最简单的
Source->Service直接绑定通过
channel与subscriptions订阅的方式并行事件流处理 的方式
Parallel,以及串行事件流处理的方式Sequence以及支持事件过滤的
Broker & Trigger也是本文将要重点介绍的模式
✅注:对于其他几种事件组合的方式,感兴趣的可以在官网查阅 https://knative.dev/docs/eventing/
2. 工作原理介绍

上图展示的是 Broker & Trigger 模型的中 Event 传递的基本方式,其中事件传递的格式是标准的 CLoudEvent 的格式。简要介绍下图中的流程:
事件由事件源
Source产生, Knative 支持多种事件源,如GitHub、Heartbeats、k8s、ContainerSource等,更多Source可在官网查阅 https://knative.dev/docs/eventing/sources/,也可以自定义Source,参考之前发的一篇公众号为 Serverless 平台插上腾飞的翅膀--自定义 Faas 事件驱动事件源。图中 事件源产生
type为foo的事件,发送到Broker,其中 有三个Trigger绑定了Broker,两个Trigger的filter是type:foo,也就是会关注type=foo的事件,然后发送给对应的消费者。Service1和Service3只是单纯的消费,并不回复事件Service2收到事件后回复type=bar的事件,事件重新传递到 Broker上,此时只有一个 Trigger 过滤了type:bar事件被传送到Service3消费。其中消费者是否回复事件是可选的。
关于图中的实例 yaml,可参考
附录章节
3. 底层实现原理
上面介绍了 Broker & Trigger 的工作原理,现在从底层实现的角度进一步讲解。
Knative Eventing 事件传递过程中依赖消息通道 Channel,为了便于持久化,生产环境使用可持久化的消息通过( 本文以 NatsStreaming 为例) 来做事件的消息通道,如果是开发调试,则直接用 InMemoryChannel 即可(只会保存在内存中,不会持久化)
先上图

图中包含控制平面与数据平面,图中箭头:控制平面为实线,数据平面为虚线
实线的方框为 Knative 组件,虚线的方框为 k8s CR 资源
下面分别从数据平面和控制平面分别讲解
3.1 数据平面
EventSource------> Broker------>Trigger------->SubScriber
1. EventSource------> Broker
Broker 可以手动生成,或者通过给 namespace 打 label eventing.knative.dev/injection:true ,让sugar-controller 会自动生成对应的 Broker 实例
EventSource 一般通过 SinkURI 环境变量将 Broker 的 地址传入 。比如,Broker 的地址为 http://broker-ingress.knative-eventing.svc.cluster.local/default/default,该地址为broker-ingress 的地址,而 broker-ingress通过 请求 URL的 path 可以得到 Broker 的信息,比如/default/default表示 default namespace 下名为 default 的 Broker
2.Broker------>Trigger
broker-ingress 得到 Broker 的信息(name namespace)之后,可以得到 Broker 的 status 信息,如下得到 channelAddress 的地址(http://default-kne-trigger-kn-channel.default.svc.cluster.local)
channelAddress 的地址是个svc 的地址,通过 externalName 指向 natss-ch-dispatcher
natss-ch-dispatcher 负责将 消息(主题为channel.Name + "." + channel.Namespace) 发布到 Natss-Streaming 组件
3. Trigger------->SubScriber
natss-ch-dispatcher 不仅负责发布,还负责订阅消息,natss-ch-dispatcher watch natssChannel (见下面的 NatssChannel), 获取 natssChannel的 subscriber的地址 subscriberUri,通过 subscriberUri发送消息给 broker-filter ,跟 broker-ingress 一样,subscriberUri 的地址是broker-filter 的 地址,通过请求 path 区分哪个 trigger ,请求 path :/triggers/<trigger namespace>/<trigger name>/<trigger UID>
broker-filter 获取到 Trigger 的信息后,通过 根据 Trigger 的filter 将消息过滤,再决定是否将消息发给 对应的 subscriber, subscriber可以从 Trigger status 的 subcriberUri 获取到,见下图,对于subscriber Reply 的消息,broker-filter 发送到 replyUri 地址上,http://broker-ingress.knative-eventing.svc.cluster.local/default/default ,也就是发送给 Broker (实际是 broker-ingress)。
至此数据面的通信流就完成了,接下来看控制面的数据流。
3.2 控制平面
控制平面主要看上图中的实线部分,此处再贴一下图

接下来按流程讲解
1.1
mt-broker-controllerwatchBroker的创建1.2
mt-broker-controller根据Broker的配置,创建对应的Channel,此处为NatssChannel1.3
Natss-ch-controllerwatchNatssChannel的创建,更新Natss-Streaming的服务端状态到NatssChannel的 status1.4
Natss-ch-controller创建 svc,externalname指向natss-ch-dispatcher1.5
mt-broker-controllerwatchNatssChannel的status1.6
mt-broker-controller更新Broker 的status,其中包含:Broker的address(broker-ingress的地址),channel的address,供broker-ingress使用2.1
mt-broker-controllerwatchTrigger2.2
mt-broker-controller根据Trigger(含 subscriber 的信息)创建subscription,其中包含subsciber(broker-filter)的地址和 Broker的信息
2.3
eventing-controllerwatchsubscription,解析subcriber和replyUri的地址(broker-ingress的地址)2.4
eventing-controller根据subcription更新Natss-Channel:subsriberUrl和replyUrl
2.5 mt-broker-controller 根据 subscription 和 broker 的状态 更新 Trigger 的状态
控制面的逻辑就完了,和数据面的逻辑结合的地方在于:
broker-ingress 和 broker-filter :
broker-ingresswatchBroker,从Broker的status中获取channel的地址(natss-ch-dispatch的svc的地址)broker-filterwatchTrigger,根据 trigger 中subscriber 地址,将event filter 后决定是否发送到 对应的 target
至此,整个流程都讲完了,可以按照附录中的资源,创建一下,看看其中的资源。
附录
broker.yaml
2. trigger1.yaml
2. trigger2.yaml
3. trigger3.yaml
4. service1.yaml
5.service3.yaml (service1和service3 一样都是打印接收到的 event)
6. service2.yaml
Last updated