如何计算Flink集群规模:信封背计算法

2017年柏林Flink Forward大会上Robert Metger的”Keep It Going: How to Reiably and Efficiently Operate Apache Flink”的演讲很受欢迎。Robert的其中一个主题演讲涉及到了如何估算Flink集群规模。Flink Forward大会的观众们认为这个计算方法对他们很有用,因此我们把他的演讲主题转变成这篇博客。

Flink社区上经常被问起的一个问题是当从开发转到线上时,如何估算集群规模大小。当然,最准确的的答案是根据需要,但是这并没有什么用。这篇博客提出了一系列问题使你能够计算出一些基准。

1 通过数学建立基准

首先,思考一下你的应用操作需要的资源基准的指标。

关键指标如下:

  • 每秒的数据量和每条数据的大小
  • 去重key的数量和每个key的state大小
  • state更新的数量和state backend的方式

最后,考虑一下你的服务等级协议(SLAS),比如宕机时间、延迟和最大的吞吐量。这些指标将直接影响你的容量计算。

接下来,看一下基于预算可用的资源大小。比如:

  • 网络容量,需要考虑外部服务的网络消耗,比如Kakfa、HDFS等。
  • 磁盘带宽,比如你使用磁盘的state backend,比如RocksDB。同时需要考虑外部服务的磁盘使用,比如Kafka、HDFS等。
  • 机器的数量和它们的CPU和内存。

基于上述这些因素,你现在能够估算正常流程的的资源基准。另外,还需要增加一些资源用作异常的恢复和checkpointing。

2 样例:计算

我现在通过一个集群上的虚拟job部署来描述整个资源基准的建立过程。信封背计算法的所用到的数字是不精准的,同时并没有考虑的很全面。在后面,我会指出在做计算时的忽视的一些点。

2.1 Flink流式应用样例和硬件

Example Flink Streaming job topology

在这个案列中,我将部署典型的Flink流式应用,Kafka Topic的数据作为数据源。这个流接着使用keyed, aggregating window操作转换。窗口操作执行5分钟的窗口聚合。同时假设有源源不断的数据进来,window被设置成1分钟滑动一次。

这表示每分钟执行一次过去5分钟内的窗口聚合。这个流式应用根据userId字段进行聚合。Kafka Topic中消息的大小平均是2KB。

吞吐量是每秒100万条消息。为了理解窗口操作的state大小,你需要知道distinct Keys的数量,就是userIds的数量,这边大约是5000万个不同的用户ID。对于每个用户,你需要计算4个数字,通过longs(8 byte)存储。

现在,让我们总结一下这个任务的关键指标:

  • 消息大小:2KB
  • 吞吐量:1000000 msg/sec
  • Distinct Keys: 500000000(窗口聚合:每个key4个long大小)
  • Checkpointing: 每分钟一次

— 硬件:

  • 5台机器
  • 10 gigabit 以太网
  • 每台机器运行一个Flink TaskManager
  • 磁盘通过网络挂载

— Kafka 独立部署

假设硬件步骤

总共有5台机器运行这个job,每台机器上运行一个TaskManager。磁盘通过网络挂载,同时有10 gigabit的以太网接入。同时Kafka是独立部署在其他机器上。

每台机器有16CPU核。为了简化的需要,这边不考虑CPU和内存的使用情况。在实际情况下,你需要根据应用逻辑和state backend的使用,来考虑内存的使用。这个例子使用RocksDB state backend。(它是健壮的,同时对内存需求比较低)。

2.2 单机计算

为了理解整个job运行部署的资源需求,最容易的方式是关注单台机器和TaskManager的操作。你可以通过单台机器计算出来的数字来推断整个集群的资源需求。

默认(所有的操作都有并行度和没有特殊的调度限制)所有的操作在每台机器上都有运行。

在这个例子中,Kafka source, 窗口操作和Kafka sink都运行在每台机器上。

A machine perspective - TaskManager n

keyBy是一个分离的操作,因此资源需求计算比较容易。在现实中,keyBy是一个API,连接了Kafak Source和窗口操作。

我现在将从头到底理解这些操作的网络资源需求。

2.3 Kafka source

为了计算Kafka source收到的数据量,首先需要计算Kafka的聚合输入。sources每秒收到100万消息,每条消息2KB大小。
$$
2KB \times 1000000/s = 2GB/s
$$
2GB/s除以5台机器,得到如下结果:
$$
2GB/s \div 5 machines = 400MB/s
$$
集群中每台机器上TaskManager的source收到400MB/s的数据。

Kafka source calculation

2.4 Shuffle / keyBY

接下来,你需要确保同一个key的所有事件落在一些机器上。这边,你从kafka中读取的数据可能被重新分区。
shuffle过程发送所有拥有相同key的数据到同一台机器,因此这边把400MB/s的数据分割成一个根据userId分区的流。
$$
400MB/s \div 5 machines = 80MB/s
$$
平均来看,你将发送80MB/s的数据到每一台机器。这个分析是从单台机器的角度,但是一些数据已经在目标机器上了,因此要减去80MB/s。
$$
400MB/s - 80MB/s = 320MB/s
$$
shuffle calculation

2.5 Window Emit and Kafka Sink

接下去的问题是窗口操作发送多少数据到Kafka Sink。结果是67MB/s,让我们看一下如何计算。
窗口操作为每个key保持了4个数字(longs)聚合。每一分钟,操作将发送当前的聚合值。每个key发送2ints(user_id, window_ts)和4 longs。
$$
(2 \times4bytes) + (4\times8bytes) = 40 bytes , per, key
$$
然后乘以keys数量(500000000除以机器数量)
$$
500000000 \div5machines \times40bytes = 40GB
$$
然后计算每秒的大小:
$$
40GB/min \div60=67MB/s
$$
这表示每个TaskManager从窗口操作中平均发送67MB/s的用户数据。因为Kafka sink运行在每个TaskManager上,所以没有进一步的分区操作。这就是从Flink到Kafka的发送的数据量。

User data: From Kafka, shuffled to the window operators and back to Kafka

从窗口操作中得到的数据每分钟会发送一次。在实际中,这个操作不会发以67MB/s的发送数据,而是在一分钟之内的几秒间到达最大带宽。

现在,总结一下:

  • 进来的数据:720MB/s(400+320)per machine
  • 出去的数据:387MB/s(320+67)per machine

2.6 State和Checkpointing

到目前为止,我们仅仅计算了Flink处理的用户数据。你同时还需要考虑磁盘的使用,比如存储state 和checkpointing。为了计算磁盘的花销,你需要查看窗口计算如何进入state。Kafka Source也需要保持一些state,但是跟窗口操作的state相比,可以忽略不计。

为了理解窗口操作的state大小,让我们换一个角度看这个问题。Flink计算5分钟的时间窗口,并且1分钟滑动一次。Flink是通过保持5个窗口来实现滑动窗口。根据先前提到的,在使用窗口时,你需要为每个窗口保持40bytes的状态,并且窗口是提前聚合的。对于每一条到来的事件,你首先需要取出当前聚合值,再更新聚合值,然后把新值写回去。

Window State

这意味着:
$$
40 , bytes ,of ,state \times 200000 msg/s , per , machine = 40MB/s
$$

有40MB/s的磁盘读写(每台机器上)。根据先前说的,磁盘是通过网络挂载的。因此需要在先前的基础上增加这个值。

现在总共需要的资源如下:

  • 进入的数据:760MB/s(400MB/s data in + 320MB/s shuffle + 40MB/s state)
  • 出去的数据:427MB/s(320MB/s shuffle + 67MB/s data out + 40MB/s state)

上述的计算考虑了state的进入,当事件到达窗口操作时触发。你还需要checkpoint和容错机制。如果一台机器或者其他任何东西挂掉,你想要恢复你的窗口并继续处理。

Checkpointing是每隔1分钟执行一次,并且每个checkpoint复制整个job的状态到通过网络挂载的文件系统。

让我们快速的看一下每台机器的state大小:
$$
40 , bytes , of , state \times 5 , windows \times100000000 , keys = 20GB
$$
接着算每秒的值:
$$
20GB \div 60 =333MB/s
$$
和窗口操作类似,checkpointing也是每分钟执行一次。它尝试全速发送数据到外部存储。Checkpointing引起了额外的state进入。自从Flink1.3后,RocksDB支持增量checkpointing来降低每次checkpoint时所需的网络传输。

计算更新如下:

  • 进入的数据:760MB/s(400 + 320 + 40)
  • 出去的数据:760MB/s(320 + 67 + 40 + 333)

Window State

这意味着整个集群网络流量是:
$$
(760 + 760)\times5 + 400 + 2335=10335MB/s
$$
400是80MB的state读写乘以5台机器。2335是Kafka进和出的总值。

整个硬件的网络可用容量如下:

Networking requirements

这边我要加一个免责声明。上述这些计算没有包含协议的花销,比如TCP、Ethernet和RPC(在Flink、Kafka和HDFS等中)。但是上述的计算仍旧对如何计算一个job的资源有指导意义。

Scale Your Way

基于我的分析,这个例子中,5个节点的集群,在典型的操作中,每个机器需要处理760MB/s的数据进出,同时每台机器可以处理的容量是1250MB/s。这样保留了40%的网络容量来应对我刚才提到的复杂度,比如网络协议花销,事件重放,数据倾斜引起的不平均的负载等。

当然,没有一个标准答案来说明留40%的余量是否合适。但是这个算法可以给你一个好的开始。尝试上述的计算,修改上述的参数为你自己的参数。Happy scaling!

翻译源

How To Size Your Apache Flink® Cluster: A Back-of-the-Envelope Calculation

Author: MrHook
Link: https://bigjar.github.io/2019/11/08/%E5%A6%82%E4%BD%95%E8%AE%A1%E7%AE%97Flink%E9%9B%86%E7%BE%A4%E8%A7%84%E6%A8%A1%EF%BC%9A%E4%BF%A1%E5%B0%81%E8%83%8C%E8%AE%A1%E7%AE%97%E6%B3%95/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.