当前位置 主页 > 网站技术 > 代码类 >

    kafka监控获取指定topic的消息总量示例

    栏目:代码类 时间:2019-12-23 12:07

    我就废话不多说了,直接上代码吧!

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
     
    import java.util.*;
    import java.util.Map.Entry;
     
    public class KafkaOffsetTools {
    public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
    public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
    public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";
    private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
    private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
    private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";
     
    public KafkaOffsetTools() {
    }
     
    public static long getLastOffset(SimpleConsumer consumer, String topic,
    int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
    partition);
    Map, PartitionOffsetRequestInfo> requestInfo = new HashMap, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
    whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
    clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
     
    if (response.hasError()) {
    System.err.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
    return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
    }
     
    private TreeMap, PartitionMetadata> findLeader(List a_seedBrokers, String a_topic) {
    TreeMap, PartitionMetadata> map = new TreeMap, PartitionMetadata>();
    loop:
    for (String seed : a_seedBrokers) {
    SimpleConsumer consumer = null;
    try {
    String[] hostAndPort;
    hostAndPort = seed.split(":");
    consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024,
    "leaderLookup" + new Date().getTime());
    List topics = Collections.singletonList(a_topic);
    TopicMetadataRequest req = new TopicMetadataRequest(topics);
    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
     
    List metaData = resp.topicsMetadata();
    for (TopicMetadata item : metaData) {
    for (PartitionMetadata part : item.partitionsMetadata()) {
    map.put(part.partitionId(), part);
    }
    }
    } catch (Exception e) {
    System.out.println("Error communicating with Broker [" + seed
    + "] to find Leader for [" + a_topic + ", ] Reason: " + e);
    } finally {
    if (consumer != null)
    consumer.close();
    }
    }
    return map;
    }
     
    public static void main(String[] args) {
    String kafkaBrokerList = System.getenv("metadata.broker.list");
    if(kafkaBrokerList == null || kafkaBrokerList.length() == 0){
    System.err.println("No config kafka metadata.broker.list,it is null .");
    //for test
    kafkaBrokerList = "localhost:9092,localhost:9093";
    System.err.println("Use this broker list for test,metadata.broker.list="+kafkaBrokerList);
    }
    //init topic,logSize = 0
    Map,Integer> topics = new HashMap,Integer>();
    topics.put(KAFKA_TOPIC_NAME_ADAPTER,0);
    topics.put(KAFKA_TOPIC_NAME_EXCEPTION,0);
    topics.put(KAFKA_TOPIC_NAME_AUDIT,0);
    //init kafka broker list
    String[] kafkaHosts;
    kafkaHosts = kafkaBrokerList.split(",");
    if (kafkaHosts == null || kafkaHosts.length == 0) {
    System.err.println("No config kafka metadata.broker.list,it is null .");
    System.exit(1);
    }
    List seeds = new ArrayList();
    for (int i = 0; i < kafkaHosts.length; i++) {
    seeds.add(kafkaHosts[i]);
    }
     
    KafkaOffsetTools kot = new KafkaOffsetTools();
     
    for(String topicName : topics.keySet()){
    TreeMap, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
    int logSize = 0;
    for (Entry, PartitionMetadata> entry : metadatas.entrySet()) {
    int partition = entry.getKey();
    String leadBroker = entry.getValue().leader().host();
    String clientName = "Client_" + topicName + "_" + partition;
    SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000,
    64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, topicName, partition,
    kafka.api.OffsetRequest.LatestTime(), clientName);
    logSize += readOffset;
    if (consumer != null) consumer.close();
    }
    topics.put(topicName,logSize);
    }
    System.out.println(topics.toString());
    System.out.println(rawTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_ADAPTER)+" "+System.currentTimeMillis());
    System.out.println(avroTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_AUDIT)+" "+System.currentTimeMillis());
    System.out.println(exceptionTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_EXCEPTION)+" "+System.currentTimeMillis());
    }
    }