Kafka 最佳实践
Apache Kafka 是一款开源的分布式 消息引擎系统, 消息引擎系统是一组规范, 利用这组规范在不同系统之间传递语义准确的消息,实现 松耦合的异步式 数据传递。
Kafka 同时支持 点对点模型 和 发布订阅模型,使用的是纯二进制的字节序列,消息是结构化的。
Kafka 所有通信都是基于 TCP
的。
术语
副本
Kafka定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica)。
前者对外提供服务,对外指的是与客户端程序进行交互;而后者只是被动地追随领导者副本而已,不能与外界进行交互。
副本的工作机制是:
- 生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。
- 追随者副本,向领导者副本发送请求,请求领导者把最新生产的消息发给它,保持与领导者的同步。
分区
分区是为了解决伸缩性问题,在其他分布式系统中可能是类似分片的概念。如 ElasticSearch
中的分片,HBase
中的 Region
Kafka 中的分区机制指的是将 每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中。
分区策略有:轮询、随机、自定义等。
消息架构
- 主题层,每个主题可以配置 M 个分区,而每个分区又可以配置N个副本。
- 分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
- 消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
- 客户端,程序只能与分区的领导者副本进行交互。
持久化
Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作。这个类似于很多其他系统的 WAL(Write-Ahead-Log)。
Kafka 会通过日志段机制定时清理老旧数据,实现回收磁盘的目的。
消费者组
消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。
这组主题中的每个分区都只会被 组内的一个消费者实例 消费,其他消费者实例不能消费它。
主要是为了提升消费者端的吞吐量。可以多个消费者实例同时消费,加速整个消费端的吞吐量(TPS)。
当组内某个实例挂掉,Kafka 能够自动检测然后进行(重平衡)。重平衡的代价很大,会导致整个消费者阻塞。
每个消费者在消费信息的过程中需要有字段记录它当前消费到分区的那个位置,这个字段就是消费者位移(Consumer Offset),这部分是存放在 Kafka 的内部隐藏主题中的。
参数配置
Broker 端参数
auto.create.topics.enable
,是否允许自动创建 Topic。根据需求而定,建议最好设置为 false。
unclean.leader.election.enable
,是否允许 Unclean Leader 选举,建议设置为 false。后果是分区不可用,但不会丢失数据。
auto.leader.rebalance.enable
是否允许定期进行 Leader 选举,选举的代价很大,建议设置为 false。
log.retention.hour
控制消息持久化时间。默认保存 7 天。
log.retention.bytes
控制消息保存的总磁盘容量大小。默认 -1 ,即不限制。建议根据实际容量配置。
message.max.bytes
控制最大消息大小。默认是 1KB 左右。建议根据业务场景调大。
num.io.threads
默认是 8。用于处理请求的线程数,包括磁盘IO,配置为 CPU 总核数即可。
num.network.threads
默认是3。接收网络请求并向网络发送响应的线程数。
log.flush.interval.ms
默认未启用。强制控制刷盘策略,默认情况下使用 fsync() 写入到文件系统缓存即成功。文件系统默认最长时间是 3 秒。
log.flush.interval.messages
默认未启用。强制控制刷盘策略,当写入多少条时进行刷盘。
Topic 参数
Topic 参数会覆盖全局 Broker 参数的值。
retention.ms
Topic 级别保存的时长。默认是7天。
retention.bytes
Topic 级别预留磁盘空间,默认-1,无限制。
JVM 参数
建议使用 JDK8 以上新版本。默认堆是 1GB,通常合理值是 6GB-8GB 可以满足大多数场景,可根据情况稍作调整。
直接无脑使用 G1 GC
,以上可通过环境变量设置。
1 | export KAFKA_HEAP_OPTS="-Xms8G -Xmx8G -server -XX:+UseG1GC" |
可写入到 /etc/profile
设置此环境变量。
注意, 如果你通过
/etc/rc.local
来进行开机自启动 KAFKA,你需要同时在/etc/rc.local
进行环境变量的export
1 | export JMX_PORT=9999 |
操作系统参数
- 文件描述符
- 日志型文件系统
- SWAP
- 刷盘时间
刷盘时间 CentOS7 最长时间为 3000ms,可通过 cat /proc/sys/vm/dirty_expire_centisecs
查看。
压缩算法
在硬件性能保证的前提下,压缩可以提高 Kafka 集群整体吞吐量,优化网络 IO,节省磁盘空间。
Kafka 压缩可能发生在两个地方:生产者端和 Broker 端。
Broker 端默认配置是 producer, 意思是尊重生产者的压缩算法。
所以建议是:生产者启用压缩并选择一种压缩算法,Broker 端保持生产者算法,客户端解压缩。避免二次重复压缩的问题。
Broker 在做消息校验时也会进行解压缩,因此启用压缩会对 Broker 端CPU、性能有一些影响。
压缩算法推荐使用 lz4
和 zstd
。其中 lz4
吞吐量最高,zstd
在保证一定的吞吐量的前提下压缩比最高。
无消息丢失配置
replication.factor
副本因子 >=3,消息多保存几份,防止消息丢失。
min.insync.replicas
至少多少副本才算成功。
确保 replication.factor > min.insync.replicas
,如果两者相等,只要有一个副本挂机,整个分区就无法正常工作。推荐 replication.factor > min.insync.replicas + 1
生产者使用重试机制,以及带有回调通知的 API,如 producer.send(msg, callback)
。
生产者设置 acks = all
,表示所有副本 Broker 都要接收到消息,才算提交成功。是最高等级的提交定义。
消费者是否开启自动提交位移,消费者消费完成后再提交,关闭 enable.auto.commit
,采用手动提交位移的方式。
消费者组
即 Group ID。一个消费者组有多个消费者或者多个消费者实例。组内所有消费者协调一起订阅一个主题的所有分区。每个分区只能由一个消费者组内的一个消费者实例消费。
理想情况下,消费者实例应该等于该消费者组订阅的主题分区总数。一般不推荐设置大于总分区数的消费者实例。
消费组的重平衡,主要发生于以下情景:
- 组成员发生变更,如新的消费者实例加入,或者离开
- 订阅主题数发生变更,如消费者组使用正则方式订阅主题,当新增一个符合条件的主题时。
- 订阅主题分区发生变更。
消费组的重平衡时会阻塞所有消费者。目前 无法解决这个重平衡机制问题,但是有一些优化途径,尽量避免此类情况发生。
消费者未能及时发送心跳
消费者提供了允许控制发送心跳频率的参数:heartbeat.interval.ms
。当消费者不能及时发送心跳,消费者会被踢出组。
另一个参数是 session.timeout.ms
默认是 10 秒。即决定消费者存活性的参数。
1 | session.timeout.ms = 6s |
这样配置,保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat
.interval.ms
消费者消费时间过长
消费者用于控制消费能力的参数:max.poll.interval.ms
,默认是 5 分钟。如果5分钟内无法消费完 poll 方法返回的消息,那么消费者会自动离组。
GC 时间太长
Full GC 也会导致引发重平衡
bootstrap.servers
Java Producer API 中 bootstrap.servers
不用配置所有的集群节点,只要配置 3-4 个,因为一旦连接到任一台 Broker,都能拿到整个集群的 Broker 信息。
副本机制
Kafka 副本是在分区层级下定义的,每个分区配置有若干个副本,副本提供数据冗余,用以实现高可用性。
副本分为领导者副本(Leader)和追随者副本(Follower),追随者副本只是异步拉取同步消息,不对外提供服务,所有请求都必须由领导者副本来处理。
当领导者副本挂了,Kafka依托Zookeeper提供的监控实时感知,会开启领导者选举,老的领导者副本重启回来以后,只能作为追随者。
这样设计主要为了实现:
- Read your writes,即写入消息后,立刻就能读到。
- Monotonic Reads,单调读。有点类似幻读。
In-sync Replicas (ISR)
Kafka 通过 In-sync Replicas (ISR) 机制,来动态决定追随者副本是否同步。
replica.lag.time.max.ms
参数,默认是 10 秒,意思是追随者副本落后领导者副本不连续超过10秒,则是同步的。如果不同步 Kafka 会自动收缩,不会放入 ISR。
如果副本后期追上了领导者副本的进度,同样可以被重新加回 ISR。