当前位置 博文首页 > 百看不如一RUN:Kafka源码解析之Producer发送数据流程(三)

    百看不如一RUN:Kafka源码解析之Producer发送数据流程(三)

    作者:[db:作者] 时间:2021-07-29 09:39

    1. Kakfa生产者examples

    public class Producer extends Thread {
        private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;
    
        /**
         * 构造方法,初始化生产者对象
         * @param topic
         * @param isAsync
         */
        public Producer(String topic, Boolean isAsync) {
            Properties props = new Properties();
            // 用户拉取kafka的元数据
            props.put("bootstrap.servers", "localhost:9092");
            props.put("client.id", "DemoProducer");
            //K,V
            //设置序列化的类
            //二进制的格式
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //消费者 消费数据的时候 就需要反序列化
            //TODO 初始化KafkaProducer
            producer = new KafkaProducer<>(props);
            this.topic = topic;
            this.isAsync = isAsync;
        }
    
        public void run() {
            int messageNo = 1;
            // 一直会往kafka 发送数据
            while (true) {
    
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                //isAsync kafka发送数据的时候有两种方式
                //1. 异步发送
                //2. 同步发送
                if (isAsync) { // Send asynchronously
                    //异步发送,一直发送,消息响应结果交给回调函数处理
                    //这样的样式性能比较好,生产中就是用的这种方式
                    producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr), new DemoCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously
                    try {
                        //同步发送
                        //发送一条消息,等这条消息的所有后续工作都完成以后才继续下一条消息的发送
                        producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        }
    }
    
    class DemoCallBack implements Callback {
    
        private final long startTime;
        private final int key;
        private final String message;
    
        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }
    
        /**
         * A callback method the user can implement to provide asynchronous handling of request completion. This method will
         * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
         * non-null.
         *
         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
         *                  occurred.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (exception != null)
                //一般我们生产中,还会有其他的备用链路
                System.out.println("有异常发生");
            else
                System.out.println("说明没有异常信息,是成功的发送!");
            if (metadata != null) {
                System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }
    
    

    2. Producer核心流程介绍

    2.1 回顾Producer发送消息概述

    image-20200725112711549

    1. 将我们的消息进行封装为ProducerRecord对象
    2. 进行序列化操作,为什么需要进行序列化呢,因为官方提供了不同类型的序列化,同时也有自定义序列化,其实官方提供的我们已经够用了,为什么呢,因为我们发送的都是消息,那么消息可以分为基本数据类型和引用数据类型,其实基本数据类型我们一般不用,因为我们需要一些描述信息,那么剩下的其实就是对象了,我们对一个对象序列化通用的方式其实就是json,那么我们这么看的话,是不是一个String就搞定了啊
    3. 将消息进行分区,这里就很关键了,我们肯定是想把消息发送到某个topic下面的某个分区里面的,那么这个时候我们并不知道我们集群的元数据,所以我们需要进行fetch元数据,然后根据元数据信息,进行分区
    4. 这个时候我们已经知道了这条消息应该发往那个主题下的那个分区了,那么我们直接开干不好吗,其实这里并不好,我们想像一下,我们假如有100w条消息,进行发送,这个时候我们是不是要进行100W次连接,这个对于我们的资源消耗太大了,所以kafka没有这个干,他把消息放进了我们的一个缓存里面,封装之后发
    5. 在后台会启动一个Sender线程进行轮训的去检查kafka分区中的数据,其实这里就是一种生产者消费者问题,sender线程将我们的消息进行封装为一个batch一个batch的进行发送,这样很明显的提升了我们的吞吐量,在封装batch的时候还可以进行batch压缩,这个压缩有好处也有坏处,进行压缩的时候会提高吞吐,但是会增加cpu的负担,没有最好的方式只有最适合自己的方式
    6. 把消息发送给我们的kafka集群

    3. KafkaProducer初始化

        private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            try {
                log.trace("Starting the Kafka producer");
                // 配置一些用户自定义的参数
                Map<String, Object> userProvidedConfigs = config.originals();
                this.producerConfig = config;
                this.time = new SystemTime();
                // 配置 clinetId
                clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
                if (clientId.length() <= 0)
                    clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
                Map<String, String> metricTags = new LinkedHashMap<String, String>();
                metricTags.put("client-id", clientId);
                //metric一些东西,我们一般分析源码的时候 不需要关心
                MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                        .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                        .tags(metricTags);
                List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                        MetricsReporter.class);
                reporters.add(new JmxReporter(JMX_PREFIX));
                this.metrics = new Metrics(metricConfig, reporters, time);
    
                //TODO 设置分区器,分区器可以自定义
                this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
                //TODO 重试时间
                /**
                 * Producer发送消息的时候,我们的代码里面一般会设置重试机制的
                 * 什么呢,因为我们的是分布式网络情况,网络是不稳定的,所以我们需要重试机制,hdfs当中也有很多的重试机制
                 * 这里默认的重试时间是 100ms
                 * TODO RETRY_BACKOFF_MS_CONFIG retry.backoff.ms 默认100ms
                 *     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
                 *     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
                 *     这里我们使用DOC文档的模式 这个是值得学习的
                 */
                long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
                //String 类型就包含了所有的类型
                // 对象-> josn
                //TODO 设置序列化器
                if (keySerializer == null) {
                    this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                            Serializer.class);
                    this.keySerializer.configure(config.originals(), true);
                } else {
                    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                    this.keySerializer = keySerializer;
                }
                if (valueSerializer == null) {
                    this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                            Serializer.class);
                    this.valueSerializer.configure(config.originals(
    
    下一篇:没有了