kafka检查消费者位置

原创
半兽人 发表于: 2015-03-10   最后更新时间: 2019-12-02 00:33:46  
{{totalSubscript}} 订阅, 41,708 游览

检查消费者的位置(Checking consumer position)

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:
有时候需要去查看你的消费者的位置。我们有一个显示【消费者组中】所有消费者的位置的工具。显示日志其落后多远。消费者组名为my-group,消费者topic名为my-topic,如下:

> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0

NOTE: Since 0.9.0.0, the kafka.tools.ConsumerOffsetChecker tool has been deprecated. You should use the kafka.admin.ConsumerGroupCommand (or the bin/kafka-consumer-groups.sh script) to manage consumer groups, including consumers created with the new consumer API.
注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker已经不支持了。你应该使用kafka.admin.ConsumerGroupCommand 或 bin/kafka-consumer-groups.sh脚本来管理消费者组,包括用新消费者API创建的消费者。

## 0.9+
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 0.10+
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Managing Consumer Groups(管理消费者组)

With the ConsumerGroupCommand tool, we can list, describe, or delete consumer groups. Note that deletion is only available when the group metadata is stored in ZooKeeper. When using the new consumer API (where the broker handles coordination of partition handling and rebalance), the group is deleted when the last committed offset for that group expires. For example, to list all consumer groups across all topics:
用ConsumerGroupCommand工具,我们可以使用list,describe,或delete消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 例如,要列出所有主题中的所有用户组:

> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list

 test-consumer-group

To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:
要使用ConsumerOffsetChecker查看上一个示例中消费者组的偏移量,我们按如下所示“describe”消费者组:

> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group            test-foo                       0          1               3               2               consumer-1_/127.0.0.1

There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:
还有一切其他的命令可以提供消费组更多详细信息:

  • -members: This option provides the list of all active members in the consumer group.
    -members: 此选项提供使用者组中所有活动成员的列表。

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
    
  • --members --verbose: On top of the information reported by the "--members" options above, this option also provides the partitions assigned to each member.
    除了“--members”展示信息之外,此选项还展示分配给每个成员的分区。

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
    
  • --offsets: This is the default describe option and provides the same output as the "--describe" option.
    --offsets:默认的describe选项,与“--describe”选项相同的输出。

  • --state: This option provides useful group-level information.
    --state:此选项提供有用的组级别信息

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    
    COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
    localhost:9092 (0)        range                     Stable               4
    

To manually delete one or multiple consumer groups, the "--delete" option can be used:
要手动删除一个或多个消费者组,可以使用“--delete”:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

 Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

To reset offsets of a consumer group, "--reset-offsets" option can be used. This option supports one consumer group at the time. It requires defining following scopes: --all-topics or --topic. One scope must be selected, unless you use '--from-file' scenario. Also, first make sure that the consumer instances are inactive. See KIP-122 for more details.
要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics--topic。 除非您使用"--from-file"方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。有关更多详细信息,请参见KIP-122。

It has 3 execution options:
它有3个执行操作命令选项:

  • (default) to display which offsets to reset.
    显示要重置的offset
  • --execute : to execute --reset-offsets process.
    执行--reset-offsets处理
  • --export : to export the results to a CSV format.
    --export:将结果导出为CSV格式。

--reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):
--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):

  • --to-datetime : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
    --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'

  • --to-earliest : Reset offsets to earliest offset.
    --to-earliest : 将offset重置为最早的offset。

  • --to-latest : Reset offsets to latest offset.
    --to-latest : 将offsets重置为最新的offsets。
  • --shift-by : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
    --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
  • --from-file : Reset offsets to values defined in CSV file.
    - --from-file : 将offset重置为CSV文件中定义的值。
  • --to-current : Resets offsets to current offset.
    --to-current : 将offset重置为当前的offset。
  • --by-duration : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
    --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
  • --to-offset : Reset offsets to a specific offset.
    --to-offset : 将offset重置为指定的。

Please note, that out of range offsets will be adjusted to available offset end. For example, if offset end is at 10 and offset shift request is of 15, then, offset at 10 will actually be selected.
请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。

For example, to reset offsets of a consumer group to the latest offset:
例如,要将消费者组的offset重置为最新的offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

 TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0

If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e. offsets.storage=zookeeper), pass --zookeeper instead of bootstrap-server:
如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即。offsets.storage=zookeeper),则通过--zookeeper,而不是bootstrap-server

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
更新于 2019-12-02

余生若初 2年前

消费者组的最后一个提交的偏移到期 消费者删除 ==>如何判定最后一个提交的偏移量到期?

半兽人 -> 余生若初 2年前

默认保留7天,为何要关心最后一个offset到期呢?

I'm CxY 4年前

0.9.0版本的Kafka的consumer-groups.sh没有可以修改偏移量的--reset-offsets选项。是否意味着这个版本下的topic无法修改偏移量了? :(

I'm CxY 6年前

0.9.0版以后的kafka默认把offset放在consumer_offsets这个topic内 是不是就是--bootstrap-server这个选项?在实际使用时,好像用--zookeeper还是可以消费到数据的呀 它本质还是读的是consumer_offsets吗?是否有参数指定这个offset到底存在哪里?

半兽人 -> I'm CxY 6年前

offsets.storage。

违心者ღ 6年前

我用mirrormaker同步的时候发现,消费组的CURRENT-OFFSET 一个分区有unknown 状态,怎么能不换消费组的情况让他消费到

枣庄菜煎饼 8年前

这是旧的版本吧

0.8之前的offset存储在zookeeper的时候可以用这个查询,0.9以后要看你把offset存储在哪里了。

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章