kafka 3.x安装及相关命令
jdk 安装 参考 java111
kafka3.x不再支持JDK8,建议安装JDK11或JDK17。
wget https://downloads.apache.org/kafka/3.9.1/kafka_2.12-3.9.1.tgz
tar -zxvf kafka_2.12-3.9.1.tgz
sh zookeeper-server-start.sh ../config/zookeeper.properties &
sh kafka-server-start.sh -daemon ../config/server.properties
kafka管理ui
docker run -d -p 9007:9000 -e HTTP_PORT=9000 --name zoonavigator --restart unless-stopped elkozmon/zoonavigator:latest
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
docker run --restart=always -itd -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true -e AUTH_TYPE=LOGIN_FORM -e SPRING_SECURITY_USER_NAME=admin -e SPRING_SECURITY_USER_PASSWORD=2DnBnGHgnnGHgnG docker.727204.xyz/provectuslabs/kafka-ui
docker run --restart=always -d --name kowl -p 9220:8080 -e KAFKA_BROKERS=172.23.50.113:9092 quay.io/cloudhut/kowl:latest
docker run --restart=always -d --name kowl -p 9220:8080 -e KAFKA_BROKERS=pkc-4r000:9092 -e KAFKA_TLS_ENABLED=true -e KAFKA_SASL_ENABLED=true -e KAFKA_SASL_USERNAME=xxx -e KAFKA_SASL_PASSWORD=xxx quay.io/cloudhut/kowl:master
公共变量
export kafka_home=/data/kafka_2.12-2.8.2
export kafka_home=$(ps -ef|grep kafka|grep bin|grep server.properties|sed "s/:/\\n/g" |grep kafka-clients-|sed "s#/bin/#\\n#g"|grep -v jar)
export local_ip=$(ip a show eth0|awk '/inet /{print $2}'|awk -F"/" '{print $1}')
查看集群节点数
$kafka_home/bin/kafka-broker-api-versions.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092
查看topics列表
$kafka_home/bin/kafka-topics.sh --list --zookeeper localhost:2181
$kafka_home/bin/kafka-topics.sh --list --bootstrap-server $(hostname -I|sed 's/ //g'):9092
查看有那些 group ID 正在进行消费
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --list
#没有指定 topic ,查看的所有的 topic 的 消费者 的 group.id 的列表。重名的 group.id 只会显示一次
删除消费组
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --delete --group {消费组}
查看指定group.id 的消费者消费情况
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe --group ee
TOPIC topic名字
PARTITION 分区id
CURRENT-OFFSET 当前已消费的条数
LOG-END-OFFSET 总条数
LAG 未消费的条数
CONSUMER-ID 消费id
HOST 主机ip
CLIENT-ID 客户端id
动态设置topic的配置
$kafka_home/bin/kafka-configs.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --alter --entity-type topics --entity-name ee --add-config retention.ms=259200000 3days
$kafka_home/bin/kafka-configs.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe --entity-type topics --entity-name ee
各个 topic-partition 在 disk 上的占用空间情况
$kafka_home/bin/kafka-log-dirs.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe
创建test的topic,有3个分区,每个分区需分配3个副本
$kafka_home/bin/kafka-topics.sh --create --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --replication-factor 3 --partitions 3 --topic test
$kafka_home/bin/kafka-topics.sh --alter --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --partitions 8 --topic yunzhangfang
修改topic的分区数
$kafka_home/bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --topic daizhang --partitions 12
$kafka_home/bin/kafka-topics.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --alter --topic topic-dics-long --partitions 16
查看topic详细信息
$kafka_home/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic daizhang
$kafka_home/bin/kafka-topics.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe --topic finsight-client-heartbeat
生产消息
$kafka_home/bin/kafka-console-producer.sh --broker-list $(hostname -I|sed 's/ //g'):9092 --topic sqlhint
消费消息
kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group group_test --topic ods-task-logs --reset-offsets --to-datetime 2023-04-07T17:34:00.000 -execute
$kafka_home/bin/kafka-console-consumer.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --topic sqlhint --from-beginning
Kafka Consumer重置Offset
更新到当前group最初的offset位置
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
更新到指定的offset位置
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
更新到当前offset位置(解决offset的异常)
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-current --execute
offset设置到指定时刻开始
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group zhangyu6 --reset-offsets --all-topics --to-datetime 2023-07-03T14:20:00.000 --execute
java11
wget https://download.oracle.com/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz tar -zxvf openjdk-11.0.2_linux-x64_bin.tar.gz /etc/profile export JAVA_HOME=/data/jdk-11.0.2 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$JAVA_HOME/bin:$PATH source /etc/profile
↩︎