best riven

0 声望

这家伙太懒,什么都没留下

个人动态
  • best riven 回复 半兽人kafka消费者Java客户端 中 :

    打扰了,再请教一下。如果采用了时间戳来控制消费,那么在提交 偏移量的时候,是自动提交设置auto comit为true好,还是手动提交偏移量comitSync,如果手动提交,应该同步还是异步呢

    4年前
  • best riven 回复 半兽人kafka消费者Java客户端 中 :

    你好,因为第一次用这个,为了搞清楚问题才注册的账户。多有不便,敬请谅解。

    4年前
  • 半兽人 回复 best rivenkafka消费者Java客户端 中 :

    如果你需要重新读取1-2点的数据,这个”小程序“的消费者组名不能和现有正在运行的消费者组名相同。从你的描述来看,因为你通过相同的消费者组名重置了你的offset,就导致你每次都从对应时间戳开始消费。

    所以重新读取丢失的消息,不要跟线上任何一个消费者组名相同,你只是重读了这些消息,拿到这些消息怎么处理,是你另外的逻辑了。


    问题最好在问题专区里提问,可以描述的清楚一些
    https://www.orchome.com/kafka/issues

    4年前
  • 订阅了 kafka 主题! · 4年前
  • best riven 回复 半兽人kafka消费者Java客户端 中 :

    你好,看完文章后受益匪浅。我最近遇到一个需要消费指定时间段数据的问题。比如今天凌晨1-2点的数据丢了,我需要重新消费1-2点的数据,发送出去。按照网上例子写了,也能实现效果,但会导致其他主题的消费停止。具体流程是这样,终端上传数据,数据来源包含两部分,一个是来自时间主题的指令信息,一个是来自工况主题要处理的数据信息,我消费完了数据之后发送到两个主题,一个是自己的备份主题,一个是根据指令发送特定时间段的数据到客户的主题。假如客户因为服务器原因没有收到我发送的数据,我就需要从自己的备份主题里拉取指定时间段的数据,发给他们。我拉取指定时间的数据,是用时间戳实现的。先获取topicPartition,然后用assign方法订阅。seek设置消费的分区的偏移量。然后while循环执行poll方法,判断满足1-2点的数据,将其发送出去。break条件是超过2.30了就停止消费。这么写完之后,我模拟一条时间指令,能够从备份把指定时间段内的数据拉出来,但是我消费工况主题的进程就会停止。查看卡夫卡的ui界面,发现消费者偏移量也没动,所以怀疑是出了问题。我看到你有说过,按照时间戳之后,消费者都是按照指定分区对应时间戳的Offset开始消费的。卡了两天,从网上也搜了不少的信息,还是没有头绪。目前比较怀疑的地方是,我设置了手动提交偏移量,是不是每发送一条消息都要手动提交一下。还是说我消费工况主题的方式也要改成assign才可以互不影响呢。代码有些长就没贴,不知道我是否将问题描述清楚,还请大佬指教。如有需要,我将代码贴出

    4年前
  • 贬了 best rivenkafka根据时间戳消费或者消费指定时间内的数据 的评论!
     props.put("log.message.timestamp.type","LogAppendTime");
    

    为什么设置了没反应

    4年前