客户端:

根据具体消息业务需要,调整配置以下参数:

1.是否启用压缩方式compressionCodec

2.消息确认方式:request.required.acks

3.消息发送类型,同步异步:producer.type

4.异步模式下缓冲的最大消息数(queue.buffering.max.messages)

5.异步模式下,每次发送的消息数(batch.num.messages)

6.消费者socket接收缓存空间大小(socket.receive.buffer.bytes)

7.消费者从每个分区fetch的消息大小(fetch.message.max.bytes)

8.消费者配置rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

9.消费者消费起始位置配置:auto.offset.reset

10.消费者group.id

服务端:

1.调整服务器最大打开文件数限制:ulimit

2.Kafka建议开启JMX监控端口

3.broker配置:

kafka server.properties配置参数策略

注:以下各值仅供参考,具体配置值需根据实际硬件环境情况调整

必须的参数配置:

#broker在集群中的唯一表示

broker.id=0

#broker处理消息的最大线程数,建议为cpu核数

num.network.threads=4

#broker处理磁盘IO的线程数 ,建议为cpu核数2倍

num.io.threads=8

#socket的发送缓冲区

socket.send.buffer.bytes=1048576

#socket的接受缓冲区

socket.receive.buffer.bytes=1048576

#socket请求的最大数值,防止serverOOM,message.max.bytes要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖

socket.request.max.bytes=104857600

#kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 log.dirs=/tmp/kafka-logs

#topic的分区个数,根据broker数量灵活调整

num.partitions=2

#数据文件保留多长时间, 存储的最大时间,超过这个时间会根据log.cleanup.policy设置数据清除策略 log.retention.hours=168

#topic的分区是以一堆segment文件存储的,控制每个segment的大小

log.segment.bytes=536870912

#文件大小检查的周期时间,是否触发 log.cleanup.policy中设置的策略

log.retention.check.interval.ms=600000

#是否开启日志清理

log.cleaner.enable=true

#zookeeper集群的地址,逗号分隔多个

zookeeper.connect=

#ZooKeeper连接超时时间

zookeeper.connection.timeout.ms=1000000

建议优化的参数配置:

#broker的主机地址,默认null,一般不设置.

#若是设置会绑定到这个特定地址(IP或hostName,若设置为hostname时消费者端服务器需配置host解析)

#若是没有,会绑定到所有的IP地址上,并将其中之一发送到ZK

host.name=borkerIp

#为提高producer写入TPS,建议设置如下参数,取值根据应用情况

#每达到消息数时写入磁盘

log.flush.interval.messages=10000

#每间隔1秒钟时间,刷数据到磁盘

log.flush.interval.ms=1000

#日志文件清理策略:delete和compact(删除\压缩)主要针对过期数据的处理

log.cleanup.policy = delete

#replication对写入TPS有影响,建议设置为最小副本数

default.replication.factor=3