对码当歌,猿生几何?

Flink开发中遇到的问题及解法

1. 当Source是Kafka的时候,如何设置Source Operator的并发度?

如果没有指定,Source Operator的个数与集群中的TaskManager的个数相等。如果手动设置,建议使用的slot个数=Kafka Partition的个数/TaskManager的个数。此时,Slot的个数需大于等于2.因为其中有一个Source Operator。也不建议在一个Slot中启用多线程。

2. Barrier如果丢失了怎么办?

因为Barrier是从Source开始周期性的发送的,所以过一段时间未被标记为阻塞的input channel会收到下一个checkpoint的barrier,这时Flink会进行比对,发现如果当前的检查点没有完成,但下一个checkpoint已经过来了,那么Flink会放弃当前的checkpoint,转而使用下一个checkpoint。

3. 在Flink UI上Cancel Job,Job所有的任务都会停止吗?

答:不是。Cancel按钮只是把Source,Transform和Sink这些Operator停掉,对应的线程停掉。但整个TaskManager还在。所以,如果Job中如果有不在Operator中初始化Spring容器,那么即便Cancel Job以后,这些对象依然存在。所以,正确的姿势是在Operator的open()方法中初始化Spring容器。在close()方法中释放这些资源。

4. Job运行过程中TaskManager挂了怎么办?

如果TaskManager挂了,Flink会先将Job cancel掉。然后再以相同的JobID,往集群中仍然存活的TaskManager上部署Job,这时候,如果还有足够的task slot,则Job能够恢复。但是这时候会有一个问题:部署在某些TaskManager上的Task数会比之前多,造成了这些TaskManager的负载较重,可能还是会出现问题。这时候就需要尽快恢复挂掉的TaskManager。

5. 某条数据在Input Channel之间传输失败了怎么办?

会抛出Exception,然后Job会重启。

6. Flink读取Kafka时,Checkpoint设置多久合适?

快照本身都是非常轻量级的,一般都在几M或者几十M。如果快照过大,比如几百M甚至更多,就会对程序运行产生影响。官方给出的例子是几秒钟一次,具体可视Job情况决定。

7. Checkpoint和Savepoint有什么区别?

savepoint可以理解为是一种特殊的checkpoint,savepoint就是指向checkpoint的一个指针,需要手动触发,而且不会过期,不会被覆盖,除非手动删除。正常情况下的线上环境是不需要设置savepoint的。除非对job或集群做出重大改动的时候,需要进行测试运行。

8. Flink的Operator不能带成员变量?

Flink operator function中不能带没有实现flink序列化的成员变量。因为flink本身自己有一套序列化方式,在任务提交执行的时候会有validation,如果把没有实现flink序列化的类作为成员变量,就会提交任务报错。目前的解决方案是将operator function与实际业务逻辑分离。或将成员变为static。

9. 每个TaskManager设置多少个TaskSlot合适?

建议为CPU核数个。

10. TaskManager中的BufferPool不够了咋办?

需要增大配置项:taskmanager.network.numberOfBuffers的值,该值表示网络栈buffer的数量,它的大小表示在同一时刻该TaskManager能够拥有的流处理的数据交换的channel数。

11. Job运行中出现了OOM

说明保留的空间不够,这时需减少中间层的空间大小,通过配置降低taskmanager.memory.fraction的值来减少中间层的内存占比。该值表示Flink用于管理底层buffer所占用的内存比例。

12. Job的并行度如何设置?

将所有的transform operator和sink operator的parallism设置成一样的,source operator的parallism根据source而定。这样的话,flink会自动把transform operator和sink operator 都merge成一个piple line去运行。那么这时候一个job就变成只有两个operator了,source operator和merge后的operator,这个pipeline operator中间就没有buffer了,性能最优。