event flows

Knative 提供了两种模型来定义事件流:Sequence 和 Parallel。前者是串型的,一个函数产出的 event 是下一个函数的输入;后者是并行的,多个函数可以收到同一个事件。下面会结合实例来说明

Sequence

以下 demo 将3个 knative service 通过 event 串联起来

创建 knative service

先创建好3个应用

$ cat services.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: first
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender@sha256:e9ed1c369ba9a15675c12a58187b177089dfeeb5aecd406343dea7f1cbbcc6dd
        env:
        - name: MESSAGE
          value: " - Handled by 0"
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: second
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender@sha256:e9ed1c369ba9a15675c12a58187b177089dfeeb5aecd406343dea7f1cbbcc6dd
        env:
        - name: MESSAGE
          value: " - Handled by 1"
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: third
spec:
  template:
    spec:
      containers:
      - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/appender@sha256:e9ed1c369ba9a15675c12a58187b177089dfeeb5aecd406343dea7f1cbbcc6dd
        env:
        - name: MESSAGE
          value: " - Handled by 2"

$ kubectl apply -f services.yaml

appender 镜像的源码见:https://github.com/knative/eventing-contrib/tree/release-0.17/cmd/appender,基本功能就是收到 event 后,将环境变量 MESSAGE 内容追加到 event message 字段并返回 event

创建 sequence

上述命令对应的完整流程是:

  • 创建了 sequence

  • eventing-controller 根据 sequence 描述创建了对应的 imc 及其 svc

验证

下面通过 pingsource 来触发完整 sequence

上述命令对应的完整流程是:

  • 创建了 pingsource

  • mtping 每分钟发送事件到 sink,也就是 sequence-kn-sequence-0-kn-channel

  • 实际上就是发送到了 imc-dispatcher

  • imc-dispatcher 查看对应的 subscriber,发送到 first pod

  • first pod 收到 event 后,追加内容到 event message,再返回 event 到 imc-dispatcher

  • imc-dispatcher 转发 event 到 sequence-kn-sequence-1-kn-channel

  • 接下来的流程基本是上述流程的重复,最终 event 会抵达 third pod

Parallel

以下 demo 将同一个事件发送到两个处理函数

创建 knative service

先创建好4个应用

filter-nodejs 和 transformer-nodejs 源码没有放到 github 上,代码可以去镜像里确认,不过是 nodejs 的。这里说下大致的功能:

  • filter 判断 FILTER 环境变量,如果为 true,则返回 event;如果为 false,则只返回 200 响应

  • transformer 读取 TRANSFORMER 环境变量,并将其填充到 event message 发出

创建 parallel

这里复用一下 event-display

上述命令对应的完整流程是:

  • 创建了 parallel

  • eventing-controller 依据 parallel 描述创建了对应的 imc 及其 svc

验证

下面通过 pingsource 来触发完整 parallel

上述命令对应的完整流程是:

  • 创建了 pingsource

  • mtping 每分钟发送事件到 sink,也就是 odd-even-parallel-kn-parallel-kn-channel

  • 实际上就是发送到了 imc-dispatcher

  • imc-dispatcher 查看对应的 subscriber,发送到 even-filter 和 odd-filter

  • 两个 filter 收到 event 后,根据 event time 判断是否返回 event

  • imc-dispatcher 收到返回的 event 后,再根据 event 源查找对应的 replyUri 转发到下游 transformer 函数

  • transformer 函数加上相应信息,发送 event 到 event-display

  • 可以看到 event-display 间隔收到两个 transformer 发送的 event

Last updated

Was this helpful?