当前位置 博文首页 > xzh_blog:Kafka两种配置文件方式

    xzh_blog:Kafka两种配置文件方式

    作者:[db:作者] 时间:2021-08-07 19:03

    1.yml配置文件(简单配置)

    spring:
      kafka:
        # Broker address list (host:port; replace the placeholder with real values)
        bootstrap-servers: ip:端口
        consumer:
          # Consumer group this application joins
          group-id: group-test
          # Commit offsets automatically...
          enable-auto-commit: true
          # ...once per second
          auto-commit-interval: 1000ms
          # With no committed offset, start from the newest records
          auto-offset-reset: latest
          # Record keys and values are plain strings
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    2.类配置方式(使用了SASL/PLAINTEXT安全认证协议)

    如果出现 [Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected 这样的报错信息，通常说明 broker 开启了安全认证，此时就需要使用这种类配置方式，在消费者配置中加入安全认证参数。

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka consumer configuration that authenticates with the
     * SASL/PLAINTEXT security protocol (PLAIN mechanism).
     *
     * @author 向振华
     * @date 2021/05/10 15:58
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        /**
         * Container factory backing {@code @KafkaListener} methods:
         * 10 concurrent consumers, 1500 ms poll timeout.
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            containerFactory.setConsumerFactory(consumerFactory());
            containerFactory.setConcurrency(10);
            containerFactory.getContainerProperties().setPollTimeout(1500);
            return containerFactory;
        }
    
        /** Consumer factory built from {@link #consumerConfigs()}. */
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        /**
         * Raw consumer properties, including the SASL credentials.
         * NOTE(review): broker address and credentials are hard-coded —
         * consider externalizing them to configuration/secrets management.
         */
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> config = new HashMap<>(16);
            // Broker address
            config.put("bootstrap.servers", "ip:端口");
            // Security protocol + SASL mechanism
            config.put("security.protocol", "SASL_PLAINTEXT");
            config.put("sasl.mechanism", "PLAIN");
            // Inline JAAS login module carrying the username/password
            config.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usnm\" password=\"pwd\";");
            config.put("group.id", "group-test");
            // Auto-commit offsets every second
            config.put("enable.auto.commit", true);
            config.put("auto.commit.interval.ms", 1000);
            // latest: start from the newest offset when none is committed
            config.put("auto.offset.reset", "latest");
            // Keys and values are deserialized as plain strings
            config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return config;
        }
    }

    另外，Kafka 服务端（broker）还需要一个 JAAS 配置文件（其中的 KafkaServer 段定义了 broker 端的认证用户），内容如下——注意这是服务端配置，不是客户端 resource 下的文件：

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafkaadmin"
        password="kafkaadminpwd"
        user_kafkaadmin="kafkaadminpwd"
        user_kafkaclient1="kafkaclient1pwd"
        user_kafkaclient2="kafkaclient2pwd";
    }; 

    使用:

    @Slf4j
    @Component
    public class KafkaConsumer {
    
        /**
         * Consumes records from topic "xxx" and logs each received value.
         * Uses SLF4J parameterized logging instead of string concatenation:
         * a null record value (tombstone) is logged as "null" rather than
         * throwing a NullPointerException from value().toString().
         */
        @KafkaListener(topics = {"xxx"})
        public void listener(ConsumerRecord<?, ?> record) {
            log.info("收到消息 ---> {}", record.value());
        }
    }
