客户端:
根据具体消息业务需要,调整配置以下参数:
1.是否启用压缩方式compressionCodec
2.消息确认方式:request.required.acks
3.消息发送类型,同步异步:producer.type
4.异步模式下缓冲的最大消息数(queue.buffering.max.messages)
5.异步模式下,每次发送的消息数(batch.num.messages)
6.消费者socket接收缓存空间大小(socket.receive.buffer.bytes)
7.消费者从每个分区fetch的消息大小(fetch.message.max.bytes)
8.消费者配置rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
9.消费者消费起始位置配置:auto.offset.reset
10.消费者group.id
服务端:
1.调整服务器最大打开文件数限制:ulimit
2.Kafka建议开启JMX监控端口
3.broker配置:
kafka server.properties配置参数策略
注:以下各值仅供参考,具体配置值需根据实际硬件环境情况调整
必须的参数配置:
#broker在集群中的唯一表示
broker.id=0
#broker处理消息的最大线程数,建议为cpu核数
num.network.threads=4
#broker处理磁盘IO的线程数 ,建议为cpu核数2倍
num.io.threads=8
#socket的发送缓冲区
socket.send.buffer.bytes=1048576
#socket的接受缓冲区
socket.receive.buffer.bytes=1048576
#socket请求的最大数值,防止serverOOM,message.max.bytes要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
#kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 log.dirs=/tmp/kafka-logs
#topic的分区个数,根据broker数量灵活调整
num.partitions=2
#数据文件保留多长时间, 存储的最大时间,超过这个时间会根据log.cleanup.policy设置数据清除策略 log.retention.hours=168
#topic的分区是以一堆segment文件存储的,控制每个segment的大小
log.segment.bytes=536870912
#文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=600000
#是否开启日志清理
log.cleaner.enable=true
#zookeeper集群的地址,逗号分隔多个
zookeeper.connect=
#ZooKeeper连接超时时间
zookeeper.connection.timeout.ms=1000000
建议优化的参数配置:
#broker的主机地址,默认null,一般不设置.
#若是设置会绑定到这个特定地址(IP或hostName,若设置为hostname时消费者端服务器需配置host解析)
#若是没有,会绑定到所有的IP地址上,并将其中之一发送到ZK
host.name=borkerIp
#为提高producer写入TPS,建议设置如下参数,取值根据应用情况
#每达到消息数时写入磁盘
log.flush.interval.messages=10000
#每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
#日志文件清理策略:delete和compact(删除\压缩)主要针对过期数据的处理
log.cleanup.policy = delete
#replication对写入TPS有影响,建议设置为最小副本数
default.replication.factor=3