Kafka常用命令行使用简介

kafka-server-start.sh命令

kafka-server-start.sh命令用于启动kafka服务。一般用法

启动kafka服务,其中必须传递的参数是服务的配置文件路径。

kafka-server-start.sh  ../config/server.properties

一般启动服务需要以后台运行的模式,需要额外新增 -daemon参数

kafka-server-start.sh -daemon ../config/server.properties

此外还可以通过 –override参数覆盖设置server.properties的配置,不过这种方式不常用

kafka-server-start.sh ../config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1

kafka-topics.sh命令

kafka-topics.sh命令主要用于管理kafka的topic,常用的操作包括查看、创建、删除、增加分区。

查看全部topic列表,其中 –bootstrap-server参数是kafka集群的地址。

需要注意的是只有在新版本的kafka中才能使用 –bootstrap-server参数,在旧版本的kafka中需要使用 –zookeeper参数查看

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

查看特定topic的配置信息使用如下命令,–topic参数用于指定topic的名称

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe

创建topic使用如下命令,–partitions参数用于指定分区数,–replication-factor参数用于指定备份数量

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create  --topic test_topic --replication-factor 1 --partitions 3

删除topic使用如下命令。

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic test_topic

为topic增加分区使用如下命令,–partitions参数指定的是新的分区数量,且分区只能新增不能减小。

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic test_topic --partitions 3

kafka-console-producer.sh

kafka-console-producer.sh主要用于生产消息相关操作

使用此命令需要如下两个固定参数:–broker-list参数指定集群地址,–topic参数指定消息的topic。

可以使用以为方式打开一个生产消息的输入控制台,然后就可以逐行输入消息,退出使用control + C组合键。

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

除了手动输入以外,该命令也指定从文件中逐行读取数据发送消息。

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test < file-input.txt

kafka-console-consumer.sh

kafka-console-consumer.sh主要用于消费消息相关操作

使用此命令需要如下两个固定参数:–bootstrap-server参数指定集群地址,–topic参数指定消息的topic。

可以使用如下命令一次性的topic里面的消费全部的消息

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

上述命令消费消息时没有指定消费者组,使用的是随机生成的名称,且不会记录offset,如果需要指定消费者组需要使用 –group参数

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --group my_group

可以通过 –offset参数来特别指定消费的策略,–offset可以使用值为 earliest或者 latest

earliest表示从头开始消费,等价于使用–from-beginning

latest表示从最新的开始消费,这是默认的消费模式

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group my_group --offset earliest

可以通过**–partition**参数指定消费的分区,如果不知道默认是消费全部分区的数据

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group my_group --partition 0

kafka-consumer-groups.sh

kafka-consumer-groups.sh命令消费者组的相关操作,常用的包括查询消费者组的offset、重置offset、删除消费者组

查看全部消费者组列表,其中 –bootstrap-server参数是kafka集群的地址。

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

查看特定消费者组的消费offset情况,使用如下命令,–group参数用于指定消费者组的名称

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my_group --describe

使用 –reset-offsets参数可以重置消费者组的offset,重置offset还必须通过 –topic制定重置的topic 重置策略一般有如下三种:–to-earliest、–to-latest、–to-offset

–to-earliest表示从重置offset到分区的最小offset

–to-latest表示从重置offset到分区的最大offset

–to-offset表示从重置offset到分区的特定offset

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my_group --topic test --reset-offsets --to-earliest --execute

使用**–delete**参数对消费者组进行删除操作,主要删除的是offset记录

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my_group --delete

kafka-log-dirs.sh

kafka-log-dirs.sh命令用于查询broker的日志保存信息,输出格式为json。

查询特定topic的日志保存信息,其中 –topic-list参数指定查询的topic

kafka-log-dirs.sh --bootstrap-server 127.0.01:9092 --describe --topic-list test

查询特定broker的日志保存信息,其中 –broker-list参数指定查询的broker标识符(server.properties配置的broker.id)

kafka-log-dirs.sh --bootstrap-server 127.0.01:9092 --describe --broker-list 1

kafka-delete-records.sh

kafka-delete-records.sh命令用于删除特定的日志记录。

对于kafka来说默认存在两种删除策略, 一种基于日志量, 二种基于日志保存的时间, 但是有时候我们需要手动的去指定删除到具体的位置, 这个脚本就比较有用了

–offset-json-file参数用于配置删除记录的配置信息,

kafka-delete-records.sh --bootstrap-server 127.0.01:9092 --offset-json-file kafka-delete-records.json

删除记录的配置需要满足如下配置格式:

{
    "partitions":
        [
            {
                "topic": "test",
                "partition": 0,
                "offset": 20
            }
        ],
    "version":1
}

kafka-run-class.sh

kafka-run-class.sh命令用于执行一些特殊的查询,例如查看查询topic的offset可以使用此命令。

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test --time -1

最后的参数-1表示显示获取当前offset最大值,-2表示offset的最小值

参考资料

分类

开发
    --go (9)
    --java (5)
    --php (11)
    --mysql (9)
    --javascript (3)
    --html (1)
    --算法 (6)
架构
    --理论 (9)
    --网络 (3)
    --服务器 (2)
    --消息队列 (3)
    --容器 (5)
    --监控 (1)
    --搜索引擎 (3)
    --大数据 (0)
    --测试 (1)
系统
    --linux (10)
    --mac (2)
    --windows (1)
足球
    --世界杯 (60)
    --欧洲杯 (28)
    --文迷 (3)
大学时光
    --校园生活 (96)
    --假期生活 (17)
    --广院杯那些事 (14)
    --北京奥运 (6)
    --胡思乱写 (17)


最近发布

零拷贝技术介绍

服务网格技术简介

C语言标准和标准库简介

Kubernetes简介及环境搭建

Go语言开发的顶级项目


归档

2006 (109)
2007 (40)
2008 (47)
2009 (10)
2010 (6)
2012 (10)
2013 (14)
2014 (27)
2015 (15)
2016 (6)
2017 (8)
2018 (11)
2019 (17)
2020 (5)