半兽人 发表于: 2015-03-10   最后更新时间: 2019-12-02 00:33:46  
{{totalSubscript}} 订阅,36276 游览

检查消费者的位置(Checking consumer position)

Sometimes it's useful to see the position of your consumers. We have a tool that will show the position of all consumers in a consumer group as well as how far behind the end of the log they are. To run this tool on a consumer group named my-group consuming a topic named my-topic would look like this:

> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0

NOTE: Since, the kafka.tools.ConsumerOffsetChecker tool has been deprecated. You should use the kafka.admin.ConsumerGroupCommand (or the bin/kafka-consumer-groups.sh script) to manage consumer groups, including consumers created with the new consumer API.
注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker已经不支持了。你应该使用kafka.admin.ConsumerGroupCommand 或 bin/kafka-consumer-groups.sh脚本来管理消费者组,包括用新消费者API创建的消费者。

## 0.9+
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 0.10+
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

Managing Consumer Groups(管理消费者组)

With the ConsumerGroupCommand tool, we can list, describe, or delete consumer groups. Note that deletion is only available when the group metadata is stored in ZooKeeper. When using the new consumer API (where the broker handles coordination of partition handling and rebalance), the group is deleted when the last committed offset for that group expires. For example, to list all consumer groups across all topics:
用ConsumerGroupCommand工具,我们可以使用list,describe,或delete消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 例如,要列出所有主题中的所有用户组:

> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list


To view offsets as in the previous example with the ConsumerOffsetChecker, we "describe" the consumer group like this:

> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
test-consumer-group            test-foo                       0          1               3               2               consumer-1_/

There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:

  • -members: This option provides the list of all active members in the consumer group.
    -members: 此选项提供使用者组中所有活动成员的列表。

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /      consumer1       2
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /      consumer4       1
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /      consumer2       3
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /      consumer3       0
  • --members --verbose: On top of the information reported by the "--members" options above, this option also provides the partitions assigned to each member.

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /      consumer1       2               topic1(0), topic2(0)
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /      consumer4       1               topic3(2)
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /      consumer2       3               topic2(1), topic3(0,1)
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /      consumer3       0               -
  • --offsets: This is the default describe option and provides the same output as the "--describe" option.

  • --state: This option provides useful group-level information.

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    localhost:9092 (0)        range                     Stable               4

To manually delete one or multiple consumer groups, the "--delete" option can be used:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

 Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

To reset offsets of a consumer group, "--reset-offsets" option can be used. This option supports one consumer group at the time. It requires defining following scopes: --all-topics or --topic. One scope must be selected, unless you use '--from-file' scenario. Also, first make sure that the consumer instances are inactive. See KIP-122 for more details.
要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics--topic。 除非您使用"--from-file"方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。有关更多详细信息,请参见KIP-122。

It has 3 execution options:

  • (default) to display which offsets to reset.
  • --execute : to execute --reset-offsets process.
  • --export : to export the results to a CSV format.

--reset-offsets also has following scenarios to choose from (atleast one scenario must be selected):
--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):

  • --to-datetime : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
    --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'

  • --to-earliest : Reset offsets to earliest offset.
    --to-earliest : 将offset重置为最早的offset。

  • --to-latest : Reset offsets to latest offset.
    --to-latest : 将offsets重置为最新的offsets。
  • --shift-by : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
    --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
  • --from-file : Reset offsets to values defined in CSV file.
    - --from-file : 将offset重置为CSV文件中定义的值。
  • --to-current : Resets offsets to current offset.
    --to-current : 将offset重置为当前的offset。
  • --by-duration : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
    --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
  • --to-offset : Reset offsets to a specific offset.
    --to-offset : 将offset重置为指定的。

Please note, that out of range offsets will be adjusted to available offset end. For example, if offset end is at 10 and offset shift request is of 15, then, offset at 10 will actually be selected.

For example, to reset offsets of a consumer group to the latest offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

 TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0

If you are using the old high-level consumer and storing the group metadata in ZooKeeper (i.e. offsets.storage=zookeeper), pass --zookeeper instead of bootstrap-server:

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

您需要解锁本帖隐藏内容请: 点击这里

I'm CxY 1年前

0.9.0版本的Kafka的consumer-groups.sh没有可以修改偏移量的--reset-offsets选项。是否意味着这个版本下的topic无法修改偏移量了? :(

I'm CxY 2年前

0.9.0版以后的kafka默认把offset放在consumer_offsets这个topic内 是不是就是--bootstrap-server这个选项?在实际使用时,好像用--zookeeper还是可以消费到数据的呀 它本质还是读的是consumer_offsets吗?是否有参数指定这个offset到底存在哪里?

半兽人 -> I'm CxY 2年前


违心者ღ 3年前

我用mirrormaker同步的时候发现,消费组的CURRENT-OFFSET 一个分区有unknown 状态,怎么能不换消费组的情况让他消费到

枣庄菜煎饼 4年前