Middleware / 运维笔记

ZooKeeper+Kafka 高可用集群搭建

Einic Yeo · 4月18日 · 2019年 · ·

Kafka是一种高吞吐量的 分布式 发布订阅消息系统,它可以处理消费者规模的网站中所有动作流数据。Kafka的目的是通过Hadoop 并行加载机制统一线上和离线消息处理,并通过 集群 提供实时消息。本文内容较基础,主要围绕kafka的体版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!系架构和功能展开。

正文开始之前,我们先了解一下Kafka中涉及的相关术语:

1、Broker——Kafka集群包含一个或多个服务器,这种服务器被称为broker ;

2、Topic——每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

3、Partition——Partition是物理上的概念,每个Topic包含一个或多个Partition.

4、Producer——负责发布消息到Kafka broker

5、Consumer——消息消费者,向Kafka broker读取消息的客户端。

6、Consumer Group——每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Kafka的topic可以看做是一个记录流 (“/orders”, “/user-signups”),每个topic都有一个日志,它存储在磁盘上。每个topic又被分成多个partition(区),每个partition在存储层面是append log文件,任何发布版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!到partition的消息都会被直接追加到日志文件的尾部,Kafka Producer API用于生成数据记录流,Kafka Consumer API用于使用Kafka的记录流。

Kafka架构:Topic、Partition、Producer、Consumer

通常,一个普通的工作流程是Kafka的producer向topic写入消息,consumer从topic中读取消息。topic与日志相关联,日志是存储在磁盘上的数据结构,Kafka将producer的记录附加到topic日志的末尾。topic日志由分布在多个文件上的许多分区组成,这些文件可以分布在多个Kafka集群节点上。Kafka在集群的不同节点上分发topic日志分区,以实现具有水平可伸缩性的高性能。Spreading 分区有助于快速写入数据,Kafka将分区复制到许多节点以提供故障转移。

如果多个producer和consumer同时读取和写入相同的Kafka主题日志,Kafka如何扩展?首先,Kafka本身的写入速度很快,顺序写入文件系统本身就不需要太多时间;其次,在现代的快速驱动器上,Kafka可以轻松地每秒写入700 MB或更多字节的数据。

Hostname IP Remark
zk_ka_node1192.168.0.240 ZooKeeper+Kafka
zk_ka_node2192.168.0.250 ZooKeeper+Kafka
zk_ka_node3192.168.0.225 ZooKeeper+Kafka
zk_ka_node4192.168.0.220 ZooKeeper(Observer)+Kafka

一、系统环境准备

java环境准备,并需要配置环境变量,详情安装在此略。。。

2.1 ZooKeeper集群部署

#创建数据与日志文件目录
mkdir -p /data/zookeeper/{data,logs}
#下载与解压
wget -P /usr/local/ https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz && tar -zxf zookeeper-3.4.13.tar.gz && mv zookeeper-3.4.14 zookeeper-cluster
#创建用户与目录授权
useradd -s /sbin/nologin -M -U zookeeper && chown -R zookeeper:zookeeper /usr/local/zookeeper-cluster /data/zookeeper/
#配置环境变量
# zookeeper
export ZK_LOGS=/data/zookeeper/logs/
export ZK_HOME=/usr/local/zookeeper-cluster
export PATH=$ZK_HOME/bin:$PATH

日志输出标准化设置

#修改zookeeper.out输出路劲
将ZOO_LOG_DIR="."
修改为:ZOO_LOG_DIR="$ZK_LOGS"
#修改日志按照每天滚动输出(../bin/zkEnv.sh ../conf/log4j.properties)
将ZOO_LOG4J_PROP="INFO,CONSOLE" 
修改为:ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

将log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
修改为:log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender

其它可配置参数:
log4j.appender.ROLLINGFILE.File=zookeeper.log
log4j.appender.ROLLINGFILE.DataPattern='.'yyyy-MM-dd-HH-mm
log4j.appender.ROLLINGFILE.Threshold=debug
log4j.appender.ROLLINGFILE.encoding=UTF-8
log4j.appender.ROLLINGFILE.Append=false
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern= [%d{yyyy-MM-dd HH\:mm\:ss}]%-5p %c(line\:%L) %x-%m%n

zookeeper 配置

所有节点配置文件如下

#新增配置文件
more /usr/local/zookeeper-cluster/conf/zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/data/logs
snapCount=2
clientPort=2181
autopurge.snapRetainCount=50
autopurge.purgeInterval=1
#开启超管账户 echo zookeeper.DigestAuthenticationProvider.superDigest=super:`echo -n super:admin | openssl dgst -binary -sha1 | openssl base64` >>/usr/local/zookeeper-cluster/conf/zoo.cfg | systemctl restart zookeeper 使用zkcli进入再执行addauth digest super:admin
#zookeeper.DigestAuthenticationProvider.superDigest=super:xQJmxLMiHGwaqBvst5y6rkB6HQs=
peerType=observer #observer 所在节点配置开启
server.0 = 192.168.0.240:2888:3888
server.1 = 192.168.0.250:2888:3888
server.2 = 192.168.0.225:2888:3888
server.3 = 192.168.0.220:2888:3888:observer

创建标识文件

#对应节点主机创建标识文件,标识文件内容对应zoo.cfg中server.x;x标识节点标识号,具有唯一性。
如:
192.168.0.240节点创建
echo "0" >/data/zookeeper/data/myid
192.168.0.250节点创建
echo "1" >/data/zookeeper/data/myid
192.168.0.225节点创建
echo "2" >/data/zookeeper/data/myid
192.168.0.220节点创建
echo "3" >/data/zookeeper/data/myid

配置启动文件

more /etc/init.d/zookeeper 

#!/bin/bash  
#chkconfig:2345 20 90  
#description:zookeeper  
#processname:zookeeper  
export JAVA_HOME=/usr/java/jdk1.8.0_172
case $1 in  
        start) su - zookeeper -s /bin/bash -c '/usr/local/zookeeper-cluster/bin/zkServer.sh start';;  
        stop) su - zookeeper -s /bin/bash -c '/usr/local/zookeeper-cluster/bin/zkServer.sh stop';;  
        status) su - zookeeper -s /bin/bash -c '/usr/local/zookeeper-cluster/bin/zkServer.sh status';;  
        restart) su - zookeeper -s /bin/bash -c '/usr/local/zookeeper-cluster/bin/zkServer.sh restart';;  
        *) echo "require start|stop|status|restart" ;;  
esac  
  • 启动
#集群启动成功标识

[root@zk_ka_node1 data]# service zookeeper status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/bin/../conf/zoo.cfg
Mode: leader

[root@zk_ka_node2 data]# service zookeeper status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/bin/../conf/zoo.cfg
Mode: follower

[root@zk_ka_node3 data]# service zookeeper status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/bin/../conf/zoo.cfg
Mode: follower

[root@zk_ka_node4 data]# service zookeeper status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/bin/../conf/zoo.cfg
Mode: observer

2.2 zookeeper常用命令

1.连接客户端
a.打开ZooKeeper的文件目录\bin运行./zkCli.sh。
b.进入到ZooKeeper安装目录,运行./bin/zkCli.sh -server 127.0.0.1:2181连接到ZooKeeper客户端。
2.简单命令:(默认目录为ZK)
1. 显示根目录下、文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据
3. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
6. 删除文件: delete /zk 将刚才创建的 znode 删除
7. 退出客户端: quit
8. 帮助命令: help 


3.Zookeeper的四字命令
ZooKeeper 四字命令
功能描述
conf
输出相关服务配置的详细信息。
cons
列出所有连接到服务器的客户端的完全的连接 / 会话的详细信息。包括“接受 / 发送”的包数量、会话 id 、操作延迟、最后的操作执行等等信息。
dump
列出未经处理的会话和临时节点。
envi
输出关于服务环境的详细信息(区别于 conf 命令)。
reqs
列出未经处理的请求
ruok
测试服务是否处于正确状态。如果确实如此,那么服务返回“imok ”,否则不做任何相应。
stat
输出关于性能和连接的客户端的列表。
wchs
列出服务器 watch 的详细信息。
wchc
通过 session 列出服务器 watch 的详细信息,它的输出是一个与watch 相关的会话的列表。
wchp
通过路径列出服务器 watch 的详细信息。它输出一个与 session相关的路径。
使用示例:
1. 可以通过命令:echo stat|nc 127.0.0.1 2181 来查看哪个节点被选择作为follower或者leader
2. 使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
3. echo dump| nc 127.0.0.1 2181 ,列出未经处理的会话和临时节点。
4. echo kill | nc 127.0.0.1 2181 ,关掉server
5. echo conf | nc 127.0.0.1 2181 ,输出相关服务配置的详细信息。
6. echo cons | nc 127.0.0.1 2181 ,列出所有连接到服务器的客户端的完全的连接 / 会话的详细信息。
7. echo envi |nc 127.0.0.1 2181 ,输出关于服务环境的详细信息(区别于 conf 命令)。
8. echo reqs | nc 127.0.0.1 2181 ,列出未经处理的请求。
9. echo wchs | nc 127.0.0.1 2181 ,列出服务器 watch 的详细信息。
10. echo wchc | nc 127.0.0.1 2181 ,通过 session 列出服务器 watch 的详细信息,它的输出是一个与 watch 相关的会话的列表。
11. echo wchp | nc 127.0.0.1 2181 ,通过路径列出服务器 watch 的详细信息。它输出一个与 session 相关的路径。 

2.3 zookeeper配置文件说明

#客户端连接server的端口,即对外服务端口,一般设置为2181吧
clientPort
#存储快照文件snapshot的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir
#ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置的。例如,session的最小超时时间是2*tickTime
tickTime
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能(No Java system property)
dataLogDir
#最大请求堆积数。默认是1000。ZK运行的时候, 尽管server已经没有空闲来处理更多的客户端请求了,但是还是允许客户端将请求提交到服务器上来,以提高吞吐性能。当然,为了防止Server内存溢出,这个请求堆积数还是需要限制下的(Java system property:zookeeper.globalOutstandingLimit. )
globalOutstandingLimit
#预先开辟磁盘空间,用于后续写入事务日志。默认是64M,每个事务日志大小就是64M。如果ZK的快照频率较大的话,建议适当减小这个参数(Java system property:zookeeper.preAllocSize )
preAllocSize
#每进行snapCount次事务日志输出后,触发一次快照(snapshot), 此时,ZK会生成一个snapshot.*文件,同时创建一个新的事务日志文件log.*。默认是100000.(真正的代码实现中,会进行一定的随机数处理,以避免所有服务器在同一时间进行快照而影响性能)(Java system property:zookeeper.snapCount )
snapCount
#用于记录所有请求的log,一般调试过程中可以使用,但是生产环境不建议使用,会严重影响性能(Java system property:? requestTraceFile )
traceFile
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。指定客户端IP的限制策略(No Java system property)
maxClientCnxns
#对于多网卡的机器,可以为每个IP指定不同的监听端口。默认情况是所有IP都监听 clientPort 指定的端口 new in 3.3.0
clientPortAddress
#Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2 *  tickTime ~ 20 * tickTime 这个范围 New in 3.3.0
minSessionTimeoutmaxSessionTimeout
#事务日志输出时,如果调用fsync方法超过指定的超时时间,那么会在日志中输出警告信息。默认是1000ms(Java system property:  fsync.warningthresholdms )New in 3.3.4
fsync.warningthresholdms
#在上文中已经提到,3.4.0及之后版本,ZK提供了自动清理事务日志和快照文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个1或更大的整数,默认是0,表示不开启自动清理功能(No Java system property)  New in 3.4.0
autopurge.purgeInterval
#这个参数和上面的参数搭配使用,这个参数指定了需要保留的文件数目。默认是保留3个(No Java system property)  New in 3.4.0
autopurge.snapRetainCount
#在之前的版本中, 这个参数配置是允许我们选择leader选举算法,但是由于在以后的版本中,只会留下一种“TCP-based version of fast leader election”算法,所以这个参数目前看来没有用了,这里也不详细展开说了(No Java system property)
electionAlg
#Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许F在 initLimit 时间内完成这个工作。通常情况下,我们不用太在意这个参数的设置。如果ZK集群的数据量确实很大了,F在启动的时候,从Leader上同步数据的时间也会相应变长,因此在这种情况下,有必要适当调大这个参数了(No Java system property)
initLimit
#在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从F那里收到响应,那么就认为这个F已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题(No Java system property)
syncLimit
#默认情况下,Leader是会接受客户端连接,并提供正常的读写服务。但是,如果你想让Leader专注于集群中机器的协调,那么可以将这个参数设置为no,这样一来,会大大提高写操作的性能(Java system property: zookeeper. leaderServes )
leaderServes
#这里的x是一个数字,与myid文件中的id是一致的。右边可以配置两个端口,第一个端口用于F和L之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信(No Java system property)
server.x=[hostname]:nnnnn[:nnnnn]
#对机器分组和权重设置,可以查看官网(No Java system property)
group.x=nnnnn[:nnnnn]weight.x=nnnnn
#Leader选举过程中,打开一次连接的超时时间,默认是5s(Java system property: zookeeper.  cnxTimeout )
cnxTimeout
#ZK权限设置相关
zookeeper.DigestAuthenticationProvider.superDigest
#对所有客户端请求都不作ACL检查。如果之前节点上设置有权限限制,一旦服务器上打开这个开头,那么也将失效(Java system property:  zookeeper.skipACL )
skipACL
#这个参数确定了是否需要在事务日志提交的时候调用 FileChannel .force来保证数据完全同步到磁盘(Java system property: zookeeper.forceSync )
forceSync
#每个节点最大数据量,是默认是1M。这个限制必须在server和client端都进行设置才会生效(Java system property: jute.maxbuffer )
jute.maxbuffer

2.4 Kafka 集群部署

#下载与解压安装
wget -P /usr/local/ https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz && tar -zxf kafka_2.11-2.2.0.tgz && mv kafka_2.11-2.2.0 kafka-cluster

#创建启动用户与目录授权
useradd -s /sbin/nologin -M -U kafka && chown -R Kafka:Kafka /usr/local/kafka-cluster

kafka集群配置

grep -Ev '^$|^[#;]' /usr/local/kafka-cluster/config/server.properties

broker.id=0 #ID标识号具有唯一性,同zookeeper标识
listeners=PLAINTEXT://192.168.0.225:9092  #配置本机网络地址,可写域名,只要能解析就行
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs  #配置日志输出目录
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.240:2181,192.168.0.250:2181,192.168.0.225:2181,192.168.0.220:2181  #配置zookeeper集群连接参数
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

其他节点参照上文配置文件,只需修改listeners地址

  • 启动Kafka
# 第一种方式(推荐)
kafka-server-start.sh -daemon ../config/server.properties
# 第二种方式
nohup kafka-server-start.sh config/server.properties &
# 停止
kafka-server-stop.sh

#可观察进程
ps aux|grep kafka

日志输出说明

日志名称说明
server.log是kafka系统日志,很常用
state-change.logleader切换日志,就是broker宕机副本切换
controller.logkafka集群中有一台机器是控制器,那么控制器角色的日志就记录在这里

节点注册信息

[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.0.225:9092"],"jmx_port":-1,"host":"192.168.0.225","timestamp":"1555575867031","port":9092,"version":4}
cZxid = 0x200000018
ctime = Thu Apr 18 16:24:27 CST 2019
mZxid = 0x200000018
mtime = Thu Apr 18 16:24:27 CST 2019
pZxid = 0x200000018
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100004e046d0000
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.0.250:9092"],"jmx_port":-1,"host":"192.168.0.250","timestamp":"1555576901830","port":9092,"version":4}
cZxid = 0x2000000a3
ctime = Thu Apr 18 16:41:41 CST 2019
mZxid = 0x2000000a3
mtime = Thu Apr 18 16:41:41 CST 2019
pZxid = 0x2000000a3
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100004e046d0001
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.0.240:9092"],"jmx_port":-1,"host":"192.168.0.240","timestamp":"1555577082477","port":9092,"version":4}
cZxid = 0x2000000b3
ctime = Thu Apr 18 16:44:42 CST 2019
mZxid = 0x2000000b3
mtime = Thu Apr 18 16:44:42 CST 2019
pZxid = 0x2000000b3
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x100004e046d0002
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] 

测试验证生产者与消费者之间的消息发送

#创建topic
 [root@zk_ka_node3 bin]#./kafka-topics.sh --create --zookeeper 192.168.0.225:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.

#生产消息
[root@zk_ka_node1 bin]#.kafka-console-producer.sh --broker-list 192.168.0.240:9092 --topic test
>hello world!

#消费消息
[root@zk_ka_node2 bin]#.kafka-console-consumer.sh --bootstrap-server 192.168.0.250:9092 --topic test --from-beginning
hello world!

2.5 Kafka常用命令

说明:所有命令都要在bin目录下执行,除非你配置了环境变量。  –zookeeper 192.168.0.240:2181/之所以版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!后面加一个/这个是因为我在kafka配置文件中指明使用这个名称空间。

# 创建主题
./kafka-topics.sh --create --zookeeper 192.168.0.240:2181/ --replication-factor 2 --partitions 1 --topic test6
--replication-factor
该主题每个分区的副本数,不能大于broker数量。这个副本用于备份,假设每个分区有2个副本,那么只有一个是leader负责读写,follower只是负责同步内容,对外提供读写的也是由leader提供。Leader宕机follower接管成为新的leader。这里的数量是包括Leader+Follwer的
--partitions
分区数量,控制将主题切分成多少个LOG。消费者数量应该和分区数量相等,如果超过则毫无意义。
--topic主题名称
# 删除主题
./kafka-topics.sh --delete --zookeeper 192.168.0.240:2181/ --topic test6
# 查看所有主题
./kafka-topics.sh --list --zookeeper 192.168.0.240:2181/
# 查看指定topic信息,从下图可以看出当时建立这个topic的时候我们设置3个分区,每个分区的副本是3.
./kafka-topics.sh --describe --zookeeper 192.168.0.240:2181/ --topic test
PartiticonCount 显示分区数量一共有多少
ReplicationFactor 副本因子是多少
Partition 分区编号
Leader
显示Leader副本在哪个Broker上,这里是不同分区会有不同,表示Leader在broker.id=0的服务器上。三个分区每个分区有三个副本,分区编号从0开始,所以这个Leader是说后面Replicas副本里面哪个是Leader。Leader副本提供读写,非Leader副本只做数据备份。
从下图可以看出分区0和1对外提供读写的副本都在broker 2上。当然这不是一个好现象,意味着这个服务器将处理一个主题的2个分区读写,我们要平均分开。
Replicas
显示该partitions所有副本存储在哪些节点上 broker.id 这个是配置文件中设置的,它包括leader和follower节点
Isr
显示副本都已经同步的节点集合,这个集合的所有节点都是存活的,并且跟LEADER节点同步
[root@zk_ka_node1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.0.240:2181/ --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
# 平衡读写
./kafka-preferred-replica-election.sh --zookeeper 192.168.0.240:2181/

再次查看test3主题发现Leader均衡了,3个分区的Leader副本由3个broker各自承担一个。不像之前0和1分区的Leader副本都在broker 2上

[root@zk_ka_node1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.0.240:2181/ --topic test3
Topic:test3	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test3	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
	Topic: test3	Partition: 1	Leader: 0	Replicas: 0,2,1	Isr: 2,1,0
	Topic: test3	Partition: 2	Leader: 1	Replicas: 1,0,2	Isr: 2,1,0
# 线上集群所有主题情况
./kafka-topics.sh --describe --zookeeper 192.168.0.240:2181/

2.6 kafka配置文件说明

# ----------------------系统相关----------------------
# broker的全局唯一编号,不能重复,和zookeeper的myid是一个意思
broker.id=0

# broker机架感知的全局唯一标示,不能重复,以此向代理配置添加属性来指定代理属于特定机架
broker.rack=rack_01

# broker监听IP和端口也可以是域名
listeners=PLAINTEXT://192.168.0.225:9092

# 用于接收请求的线程数量
num.network.threads=3

# 用于处理请求的线程数量,包括磁盘IO请求,这个数量和log.dirs配置的目录数量有关,这里的数量不能小于log.dirs的数量,
# 虽然log.dirs是配置日志存放路径,但是它可以配置多个目录后面用逗号分隔
num.io.threads=8

# 发送缓冲区大小,也就是说发送消息先发送到缓冲区,当缓冲区满了之后一起发送出去
socket.send.buffer.bytes=102400

# 接收缓冲区大小,同理接收到缓冲区,当到达这个数量时就同步到磁盘
socket.receive.buffer.bytes=102400

# 向kafka套接字请求最大字节数量,防止服务器OOM,也就是OutOfMemery,这个数量不要超过JAVA的堆栈大小,
socket.request.max.bytes=104857600

# 日志路径也就是分区日志存放的地方,你所建立的topic的分区就在这里面,但是它可以配置多个目录后面用逗号分隔
log.dirs=/tmp/kafka-logs

# 消息体(也就是往Kafka发送的单条消息)最大大小,单位是字节,必须小于socket.request.max.bytes值
message.max.bytes =5000000

# 自动平衡由于某个broker故障会导致Leader副本迁移到别的broker,当之前的broker恢复后也不会迁移回来,有时候我们需要
# 手动进行平衡避免同一个主题不同分区的Leader副本在同一台broker上,下面这个参数就是开启自动平衡功能
auto.leader.rebalance.enable=true

# 设置了上面的自动平衡,当故障转移后,隔300秒(默认)触发一个定时任务进行平衡操作,而只有代理的不均衡率为10%以上才会执行
leader.imbalance.check.interval.seconds=300

# 设置代理的不均衡率,默认是10%
leader.imbalance.per.broker.percentage=10

# ---------------分区相关-------------------------

# 默认分区数量,当建立Topic时不指定分区数量,默认就1
num.partitions=1

# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
 
# 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
default.replication.factor =2

# ---------------日志相关-------------------------

# segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。
# 这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 日志文件中每个segment的大小,默认为1G。topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,当超过这个大小会建立一个新日志文件
# 这个参数会被topic创建时的指定参数覆盖,如果你创建Topic的时候指定了这个参数,那么你以你指定的为准。
log.segment.bytes=1073741824

# 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
# log.retention.bytes和log.retention.minutes|hours任意一个达到要求,都会执行删除
# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准
log.retention.hours|minutes=168

# 这个参数会在日志segment没有达到log.segment.bytes设置的大小默认1G的时候,也会强制新建一个segment会被
# topic创建时的指定参数覆盖
log.roll.hours=168 

# 上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次,
# 就需要设置一个周期性检查文件大小的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

# 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,
# 如果你创建Topic的时候指定了这个参数,那么你以你指定的为准
log.cleanup.policy = delete

# 是否启用日志清理功能,默认是启用的且清理策略为compact,也就是压缩。
log.cleaner.enable=false

# 日志清理时所使用的缓存空间大小
log.cleaner.dedupe.buffer.size=134217728

# log文件"sync"到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个"数据可靠性"的必要手段
# 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
# 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
# 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.
# 物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=9223372036854775807

# 检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
 
# 仅仅通过interval来控制消息的磁盘写入时机,是不足的.
# 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
# 达到阀值,也将触发.
log.flush.interval.ms = None

# --------------------------复制(Leader、replicas) 相关-------------------
# partition leader与replicas之间通讯时,socket的超时时间
controller.socket.timeout.ms =30000

# replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),
# 并认为它是死的,不会再加入管理中
replica.lag.time.max.ms =10000

# follower与leader之间的socket超时时间
replica.socket.timeout.ms=300000

# leader复制时候的socket缓存大小
replica.socket.receive.buffer.bytes=65536

# replicas每次获取数据的最大大小
replica.fetch.max.bytes =1048576

# replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms =500

# fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replica.fetch.min.bytes =1

# leader 进行复制的线程数,增大这个数值会增加follower的IO
num.replica.fetchers=1

# 最小副本数量
min.insync.replicas = 2

二、集群的调优方向

  1>.调大zookeeper的heap内存,默认是1G,可以根据服务器大小配置其堆内存为2G或者4G足矣(kafka实时传输的数据如果达到PB级别的话,得观察一下YGC和FGC的值可以适当再次调大);

  2>.修改kafka的副本数,默认的副本数是1,建议修改为2,如果副本数为版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!2,那么容灾能力就是1,如果副本数3,则容灾能力就是2,当然副本数越多,可能会导致集群的性能下降,但是可靠性更强,各有利弊,我这里推荐副本数为2;

  3>.kafka推荐分区数,默认的分区数是1,理论上来说,parition的数量小于core的数量的话,值越大,kafka的吞吐量就越高,但是你必须得考虑你的磁盘IO的瓶颈,因此我不推荐你将分区数这只过大,我建议这个值大于broker的数量,比如我的集群broker的只有5台,我的集群的partition数量是20;

  4>.kafka的heap内存,默认也是1G,生成环境中建议将它调大,不知道大家有没有发现,你broker的heap内存不管有多的,它都能给你吃满!我在生成环境中给kafka的heap内存是6G(k版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!afka主要使用堆外内存,即大量使用操作系统的页缓存,因此其并不需要分配太多的堆内存空间),zookeeper给的是2G,剩下的全部给操作系统预留着,否则你的机器会非常的卡顿;

0 条回应