kafka 3.x安装及相关命令

jdk 安装 参考 java111

kafka3.x不再支持JDK8,建议安装JDK11或JDK17。

wget  https://downloads.apache.org/kafka/3.9.1/kafka_2.12-3.9.1.tgz
tar -zxvf kafka_2.12-3.9.1.tgz 

sh zookeeper-server-start.sh  ../config/zookeeper.properties &

sh kafka-server-start.sh -daemon ../config/server.properties 

kafka管理ui

docker run   -d   -p 9007:9000   -e HTTP_PORT=9000   --name zoonavigator   --restart unless-stopped   elkozmon/zoonavigator:latest
docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

docker run --restart=always -itd -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true -e AUTH_TYPE=LOGIN_FORM  -e SPRING_SECURITY_USER_NAME=admin -e SPRING_SECURITY_USER_PASSWORD=2DnBnGHgnnGHgnG docker.727204.xyz/provectuslabs/kafka-ui 

docker run  --restart=always -d --name kowl -p 9220:8080 -e KAFKA_BROKERS=172.23.50.113:9092   quay.io/cloudhut/kowl:latest
docker run  --restart=always -d --name kowl -p 9220:8080 -e KAFKA_BROKERS=pkc-4r000:9092 -e KAFKA_TLS_ENABLED=true -e KAFKA_SASL_ENABLED=true -e KAFKA_SASL_USERNAME=xxx -e KAFKA_SASL_PASSWORD=xxx quay.io/cloudhut/kowl:master

公共变量

export kafka_home=/data/kafka_2.12-2.8.2
export kafka_home=$(ps -ef|grep kafka|grep bin|grep server.properties|sed  "s/:/\\n/g" |grep kafka-clients-|sed "s#/bin/#\\n#g"|grep -v jar)
export local_ip=$(ip a show eth0|awk '/inet /{print $2}'|awk -F"/" '{print $1}')

查看集群节点数

$kafka_home/bin/kafka-broker-api-versions.sh --bootstrap-server  $(hostname -I|sed 's/ //g'):9092

查看topics列表

$kafka_home/bin/kafka-topics.sh  --list --zookeeper localhost:2181
$kafka_home/bin/kafka-topics.sh  --list --bootstrap-server $(hostname -I|sed 's/ //g'):9092

查看有那些 group ID 正在进行消费

$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092   --list
#没有指定 topic ,查看的所有的 topic 的 消费者 的 group.id  的列表。重名的 group.id 只会显示一次

删除消费组

$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --delete --group {消费组}

查看指定group.id 的消费者消费情况

$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe --group ee
TOPIC topic名字
PARTITION 分区id
CURRENT-OFFSET 当前已消费的条数
LOG-END-OFFSET 总条数
LAG  未消费的条数
CONSUMER-ID 消费id
HOST 主机ip
CLIENT-ID 客户端id

动态设置topic的配置

$kafka_home/bin/kafka-configs.sh  --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --alter --entity-type topics --entity-name ee --add-config retention.ms=259200000    3days

$kafka_home/bin/kafka-configs.sh  --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe  --entity-type topics --entity-name  ee

各个 topic-partition 在 disk 上的占用空间情况

$kafka_home/bin/kafka-log-dirs.sh  --bootstrap-server $(hostname -I|sed 's/ //g'):9092  --describe

创建test的topic,有3个分区,每个分区需分配3个副本

$kafka_home/bin/kafka-topics.sh --create --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --replication-factor 3 --partitions 3 --topic test

$kafka_home/bin/kafka-topics.sh --alter --bootstrap-server $(hostname -I|sed 's/ //g'):9092  --partitions 8  --topic yunzhangfang 

修改topic的分区数

$kafka_home/bin/kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --topic daizhang --partitions 12
$kafka_home/bin/kafka-topics.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092  --alter  --topic topic-dics-long  --partitions 16

查看topic详细信息

$kafka_home/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181  --describe   --topic daizhang 
$kafka_home/bin/kafka-topics.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --describe --topic finsight-client-heartbeat

生产消息

$kafka_home/bin/kafka-console-producer.sh --broker-list $(hostname -I|sed 's/ //g'):9092 --topic sqlhint

消费消息

kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group group_test --topic ods-task-logs --reset-offsets --to-datetime 2023-04-07T17:34:00.000  -execute
$kafka_home/bin/kafka-console-consumer.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --topic sqlhint --from-beginning

Kafka Consumer重置Offset

更新到当前group最初的offset位置
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-earliest --execute

更新到指定的offset位置
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

更新到当前offset位置(解决offset的异常)
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group test-group --reset-offsets --all-topics --to-current --execute

offset设置到指定时刻开始
$kafka_home/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname -I|sed 's/ //g'):9092 --group zhangyu6 --reset-offsets --all-topics --to-datetime 2023-07-03T14:20:00.000 --execute


  1. java11

    wget https://download.oracle.com/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
    tar -zxvf openjdk-11.0.2_linux-x64_bin.tar.gz 
    
    
    /etc/profile
    export JAVA_HOME=/data/jdk-11.0.2 
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$JAVA_HOME/bin:$PATH
    source /etc/profile
    

    ‍ ↩︎