解读 EventBridge Transform:数据转换和处理的灵活能力

作者:木则

阿里云 EventBridge 提供了强大而灵活的事件总线服务,它可以连接应用程序、阿里云云服务和阿里云 Serverless 服务来快速构建 EDA(Event-driven Architectures)事件驱动架构,驱动应用与应用,应用与云的连接。除此之外,它还可以作为流式的数据管道,在不同的数据仓库和数据处理或分析程序之间快速构建 ETL 系统。

本文将从以下几个方面展开对阿里云 EventBridge Transform 能力的介绍:

1)首先介绍 ETL 基本概念;

2)接着介绍 T(Transform)的能力;

3)最后探讨 EventBridge Transform 能力及落地场景。

什么是 ETL?

ETL 表示的是数据提取(Extract)、转换(Transform)和加载(Load)的过程,是数据集成的核心任务。三个步骤的主要作用如下:

1. 提取: 从数据源中提取数据,数据源可以是各种数据存储系统,比如消息队列、数据库等。

2. 转换: 对提取的数据进行转换操作,比如数据富化、数据清洗、数据聚合、数据拆分、格式转换等。

3. 加载: 将经过转换后的数据加载到目标服务中,比如数据仓库、数据湖、BI 系统等。ETL 应用广泛,它可以帮助企业管理和利用数据,实现数据驱动的决策和业务转型。

图片

Transform 应用场景

ETL 中的 T(Transform)可以对提取的数据进行转换操作,它具体的使用场景如下:

  • 数据富化

    调用外部服务获取额外信息丰富原始数据,提高数据的完整度和可应用性。

  • 数据清洗

    对原始数据进行清洗或验证,去除重复、缺失或者不准确的数据,确保数据的质量和准确性,或者对数据中的信息进行脱敏,确保 数据的安全性。

  • 数据聚合

    将多条原始数据进行合并,形成一个统一的数据视图,便于后续的快速分析和查询。

  • 数据拆分

    将单条原始数据根据业务需求拆分为多条数据。

  • 数据格式转换

    将上游数据转换为目标服务可接受的格式,比如将 Base64、Avro、PB 等格式的原始数据统一转换为 json 格式。

通过 Transform,可以将原始数据转化为一致性、准确性和安全性兼具的高质量数据,为后续的数据分析等操作提供可靠的基础。

业界 Transform 架构概述

目前业界的 Transform 能力,常见的做法有以下几类:

  1. 内置开箱即用的简单且轻量的 Transform 能力:

a. 数据清洗:去除数据中的敏感字段、处理噪音数据等;

b. 数据格式转换:将数据中的指定字段转换为特定格式。

  1. 内置 Custom Transform 能力:

用户可自定义 Transform 的逻辑。这种常见的做法是:用户根据 Custom Transform 的接口规范,实现接口并将实现的代码打成 jar 包,之后在系统导入该 jar 包即可使用自己编写的 Transform 逻辑。

  1. Remote Custom Transform 能力:

通过 Remote 调用的方式调用外部系统对数据进行 Transform。

上述 1、2 两种做法,由于其 Transform 与系统逻辑高度耦合,共享计算资源,并不太适合在 Transform 中进行重量级计算,仅适合应用在一些轻量、简单的业务场景。更优的做法是 Remote Custom Transform,它解耦了 Transform 业务逻辑与数据通路,更具灵活性。

阿里云 EventBridge Transform 设计

阿里云 EventBridge 通过集成阿里云函数计算实现了 Custom Transform 能力,通过 Remote 调用的方式将 Transform 业务逻辑与数据通路解耦。提高了 Transform 的灵活性,降低计算资源的挤兑风险。

链路架构

使用阿里云的函数计算进行 Transform 时,EventBridge 的整体链路如图所示。

  1. EventBridge 从 Source 侧提取数据。

  2. 提取的数据,先经过攒批(window)逻辑的处理,达到攒批条件后,数据将以批的方式交由下一步处理。

  3. 过滤(Filter)会遍历每一条数据,判断是否要丢弃该条数据。过滤完成后,数据将仍以批的方式交由下一步处理。

  4. 转换(Transform)阶段会调用函数计算,将数据交由用户编写的函数代码进行处理,Transform 阶段会等待函数执行完成并接收其返回的处理结果。

  5. EventBridge 将 Transform 处理后的数据加载到 Sink 侧。

图片

下文在此基础之上继续探讨链路中涉及的几个关键问题。

攒批问题

攒批可以批量聚合多条数据,在达到攒批条件后再将数据批量推送给下一步进行处理。EventBridge 将攒批能力置于 Transform 之前,通过攒批能力提升了数据的处理效率和吞吐量,并且显著降低 Transform 调用函数计算的次数。

EventBridge 从数量和时间两个条件来控制攒批的行为,只要达到其中一个条件时就会触发批量推送。

  • 批量推送条数: 单次可聚合的最大数据条数。
  • 批零推送间隔: 聚合的间隔时间,系统每到间隔时间会将已聚合的数据批量推送给下一步。
高可用问题

Transform 处理数据时可能出现异常,为避免异常导致数据丢失或影响链路的稳定性和可用性等。Transform 复用了 EventBridge 的重试、死信、容错等机制。

  • 重试机制

    由于网络异常、系统 crash 等原因导致 Transform 处理异常时,EventBridge 会按照用户选择的重试策略进行重试,目前支持退避重试、指数衰减重试两种方式。

  • 死信队列

    当数据超过重试次数后仍未 Transform 成功时,会变成死信数据。如果不希望死信数据被丢弃,用户可以配置死信队列,所有的死信数据会被 EventBridge 投递到死信队列中,目前 EventBridge 支持 Kafka、RocketMQ、MNS 作为死信队列的目标端。

  • 容错策略

    当 Transform 发生错误时,EventBridge 提供了以下两种处理方式:

    • 允许异常容错:当 Transform 异常发生时不会阻塞执行,会继续处理后续的数据。但是,EventBridge 会重试发生异常的数据,在超出重试策略后根据配置将数据投递至死信队列或直接丢弃。
    • 禁止容错:不允许错误,当 Transform 异常发生且超过重试策略配置时会阻塞执行。
费用问题

函数计算的调用和函数的执行会产生一定费用,包含函数调用、资源使用(CPU、Mem 等)和公网出流量三部分的费用。为减少函数计算产生的费用,函数计算定向减免了来自 EventBridge 的函数调用次数费用,即 EventBridge 触发函数计算产生的函数调用次数不再计入费用账单[3,4]。

产品交互

目前可在 EventBridge 的事件流中体验 Transform 能力,如图所示。

图片

对于阿里云函数计算来说,我们提供了两种方式:

1. 新建函数模板: 可在提供的模板之上,直接创建函数。产品层面提供了简易的 IDE,便于用户编写和调试代码。

图片

2. 绑定现有函数: 支持绑定用户已有的函数。

图片

更详细的使用可参考 Transform 帮助文档,见附录[4]。

Transform 优势

  • Serverless Transform 特性

    EventBridge Transform 基于 Serverless 函数计算构建,可享受 Serverless 服务免运维、资源弹性伸缩、按量付费等特性,具体如下:

    • 弹性:百毫秒内级别的伸缩,可满足波峰波谷、Burst、持续稳定等多样化的负载场景。
    • 免运维:用户无需关心和运维 Transform 运行环境及资源。
    • 按量付费:用户只需支付函数运行所产生的费用,更重要的是 EventBridge 调用函数所产生的调用次数费用将不计费。
  • 灵活性

    UDF 的方式可满足实际业务中复杂、个性化的需求。

  • 多语言支持

    支持 go、python、java、nodejs 等主流语言,可选择熟悉或适合的语言实现 Custom Transform 逻辑。

  • 架构解耦

    Remote Transform 的架构将 Transform 业务逻辑和系统逻辑结解耦,资源隔离,避免产生资源争抢等问题。

  • 模版支持

    产品层面提供了多种 Transform 函数模板,避免用户从零开始。

  • 攒批提效

    通过攒批,函数的入参为批量的消息,大幅提升了消息的处理效率和吞吐。

客户场景案例介绍

数据格式转换+架构升级:消息(MNS)->Transform->消息(RocketMQ)

客户面临架构升级问题,希望将系统依赖的 MNS 升级为 RocketMQ,但系统架构复杂,依赖 MNS 逻辑较多,且牵涉研发人员较多,预计全部升级架构需持续几个月时间。为保证架构升级过程中产生的数据一致性问题,客户使用 EventBridge 将旧架构的 MNS 消息实时同步到新架构的 RocketMQ 实例中,来保证数据在一致性。同时为了适配新架构中的消息设计,客户使用 FC Transform 先将旧消息转换为目标格式,再投递至 RocketMQ 中。

数据清洗+数据转储:消息(RocketMQ)->Transform->OSS

客户会将用户产生的视频数据投递到 RocketMQ 中,这些数据用户是可以查看的。为此客户选择 OSS 来进行文件存储,满足这种写多读少、低成本存储数据的场景。但是,视频数据中包含了若干敏感信息,为此客户使用 FC Transform 对视频中的敏感数据做清除后,再将视频投递到 OSS 中。

总结与展望

EventBridge Transform 通过集成函数计算,满足了实际业务中复杂、个性化的需求。其弹性伸缩、免运维、按量付费的特性深受客户青睐。未来 Transform 会通过集成更多的服务(阿里云工作流、HTTP Destination 等)解锁更多的业务场景,满足多样化需求。

相关链接:

[1] EventBridge-事件流-事件内容转换

https://help.aliyun.com/zh/eventbridge/user-guide/transform/?spm=a2c4g.11186623.0.0.501b5750w5RP1Q

[2] EventBridge-事件流产品首页

https://eventbridge.console.aliyun.com/cn-hangzhou/event-streamings

[3] 定向减免消息类产品和云工作流的函数调用次数费用

https://help.aliyun.com/zh/fc/product-overview/targeted-reduction-of-function-call-fees-for-message-products-and-cloud-workflows?spm=a2c4g.11186623.0.0.55c34df85cgbhf

[4] 函数计算计费项降价通知

https://help.aliyun.com/zh/fc/product-overview/function-calculation-billing-item-price-reduction-notice?spm=a2c4g.11186623.0.0.16965893V9CWsH