Kafka low-level consumer (SimpleConsumer) example

半兽人 · Posted: 2015-03-03 · Last updated: 2016-10-25 22:23:01

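Why use SimpleConsumer?

The main reason to use SimpleConsumer is that you want finer control over partition consumption than a consumer group gives you.

For example, you may want to:

  1. Read a message multiple times
  2. Consume only a subset of a topic's partitions in a given process
  3. Manage transactions so that a message is processed once and only once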

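What are the downsides of using SimpleConsumer?

SimpleConsumer requires a significant amount of work that a consumer group would otherwise do for you:

  1. You must keep track of offsets in your application so you know where you left off.
  2. You must figure out which broker is the lead broker for a given topic and partition.
  3. You must handle broker leader changes.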
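
To make the first point concrete, here is a minimal sketch of in-application offset tracking. `OffsetTracker` and its methods are hypothetical names, not part of Kafka, and the state is kept only in memory; a real application would probably persist it (to a file or database) so it survives restarts.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Kafka): remembers the next offset to read per topic/partition. */
final class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Records that the message at `offset` was processed, so the next read starts at offset + 1. */
    void markProcessed(String topic, int partition, long offset) {
        nextOffsets.put(key(topic, partition), offset + 1);
    }

    /** Returns the next offset to fetch, or `fallback` if nothing was processed yet. */
    long nextOffset(String topic, int partition, long fallback) {
        Long saved = nextOffsets.get(key(topic, partition));
        return saved != null ? saved : fallback;
    }
}
```

The `fallback` argument would typically be the offset obtained from the broker when the consumer starts for the first time.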

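Steps for using SimpleConsumer

  1. Find an active broker and determine which broker is the leader for your topic and partition
  2. Determine which brokers hold the replicas for your topic and partition
  3. Build the request that defines the data you are interested in
  4. Send the request and fetch the data
  5. Detect and recover from leader changes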

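Finding the lead broker for a topic and partition

The seed brokers you pass in do not have to be every broker in the cluster. They only need to be a set from which you can find one live broker to query for leader information.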

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
 
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                List<TopicMetadata> metaData = resp.topicsMetadata();
 
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

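The call to topicsMetadata() asks the broker we are connected to for the details of the topic we are interested in.

The loop over partitionsMetadata() iterates through all the partitions until it finds the one we want. Once it is found, the labeled break leaves all the loops at once.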
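
The search pattern used in findLeader() can be tried without a running cluster. The sketch below is only an illustration: `PartitionInfo` is a hypothetical stand-in for PartitionMetadata that carries just two fields, and the nested lists stand in for the metadata response.

```java
import java.util.List;

final class LabeledBreakSketch {
    /** Hypothetical stand-in for PartitionMetadata: only the fields this sketch needs. */
    static final class PartitionInfo {
        final int partitionId;
        final String leaderHost;

        PartitionInfo(int partitionId, String leaderHost) {
            this.partitionId = partitionId;
            this.leaderHost = leaderHost;
        }
    }

    /** Scans every topic's partition list and stops at the first match. Returns null if absent. */
    static PartitionInfo find(List<List<PartitionInfo>> topics, int wantedPartition) {
        PartitionInfo found = null;
        search:
        for (List<PartitionInfo> partitions : topics) {
            for (PartitionInfo part : partitions) {
                if (part.partitionId == wantedPartition) {
                    found = part;
                    break search; // leaves both loops at once
                }
            }
        }
        return found;
    }
}
```

As in findLeader(), a null result means the partition was not present in the metadata, so callers need to check for it.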

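Finding the starting offset for reads

Now define where to start reading data. Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the log and starts streaming from there, while kafka.api.OffsetRequest.LatestTime() streams only new messages. Do not assume that offset 0 is the beginning offset, because messages age out of the log over time.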

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
 
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
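
The whichTime argument above takes one of two sentinel values rather than a real offset. The sketch below models, in a very simplified way, how such a sentinel might be resolved against a log that has already discarded its oldest messages. The numeric values -2 and -1 are what EarliestTime() and LatestTime() return in Kafka 0.8 as far as I know; treat them, and the `resolve` helper itself, as assumptions made for illustration.

```java
final class StartOffsetSketch {
    // Assumed values of kafka.api.OffsetRequest.EarliestTime() and LatestTime() in Kafka 0.8.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * Simplified model: the log currently retains offsets [logStartOffset, logEndOffset).
     * Timestamps other than the two sentinels are not modelled here.
     */
    static long resolve(long whichTime, long logStartOffset, long logEndOffset) {
        if (whichTime == EARLIEST_TIME) {
            return logStartOffset; // oldest message still retained
        }
        if (whichTime == LATEST_TIME) {
            return logEndOffset;   // offset the next produced message will get
        }
        throw new IllegalArgumentException("only the two sentinels are modelled: " + whichTime);
    }
}
```

For a log that retains offsets 5000 to 7499, the earliest sentinel resolves to 5000 rather than 0, which is why the starting offset has to be asked for instead of assumed.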

Error handling

Because SimpleConsumer does not handle lead broker failures, you have to write some code to handle them.

if (fetchResponse.hasError()) {
     numErrors++;
     // Something went wrong!
     short code = fetchResponse.errorCode(a_topic, a_partition);
     System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
     if (numErrors > 5) break;
 
     if (code == ErrorMapping.OffsetOutOfRangeCode())  {
         // We asked for an invalid offset. For simple case ask for the last element to reset
         readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
         continue;
     }
     consumer.close();
     consumer = null;
     leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
     continue;
 }
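
Here, once the fetch returns an error, we log the reason. If the offset is out of range, we reset it to the latest offset and retry; otherwise we close the consumer and try to find the new leader.
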
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
       for (int i = 0; i < 3; i++) {
           boolean goToSleep = false;
           PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
           if (metadata == null) {
               goToSleep = true;
           } else if (metadata.leader() == null) {
               goToSleep = true;
           } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
               // first time through if the leader hasn't changed give ZooKeeper a second to recover
               // second time, assume the broker did recover before failover, or it was a non-Broker issue
               //
               goToSleep = true;
           } else {
               return metadata.leader().host();
           }
           if (goToSleep) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException ie) {
               }
           }
       }
       System.out.println("Unable to find new leader after Broker failure. Exiting");
       throw new Exception("Unable to find new leader after Broker failure. Exiting");
   }

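This method reuses the findLeader() logic defined earlier to find the new leader, except that here we only try to connect to the replicas of the topic/partition. That way, if we cannot reach any of the brokers that hold the data we are interested in, we give up and exit.

Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we do not get an answer. In practice ZooKeeper often fails over quickly enough that the sleep is never needed.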
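
The retry behaviour of findNewLeader() can be isolated from Kafka and exercised on its own. The following is a sketch under assumptions: `lookup` is a hypothetical callback standing in for findLeader(), and `pause` is injected so the logic can be run without actually sleeping.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

final class LeaderRetrySketch {
    /**
     * Asks `lookup` for the current leader up to `attempts` times.
     * A null answer, or the old leader on the first attempt, triggers a pause and a retry.
     * `pause` receives the delay in milliseconds (wrap Thread.sleep in real code).
     */
    static String findNewLeader(Supplier<String> lookup, String oldLeader,
                                int attempts, long delayMs, LongConsumer pause) {
        for (int i = 0; i < attempts; i++) {
            String leader = lookup.get();
            boolean unchangedOnFirstTry =
                    leader != null && i == 0 && leader.equalsIgnoreCase(oldLeader);
            if (leader != null && !unchangedOnFirstTry) {
                return leader;
            }
            pause.accept(delayMs);
        }
        throw new IllegalStateException("Unable to find new leader after " + attempts + " attempts");
    }
}
```

Like the original method, this accepts the old leader from the second attempt onwards, on the assumption that the broker recovered or the error was not caused by the broker.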

Reading the data

Finally, we read the data being streamed back and write it out.

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
 
if (fetchResponse.hasError()) {
        // See code in previous section
}
numErrors = 0;
 
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
 
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}
 
if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
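
One detail in the loop above: the byte array is sized with payload.limit(), which equals the number of readable bytes only when the buffer's position is 0. That appears to hold for the buffer returned here, but a slightly more defensive variant is sketched below. `PayloadSketch.decodeUtf8` is a hypothetical helper that uses only java.nio.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PayloadSketch {
    /** Copies the readable bytes of `payload` into a String without moving the caller's position. */
    static String decodeUtf8(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();     // independent position/limit, shared content
        byte[] bytes = new byte[view.remaining()]; // limit minus position, not just limit
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Using remaining() avoids a BufferUnderflowException if the buffer ever arrives with a non-zero position, and duplicate() leaves the original buffer reusable.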

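Note that readOffset is set to the offset that follows the last message read. That way, once a block of messages has been processed, we know where to ask Kafka to start the next fetch.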
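
The way readOffset moves forward can be sketched with plain numbers. In this illustration `advance` is a hypothetical helper and the array stands in for the offsets of one fetched message set; offsets below readOffset are skipped, as in the loop above.

```java
final class ReadOffsetSketch {
    /**
     * Walks the offsets returned by one fetch and returns the offset to request next.
     * Offsets below `readOffset` are skipped; they stand for messages already processed.
     */
    static long advance(long readOffset, long[] fetchedOffsets) {
        for (long offset : fetchedOffsets) {
            if (offset < readOffset) {
                continue;            // old message, e.g. the head of a compressed block
            }
            // ... process the message at `offset` here ...
            readOffset = offset + 1; // the value MessageAndOffset.nextOffset() reports
        }
        return readOffset;
    }
}
```

An empty fetch leaves readOffset unchanged, so the next request simply asks for the same position again.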

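Also note that we explicitly check that the offset being read is not less than the offset we requested. This is needed because if Kafka is compressing the messages, the fetch request returns the entire compressed block even if the requested offset is not at the beginning of that block, so messages we have already seen may be returned again. Note as well that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough and an empty message set may be returned. In that case, increase fetchSize until a non-empty set is returned.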
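
One possible way to apply the fetchSize advice is to double the size until a fetch returns something. The sketch below assumes you already know data exists at the requested offset, since an empty result can also simply mean there are no new messages. `fetch` is a hypothetical callback standing in for a real FetchRequest round trip.

```java
import java.util.function.IntUnaryOperator;

final class FetchSizeSketch {
    /**
     * Doubles the fetch size until `fetch` reports at least one message or `maxFetchSize` is passed.
     * `fetch` maps a fetch size in bytes to the number of messages returned.
     * Returns the first size that worked, or -1 if none did.
     */
    static int findWorkingFetchSize(IntUnaryOperator fetch, int initialSize, int maxFetchSize) {
        // The size > 0 check stops the loop if doubling overflows int.
        for (int size = initialSize; size > 0 && size <= maxFetchSize; size *= 2) {
            if (fetch.applyAsInt(size) > 0) {
                return size;
            }
        }
        return -1;
    }
}
```

Capping the growth with maxFetchSize keeps a single oversized batch from driving memory use without bound.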

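Finally, we keep track of the number of messages read. If the last request returned no messages, we sleep for a second so that we do not hammer Kafka when there is no data.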

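Running the example

The example expects the following parameters:

  • Maximum number of messages to read (so that we do not loop forever)
  • Topic to read from
  • Partition to read from
  • One broker to use for the metadata lookup
  • Port the brokers listen on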
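
The parameters above arrive as positional command-line arguments. A minimal sketch of validating them before any broker is contacted could look like the following; `ExampleArgs` is a hypothetical helper that the full source does not use, and the validation limits are my own assumptions.

```java
final class ExampleArgs {
    final long maxReads;
    final String topic;
    final int partition;
    final String seedBroker;
    final int port;

    private ExampleArgs(long maxReads, String topic, int partition, String seedBroker, int port) {
        this.maxReads = maxReads;
        this.topic = topic;
        this.partition = partition;
        this.seedBroker = seedBroker;
        this.port = port;
    }

    /** Parses <maxReads> <topic> <partition> <seedBroker> <port>, in that order. */
    static ExampleArgs parse(String[] args) {
        if (args.length != 5) {
            throw new IllegalArgumentException(
                    "usage: <maxReads> <topic> <partition> <seedBroker> <port>");
        }
        long maxReads = Long.parseLong(args[0]);
        int partition = Integer.parseInt(args[2]);
        int port = Integer.parseInt(args[4]);
        if (maxReads <= 0 || partition < 0 || port < 1 || port > 65535) {
            throw new IllegalArgumentException("argument out of range");
        }
        return new ExampleArgs(maxReads, args[1], partition, args[3], port);
    }
}
```

For instance, the sample arguments `10 page_visits 2 mybroker01 9092` (illustrative values only) would ask for at most 10 messages from partition 2 of topic page_visits.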

Full source code

package com.test.simple;
 
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
 
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }
 
    private List<String> m_replicaBrokers = new ArrayList<String>();
 
    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }
 
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
 
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
 
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
 
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
 
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
 
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
 
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }
 
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
 
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
 
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
 
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}


Updated on 2016-10-25
