单机安装
centos7安装kafka | 小鹏驿站 (gitee.io)
集群配置(一台服务器)
kafka9001.properties
配置
#修改配置
broker.id=1
host.name=192.168.101.143
listeners=PLAINTEXT://192.168.101.143:9001
advertised.listeners=PLAINTEXT://192.168.101.143:9001
log.dirs=/opt/module/kafka/datas-logs1
kafka9002.properties
配置
#修改配置
broker.id=2
host.name=192.168.101.143
listeners=PLAINTEXT://192.168.101.143:9002
advertised.listeners=PLAINTEXT://192.168.101.143:9002
log.dirs=/opt/module/kafka/datas-logs2
kafka9003.properties
配置
#修改配置
broker.id=3
host.name=192.168.101.143
listeners=PLAINTEXT://192.168.101.143:9003
advertised.listeners=PLAINTEXT://192.168.101.143:9003
log.dirs=/opt/module/kafka/datas-logs3
启动
./kafka-server-start.sh ../config/kafka9001.properties &
./kafka-server-start.sh ../config/kafka9002.properties &
./kafka-server-start.sh ../config/kafka9003.properties &
启动脚本
#!/bin/bash
# Start/stop all three single-machine Kafka brokers (ports 9001/9002/9003).
# Usage: kf.sh {start|stop}
#
# BUGFIX: the original wrapped each command line in double quotes, e.g.
#   "source /etc/profile;nohup /opt/.../kafka-server-start.sh ..."
# which makes the shell look for an executable whose *name* is that entire
# string — so neither start nor stop ever ran anything.
case "$1" in
"start")
    for i in 9001 9002 9003
    do
        echo "------------- kafka $i 启动 ------------"
        # Load env (JAVA_HOME etc.) then launch the broker with its
        # port-specific config; -daemon detaches the process.
        source /etc/profile
        nohup /opt/module/kafka/bin/kafka-server-start.sh -daemon "/opt/module/kafka/config/kafka$i.properties"
    done
    ;;
"stop")
    echo "------------- kafka 停止 ------------"
    # kafka-server-stop.sh signals every Kafka broker process on this host.
    /opt/module/kafka/bin/kafka-server-stop.sh
    ;;
*)
    echo "Usage: $0 {start|stop}" >&2
    exit 1
    ;;
esac
集群配置(三台服务器)
准备三台服务器
node2
,node3
,node4
修改
node2
配置文件config/server.properties
# broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
# kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
# 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka
- 分发安装包
xsync配置linux脚本 | 小鹏驿站 (gitee.io)
cd /opt/module
xsync kafka/
- 修改
node3
,node4
下面config/server.properties
# node3 broker 的全局唯一编号,不能重复,只能是数字。
broker.id=1
# node4 broker 的全局唯一编号,不能重复,只能是数字。
broker.id=2
- 环境变量增加
kafka
配置(node2,node3,node4)
vim /etc/profile
添加内容
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新环境变量脚本
source /etc/profile
- zk启动脚本,kafka启动脚本编写
# 先启动zk
./zk.sh start
# 再启动kafka
./kf.sh start
集群配置kraft
准备三台服务器
node2
,node3
,node4
修改
node2
配置文件config/kraft/server.properties
# 节点 id
node.id=2
#全 Controller 列表
controller.quorum.voters=2@node2:9093,3@node3:9093,4@node4:9093
#broker 对外暴露的地址
advertised.listeners=PLAINTEXT://node2:9092
#kafka 数据存储目录
log.dirs=/opt/module/kafka/data
- 分发安装包
xsync配置linux脚本 | 小鹏驿站 (gitee.io)
cd /opt/module
xsync kafka/
- 修改
node3
,node4
下面config/kraft/server.properties
# 节点 id
node.id=3
# broker 对外暴露的地址
advertised.listeners=PLAINTEXT://node3:9092
# 节点 id
node.id=4
# broker 对外暴露的地址
advertised.listeners=PLAINTEXT://node4:9092
- 环境变量增加
kafka
配置(node2,node3,node4)
vim /etc/profile
添加内容
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新环境变量脚本
source /etc/profile
- 初始化集群数据目录
# 生成存储目录唯一ID
[root@node2 ~]# cd /opt/module/kafka/bin
[root@node2 bin]# ./kafka-storage.sh random-uuid
MBJ8mVoITCC3hDl886zuyA
# node2
[root@node2 bin]# ./kafka-storage.sh format -t MBJ8mVoITCC3hDl886zuyA -c /opt/module/kafka/config/kraft/server.properties
Formatting /opt/module/kafka/data
# node3
[root@node3 bin]# ./kafka-storage.sh format -t MBJ8mVoITCC3hDl886zuyA -c /opt/module/kafka/config/kraft/server.properties
Formatting /opt/module/kafka/data
# node4
[root@node4 bin]# ./kafka-storage.sh format -t MBJ8mVoITCC3hDl886zuyA -c /opt/module/kafka/config/kraft/server.properties
Formatting /opt/module/kafka/data
- 启动
[root@node2 bin]# ./kafka-server-start.sh -daemon ../config/kraft/server.properties
[root@node3 bin]# ./kafka-server-start.sh -daemon ../config/kraft/server.properties
[root@node4 bin]# ./kafka-server-start.sh -daemon ../config/kraft/server.properties
- 启动脚本
#!/bin/bash
# Start/stop the Kafka (KRaft mode) brokers on node2..node4 over SSH.
# Usage: kf.sh {start|stop}
case "$1" in
"start")
    for host in node2 node3 node4
    do
        echo "------------- kafka $host 启动 ------------"
        # Non-interactive ssh shells don't load /etc/profile, so source it
        # remotely to get JAVA_HOME/KAFKA_HOME before starting the broker.
        ssh "$host" "source /etc/profile;nohup /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/kraft/server.properties"
    done
    ;;
"stop")
    for host in node2 node3 node4
    do
        echo "------------- kafka $host 停止 ------------"
        ssh "$host" "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
    ;;
*)
    # The original silently did nothing on a bad/missing argument.
    echo "Usage: $0 {start|stop}" >&2
    exit 1
    ;;
esac
测试
#创建
./kafka-topics.sh --create --topic "first32" --bootstrap-server 192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003 --partitions 3 --replication-factor 2
#查看描述
./kafka-topics.sh --describe --topic "first32" --bootstrap-server 192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003
- 3个partitions 2个副本
- 3个partitions 3个副本
kafka-console-consumer.sh
# from-beginning 从头开始
./kafka-console-consumer.sh --bootstrap-server 192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003 --from-beginning --topic first33
kafka-console-producer.sh
./kafka-console-producer.sh --bootstrap-server 192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003 --topic first33
性能测试
kafka-producer-perf-test.sh
#生产数量100万个 每个数据1024字节 每秒一万个
[root@rocketmq-nameserver1 bin]# ./kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003 batch.size=16384 linger.ms=0
参数说明
- record-size 是一条信息有多大,单位是字节,本次测试设置为 1k
- num-records 是总共发送多少条信息,本次测试设置为 100 万条
- throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。本次实验设置为每秒钟 1 万条
- producer-props 后面可以配置生产者相关参数,batch.size 配置为 16k
参数 | 压测结果(机器不同效率也有影响) |
---|---|
batch.size=16384 linger.ms=0 | 1000000 records sent, 6565.384666 records/sec (6.41 MB/sec), 4552.62 ms avg latency, 11020.00 ms max latency, 4028 ms 50th, 8530 ms 95th, 10529 ms 99th, 10988 ms 99.9th. |
batch.size=32768 linger.ms=0 | 1000000 records sent, 9996.101520 records/sec (9.76 MB/sec), 1187.20 ms avg latency, 4831.00 ms max latency, 374 ms 50th, 4008 ms 95th, 4556 ms 99th, 4731 ms 99.9th. |
batch.size=4096 linger.ms=0 | 1000000 records sent, 3801.052892 records/sec (3.71 MB/sec), 6347.24 ms avg latency, 16980.00 ms max latency, 7125 ms 50th, 13355 ms 95th, 16519 ms 99th, 16904 ms 99.9th. |
batch.size=4096 linger.ms=50 | 1000000 records sent, 4139.569733 records/sec (4.04 MB/sec), 5786.82 ms avg latency, 17962.00 ms max latency, 3667 ms 50th, 16259 ms 95th, 17789 ms 99th, 17923 ms 99.9th |
batch.size=32768 linger.ms=0 compression.type=snappy | 1000000 records sent, 9997.600576 records/sec (9.76 MB/sec), 1865.76 ms avg latency, 5883.00 ms max latency, 1455 ms 50th, 4909 ms 95th, 5438 ms 99th, 5801 ms 99.9th. |
batch.size=32768 linger.ms=50 compression.type=gzip | 1000000 records sent, 9998.300289 records/sec (9.76 MB/sec), 37.91 ms avg latency, 739.00 ms max latency, 6 ms 50th, 199 ms 95th, 562 ms 99th, 639 ms 99.9th. |
batch.size=16384 linger.ms=50 compression.type=gzip buffer.memory=67108864 | 1000000 records sent, 9998.400256 records/sec (9.76 MB/sec), 45.36 ms avg latency, 1219.00 ms max latency, 4 ms 50th, 260 ms 95th, 945 ms 99th, 1161 ms 99.9th |
kafka-consumer-perf-test.sh
#消费数量100万个
./kafka-consumer-perf-test.sh --bootstrap-server 192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003 --topic test --messages 1000000 --consumer.config ../config/consumer.properties
- –bootstrap-server 指定 Kafka 集群地址
- –topic 指定 topic 的名称
- –messages 总共要消费的消息个数。本次实验 100 万条。
修改配置consumer.properties
max.poll.records=500:一次 poll 拉取数据返回消息的最大条数
参数 | 结果start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec |
---|---|
max.poll.records=500 fetch.max.bytes=52428800 | 2022-07-21 20:39:56:913, 2022-07-21 20:40:07:561, 976.7051, 91.7266, 1000146, 93928.0616, 1531, 9117, 107.1301, 109701.2175 |
max.poll.records=2000 fetch.max.bytes=52428800 | 2022-07-21 20:41:14:571, 2022-07-21 20:41:19:923, 976.9424, 182.5378, 1000389, 186918.7220, 641, 4711, 207.3747, 212351.7300 |
max.poll.records=2000 fetch.max.bytes=104857600 | 2022-07-21 20:42:24:476, 2022-07-21 20:42:29:266, 977.1650, 204.0011, 1000617, 208897.0772, 509, 4281, 228.2563, 233734.4078 |
服役新节点
- 新增配置文件
kafka9004.properties
cp kafka9001.properties kafka9004.properties
- 修改内容
#修改配置
broker.id=4
host.name=192.168.101.143
listeners=PLAINTEXT://192.168.101.143:9004
advertised.listeners=PLAINTEXT://192.168.101.143:9004
log.dirs=/opt/module/kafka/datas-logs4
- 启动新节点
./kafka-server-start.sh ../config/kafka9004.properties &
- 创建一个需要负载均衡的主题
topics-to-move.json
# 可以传多个主题
{
"topics": [
{"topic": "first"}
],
"version": 1
}
- 生成负载均衡计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4" --generate
执行结果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,2],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
# 需要执行的存储计划json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,4],"log_dirs":["any","any"]}]}
- 创建副本存储计划
increase-replication.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,4],"log_dirs":["any","any"]}]}
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file increase-replication.json --execute
执行结果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,2],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
- 验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file increase-replication.json --verify
执行结果
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first
退役旧结点
- 创建一个需要负载均衡的主题
topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}
- 创建执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate
执行结果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,2],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}
- 创建副本存储计划
decrease-replication.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]}
- 执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file decrease-replication.json --execute
执行结果
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first","partition":2,"replicas":[3,4],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
- 验证副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file decrease-replication.json --verify
执行结果
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first
指定分区副本
- 创建分区主题
# 4分区 2 副本
./kafka-topics.sh --create --topic "rocpenghua3" --bootstrap-server 192.168.101.143:9001 --partitions 4 --replication-factor 2
- 创建分区配置
assign-replication-factor.json
{
"version":1,
"partitions":[{"topic":"rocpenghua3","partition":0,"replicas":[1,2]},
{"topic":"rocpenghua3","partition":1,"replicas":[1,2]},
{"topic":"rocpenghua3","partition":2,"replicas":[2,1]},
{"topic":"rocpenghua3","partition":3,"replicas":[2,1]}]
}
- 执行副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file ../assign-replication-factor.json --execute
- 验证副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file ../assign-replication-factor.json --verify
- 查看主题信息
bin/kafka-topics.sh --bootstrap-server 192.168.101.143:9001 --describe --topic "rocpenghua3"
结果信息
Topic: rocpenghua3 TopicId: e7ZcYefrQ_ekcjzbEX4fbw PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: rocpenghua3 Partition: 0 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: rocpenghua3 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 2,1
Topic: rocpenghua3 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: rocpenghua3 Partition: 3 Leader: 1 Replicas: 2,1 Isr: 1,2
增加分区副本
- 创建主题
# 3分区 1 副本
./kafka-topics.sh --create --topic "rocpenghua4" --bootstrap-server 192.168.101.143:9001 --partitions 3 --replication-factor 1
- 创建分区配置
increase-replication-factor.json
{
"version":1,
"partitions":[{"topic":"rocpenghua4","partition":0,"replicas":[1,2,3]},
{"topic":"rocpenghua4","partition":1,"replicas":[1,2,3]},
{"topic":"rocpenghua4","partition":2,"replicas":[1,2,3]}]
}
- 执行副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file ../increase-replication-factor.json --execute
- 验证副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server 192.168.101.143:9001 --reassignment-json-file ../increase-replication-factor.json --verify
- 查看主题信息
bin/kafka-topics.sh --bootstrap-server 192.168.101.143:9001 --describe --topic "rocpenghua4"
核心参数配置
生产者
bootstrap.servers
:生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例 如
192.168.101.143:9001,192.168.101.143:9002,可以设置 1 个或者多个,中间用逗号隔开,并非所有的broker
key.serializer
和value.serializer
:指定发送消息的 key 和 value 的序列化类型。一定要写
全类名buffer.memory
:RecordAccumulator 缓冲区总大小,默认 32m。batch.size
:缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。linger.ms
:如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间acks
:默认值是-1
- 0:生产者发送过来的数据,不需要等数据落盘应答。
- 1:生产者发送过来的数据,Leader 收到数据后应答。
- -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。
max.in.flight.requests.per.connection
:允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字retries
:当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了retry.backoff.ms
:两次重试之间的时间间隔,默认是 100ms。enable.idempotence
:是否开启幂等性,默认 true,开启幂等性compression.type
:生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstdpartitioner.class
:分区器默认:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
自定义:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.rocpenghua.kafka.producer.MyPartitioner");
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取数据 String msgValues = value.toString(); int partition; if (msgValues.contains("rocpenghua")){ partition = 2; }else { partition = 1; } return partition; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
事务开启
public class CustomProducerTranactions { public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 连接集群 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.143:9001,192.168.101.143:9002,192.168.101.143:9003"); // 指定对应的key和value的序列化类型 key.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 指定事务id properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01"); // 1 创建kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); //初始化事务 kafkaProducer.initTransactions(); //开启事务 kafkaProducer.beginTransaction(); try { // 2 发送数据 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first", "rocpenghua" + i)); } int i = 1 / 0; //提交事务 kafkaProducer.commitTransaction(); } catch (Exception e) { //放弃事务 kafkaProducer.abortTransaction(); } finally { // 3 关闭资源 kafkaProducer.close(); } } }
提高吞吐量配置
- buffer.memory:缓冲区大小 默认 32m
- batch.size:缓冲区一批数据最大值 默认 16k
- linger.ms:默认值是 0ms,表示没有延迟
- compression.type:默认是 none
数据可靠性
- acks:默认值是-1
数据去重
- enable.idempotence:开启幂等性,默认 true
数据乱序
- enable.idempotence:开启幂等性,默认 true
- max.in.flight.requests.per.connection:允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
消费者
bootstrap.servers
:向 Kafka 集群建立初始连接用到的 host/port 列表。key.deserializer
和value.deserializer
:指定接收消息的 key 和 value 的反序列化类型group.id
:标记消费者所属的消费者组。命令默认生成。enable.auto.commit
:默认值为 true,消费者会自动周期性地向服务器提交偏移量auto.commit.interval.ms
:如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。auto.offset.reset
:当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?- earliest:自动重置偏移量到最早的偏移量。
- latest:默认,自动重置偏移量为最新的偏移量。
- none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。
- anything:向消费者抛异常
offsets.topic.num.partitions
:__consumer_offsets 的分区数,默认是 50 个分区。heartbeat.interval.ms
:Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。session.timeout.ms
:Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。max.poll.interval.ms
:消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。fetch.min.bytes
:默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms
:默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。fetch.max.bytes
:默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。max.poll.records
:一次 poll 拉取数据返回消息的最大条数,默认是 500 条。partition.assignment.strategy
:分区策略,默认org.apache.kafka.clients.consumer.RangeAssignor
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
org.apache.kafka.clients.consumer.RangeAssignor
:基于每个主题分配分区。org.apache.kafka.clients.consumer.RoundRobinAssignor
:以轮循机制方式将分区分配给使用者。org.apache.kafka.clients.consumer.StickyAssignor
:保证分配的最大平衡,同时保留尽可能多的现有分区分配。org.apache.kafka.clients.consumer.CooperativeStickyAssignor
:遵循相同的粘性Assignor逻辑,但允许合作再平衡。
消费者再平衡
- heartbeat.interval.ms:默认 3s,小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3
- session.timeout.ms:默认 45s
- max.poll.interval.ms:默认是 5 分钟
- partition.assignment.strategy:默 认 策 略 是Range +CooperativeSticky
消费者提高吞吐量
- fetch.max.bytes:Default: 52428800(50 m)
- max.poll.records:一次 poll 拉取数据返回消息的最大条数,默认是 500 条
broker
replica.lag.time.max.ms
:ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。auto.leader.rebalance.enable
:默认是 true。 自动 Leader Partition 平衡。leader.imbalance.per.broker.percentage
:**默认是 10%**。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds
:默认值 300 秒。检查 leader 负载是否平衡的间隔时间log.segment.bytes
:Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。log.index.interval.bytes
:默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。log.retention.hours
:Kafka 中数据保存的时间,默认 7 天。log.retention.minutes
:Kafka 中数据保存的时间,分钟级别,默认关闭。log.retention.ms
:Kafka 中数据保存的时间,毫秒级别,默认关闭。log.retention.check.interval.ms
:检查数据是否保存超时的间隔,默认是 5 分钟log.retention.bytes
:默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。log.cleanup.policy
:默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。num.io.threads
:默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.replica.fetchers
:副本拉取线程数,这个参数占总核数的 50%的 1/3num.network.threads
:默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。log.flush.interval.messages
:强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理log.flush.interval.ms
:每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。
Partition负载平衡
- auto.leader.rebalance.enable:建议设置为 false 关闭。
- leader.imbalance.per.broker.percentage:默认是 10%。每个 broker 允许的不平衡的 leader的比率
- leader.imbalance.check.interval.seconds:默认值 300 秒。检查 leader 负载是否平衡的间隔时间
自动创建主题
- auto.create.topics.enable:默认true,生产环境改成false
单条日志大于1m
- message.max.bytes:默认 1m,broker 端接收每个批次消息最大值
- max.request.size:默认 1m,生产者发往 broker 每个请求消息最大值
- replica.fetch.max.bytes:默认 1m,副本同步数据,每个批次消息最大值
- fetch.max.bytes:默认 Default: 52428800(50 m)