Spring Boot 2.x + Kafka: Usage and Source Code Analysis, Part 5 (Consumer Configuration and Usage)

The previous chapter described Spring Boot's support for Kafka transactions; this chapter covers Spring Boot's support for consumers.

Two approaches are covered here.

The first: let the Spring Boot framework initialize the underlying beans; we only need to write the settings in the yml configuration file (common settings are sketched below; for the full set of consumer options see the consumer section of http://kafka.apache.org/documentation/):
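A minimal application.yml sketch (the values are illustrative, not prescriptive; the property names follow Spring Boot's spring.kafka.consumer.* namespace):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: springboot_test_topic_group
      # let the listener container manage commits instead of the Kafka client
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 30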

Spring Boot bean initialization source code:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
/**
 * Initializes the consumer factory used to create Consumer instances
 */
public class KafkaAutoConfiguration {
    // ...
    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    }
    // ...
}

/**
 * Registers the concurrent listener container infrastructure
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
    // ...
    /**
     * Creates the MessageListenerContainer factory
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @Configuration(proxyBeanMethods = false)
    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    static class EnableKafkaConfiguration {
    }
}

kafkaListenerContainerFactory is used to create MessageListenerContainer instances, the message listener containers.

MessageListenerContainer has two implementations: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.

KafkaMessageListenerContainer: consumes all partitions of the topic on a single thread.

ConcurrentMessageListenerContainer: consumes all partitions of the topic in parallel (multiple threads), with each thread corresponding to one partition; it is therefore recommended to keep the concurrency of a ConcurrentMessageListenerContainer equal to the topic's partition count.

The role of a MessageListenerContainer is to manage its MessageListener.

And what does the MessageListener interface do? It consumes the data in the topic and manages the offsets (since spring-kafka 2.3, manual commit is the default).

Spring (spring-kafka) provides eight MessageListener variants by default:

1: public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

2: public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

3: public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

4: public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

5: public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

6: public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

7: public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

8: public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

Notes:

  1. Offsets are committed automatically; this interface processes a single ConsumerRecord received from the Kafka consumer poll() operation.
  2. Offsets are committed manually; this interface processes a single ConsumerRecord received from the poll() operation.
  3. Offsets are committed automatically; this interface processes a single ConsumerRecord and also provides access to the Consumer object.
  4. Offsets are committed manually; this interface processes a single ConsumerRecord and also provides access to the Consumer object.
  5. Offsets are committed automatically; this interface processes the batch of ConsumerRecord instances returned by poll().
  6. Offsets are committed manually; this interface processes the batch of ConsumerRecord instances returned by poll().
  7. Offsets are committed automatically; this interface processes the batch of ConsumerRecord instances and also provides access to the Consumer object.
  8. Offsets are committed manually; this interface processes the batch of ConsumerRecord instances and also provides access to the Consumer object.
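As an illustration, here is a minimal sketch of variant 3, a ConsumerAwareMessageListener that receives the Consumer handle (the back-pressure scenario in the comment is hypothetical, just to show why access to the Consumer is useful):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerAwareMessageListener;

public class LoggingConsumerAwareListener implements ConsumerAwareMessageListener<Integer, String> {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> data, Consumer<?, ?> consumer) {
        System.out.println("value============>" + data.value());
        // The Consumer handle allows partition-level control; for example,
        // consumer.pause(...) could be called here to apply back-pressure
        // on the partition this record came from.
    }
}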

The second approach is to build the listener container programmatically. KafkaMessageListenerContainer usage demo:

@Bean
public KafkaMessageListenerContainer<Integer, String> messageListenerContainer1(
        @Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {

    // Create a topic partition
    TopicPartition topicPartition1 = new TopicPartition("springboot_test_topic", 0);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Create a topic partition
    TopicPartition topicPartition2 = new TopicPartition("springboot_test_topic", 1);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Create a topic partition
    TopicPartition topicPartition3 = new TopicPartition("springboot_test_topic", 2);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Container configuration: assign the topic partitions (optionally specifying
    // which partition and offset to start consuming from)
    ContainerProperties containerProperties = new ContainerProperties(tpo1, tpo2, tpo3);

    // Commit the offset immediately when Acknowledgment.acknowledge() is called
    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    containerProperties.setClientId("springboot_test_topic_id");
    containerProperties.setGroupId("springboot_test_topic_group");
    containerProperties.setAckTime(6000);
    containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit", "false");
    containerProperties.setPollTimeout(3000);
    containerProperties.setAckOnError(false);

    // Bind the message listener: manual commit, one record at a time
    containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {

        System.out.println("partition============>" + consumerRecord.partition());
        System.out.println("offset============>" + consumerRecord.offset());
        System.out.println("key============>" + consumerRecord.key());
        System.out.println("value============>" + consumerRecord.value());
        // Consumption finished: commit the offset
        acknowledgment.acknowledge();
    });

    // Build the Kafka consumer listener container
    KafkaMessageListenerContainer<Integer, String> kafkaMessageListenerContainer =
            new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

    // Start listening/consuming
    kafkaMessageListenerContainer.start();

    return kafkaMessageListenerContainer;
}

ConcurrentMessageListenerContainer usage demo:

@Bean
public ConcurrentMessageListenerContainer<Integer, String> messageListenerContainer(
        @Qualifier("consumerFactory") ConsumerFactory<Integer, String> consumerFactory) {

    String topicName = "springboot_test_topic";

    // Create a topic partition
    TopicPartition topicPartition1 = new TopicPartition(topicName, 0);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Create a topic partition
    TopicPartition topicPartition2 = new TopicPartition(topicName, 1);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Create a topic partition
    TopicPartition topicPartition3 = new TopicPartition(topicName, 2);
    // Set the offset the partition should be consumed from
    TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3, 0L, TopicPartitionOffset.SeekPosition.BEGINNING);

    // Container configuration: assign the topic partitions
    ContainerProperties containerProperties = new ContainerProperties(tpo1, tpo2, tpo3);

    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    containerProperties.setClientId("springboot_test_topic_concurrent_id");
    containerProperties.setGroupId("springboot_test_topic_concurrent_group");
    containerProperties.setAckTime(6000);
    containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit", "false");
    containerProperties.setPollTimeout(3000);
    containerProperties.setAckOnError(false);

    // Bind the message listener: manual commit, one record at a time
    // (auto-commit risks losing data or consuming it twice, hence manual ack here)
    containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {

        System.out.println("partition============>" + consumerRecord.partition());
        System.out.println("offset============>" + consumerRecord.offset());
        System.out.println("key============>" + consumerRecord.key());
        System.out.println("value============>" + consumerRecord.value());

        // Consumption finished: commit the offset
        acknowledgment.acknowledge();
    });

    ConcurrentMessageListenerContainer<Integer, String> cmlc =
            new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);

    // Whether the container should start automatically
    cmlc.setAutoStartup(true);
    cmlc.setBeanName("concurrentMessageListenerContainer");

    // Concurrency: one consumer thread per partition (three partitions here)
    cmlc.setConcurrency(3);
    // Start listening/consuming
    cmlc.start();

    return cmlc;
}

Both demos commit offsets manually (the recommended approach, since it preserves data consistency). The commit strategy is chosen with containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); the available AckMode values are:

  • RECORD: commit the offset after the listener processes each record (committed by the container, no manual ack needed).

  • BATCH: commit the offsets after the listener has processed all records returned by poll(); geared towards batch processing (committed by the container).

  • TIME: commit pending offsets whenever the ackTime interval has elapsed.

  • COUNT: commit pending offsets once ackCount records have been processed.

  • COUNT_TIME: commit when either the TIME or the COUNT condition is satisfied.

  • MANUAL: the listener must call Acknowledgment.acknowledge(); the commit itself is then queued and performed as in BATCH.

  • MANUAL_IMMEDIATE: commit the offset immediately when the listener calls Acknowledgment.acknowledge().
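For instance, a time/count based commit policy can be sketched as follows (the ackCount and ackTime values are illustrative):

// Commit once 50 records have been processed or 5 seconds have elapsed,
// whichever happens first
ContainerProperties countTimeProperties = new ContainerProperties("springboot_test_topic");
countTimeProperties.setAckMode(ContainerProperties.AckMode.COUNT_TIME);
countTimeProperties.setAckCount(50);
countTimeProperties.setAckTime(5000);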

Test results: (screenshot omitted)

Using the @KafkaListener annotation:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

    // Unique id of the listener container
    String id() default "";

    // Container factory bean name; defaults to ConcurrentKafkaListenerContainerFactory (concurrent)
    String containerFactory() default "";

    // Topics to consume; multiple topics are supported
    String[] topics() default {};

    // Topic pattern (regular expression)
    String topicPattern() default "";

    // Topic/partition assignments, with optional initial offsets
    TopicPartition[] topicPartitions() default {};

    // If set, the container is added to a collection bean with this name,
    // through which the containers in the group can be started and stopped
    String containerGroup() default "";

    // Bean name of a handler invoked when consuming a message fails
    String errorHandler() default "";

    // Consumer group id
    String groupId() default "";

    // Since 2.0: when true, the id attribute (if present) is used as the Kafka
    // consumer group.id, overriding any group id configured in the consumer factory
    boolean idIsGroup() default true;
    // Client id prefix
    String clientIdPrefix() default "";

    // Pseudo bean name used to reference this listener bean in SpEL expressions
    String beanRef() default "__listener";

    // Consumer concurrency; ideally equal to the topic's partition count,
    // e.g. "${listen.concurrency:3}"
    String concurrency() default "";

    // Whether to start automatically, e.g. "${listen.auto.start:true}";
    // the container implements SmartLifecycle
    String autoStartup() default "";
    // Extra Kafka consumer properties;
    // supports k=v, k:v and "k v" formats, e.g. max.poll.interval.ms=3000
    String[] properties() default {};
}
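The errorHandler attribute references a KafkaListenerErrorHandler bean by name. A minimal sketch of such a bean (the name validationErrorHandler matches the commented-out reference in the demo below; the handling logic is illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (message, exception) -> {
        // Log the failed message; returning null means no reply is produced
        System.err.println("Listener failed for payload " + message.getPayload()
                + ": " + exception.getMessage());
        return null;
    };
}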

@KafkaListener usage demo:

@KafkaListener(id = "springboot_test_topic_id"
            //,topics = {"springboot_test_topic"}
            ,topicPartitions = {
                 //设置 消费topic springboot_test_topic partition为0数据,1分区起始offset为0 注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。
                @TopicPartition(topic = "springboot_test_topic",partitionOffsets = {
                        @PartitionOffset(partition = "0",initialOffset = "0"),
                        @PartitionOffset(partition = "1",initialOffset = "0"),
                        @PartitionOffset(partition = "2",initialOffset = "0"),
                })
             }
            ,containerFactory = "kafkaListenerContainerFactory"
            ,clientIdPrefix = "_listener"
            ,concurrency = "${listen.concurrency:3}"
            ,idIsGroup = false
            ,groupId ="springboot_test_topic-group"
            ,beanRef = "springboot_test_topic"
            ,autoStartup = "${listen.auto.start:true}"
//            ,errorHandler = "validationErrorHandler"
            ,properties = {"max.poll.interval.ms=3000",
            "max.poll.records=30",
            "enable.auto.commit=false",
            "bootstrap.servers=localhost:9092"
    }
    )
    public void listen(ConsumerRecord<Integer, String> record
                        //如果需要使用Acknowledgment 需要在factory中指定ACKmodel
                        //    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
                        , Acknowledgment ack,
                       @Header(KafkaHeaders.GROUP_ID) String groupId,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.OFFSET) int offset,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {

        //消息消息
        System.out.println("=======key=========>"+record.key());
        System.out.println("=======value=========>"+record.value());
        //
        System.out.println("=======topic=========>"+topic);
        System.out.println("=======groupId=========>"+groupId);
        System.out.println("=======partition=========>"+partition);
        System.out.println("=======offset=========>"+offset);
        System.out.println("=======timestamp=========>"+timestamp);

        //手动确定 提交offset
        ack.acknowledge();
    }

The message metadata is available from the message headers, accessed here via the @Header annotation:

KafkaHeaders.OFFSET // offset of the current record

KafkaHeaders.RECEIVED_MESSAGE_KEY // message key

KafkaHeaders.RECEIVED_TOPIC // topic the record was received from

KafkaHeaders.RECEIVED_PARTITION_ID // partition the record was received from

KafkaHeaders.RECEIVED_TIMESTAMP // timestamp of the record

KafkaHeaders.TIMESTAMP_TYPE // timestamp type of the record

KafkaHeaders.GROUP_ID // consumer group id
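Alternatively, all metadata can be received at once through MessageHeaders rather than individual @Header parameters; a sketch (the listener id and topic reuse the demo values):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;

@KafkaListener(id = "headersDemo", topics = "springboot_test_topic")
public void listenWithHeaders(String data, @Headers MessageHeaders headers) {
    // MessageHeaders is a Map<String, Object>; print every header entry
    headers.forEach((key, value) -> System.out.println(key + " ============> " + value));
}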

Test results: (screenshot omitted)

Kafka batch consumption support:

Step 1: build the batch consumer listener container factory

/**
     * Build the concurrent listener container factory for batch processing
     * @param consumerFactory
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory(ConsumerFactory<Integer, String> consumerFactory) {

        // Build the concurrent listener container factory; it creates a listener
        // container for the topics a listener subscribes to
        ConcurrentKafkaListenerContainerFactory<Integer, String> concurrentKafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // These settings can also be supplied via annotation attributes
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
        // Poll timeout in milliseconds
        concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(3000);
        // Enable batch processing
        concurrentKafkaListenerContainerFactory.setBatchListener(true);

        // Ack mode; when an error occurs, the different modes handle offsets differently
        concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return concurrentKafkaListenerContainerFactory;

    }


  /**
     * Register the listener bean
     * @return
     */
    @Bean("batchListeners")
    public BatchListeners batchListeners() {
        return new BatchListeners("springboot_test_topic");
    }

Step 2: usage

/**
 * SpEL expressions support the special token __listener (or the name set via beanRef)
 * @author fangyuan
 */
public class BatchListeners {

    /**
     * Topic name
     *
     */
    private final String topic;

    public BatchListeners(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }

    /**
     * Batch processing
     *
     */
    @KafkaListener(id = "batchConsumer",
            topicPartitions = {
                    // Consume partitions 0, 1 and 2 of the topic, each starting at offset 0.
                    // Note: either partitions or partitionOffsets may specify the partitions, but not both.
                    @TopicPartition(topic = "#{batchListeners.topic}", partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0"),
                            @PartitionOffset(partition = "1", initialOffset = "0"),
                            @PartitionOffset(partition = "2", initialOffset = "0"),
                    })
            }
            , containerFactory = "batchFactory"
//            , clientIdPrefix = ""
            , idIsGroup = false
            , concurrency = "${listen.concurrency:3}"
            , groupId = "batchConsumer-group"
            , beanRef = "batchConsumer_"
            , autoStartup = "${listen.auto.start:true}"
            , properties = {
                            "max.poll.interval.ms=3000",
                            // Maximum number of records returned per poll (the batch size)
                             "max.poll.records=5"})
    public void listen(List<ConsumerRecord<Integer, String>> records, Acknowledgment ack) {

        System.out.println("records pulled in this batch====================>" + records.size());
        // Iterate over the records
        records.forEach(record -> {
            // Consume the message
            System.out.print("=======key=========>" + record.key());
            System.out.print("=======value=========>" + record.value());
            System.out.print("=======partition=========>" + record.partition());
            System.out.print("=======offset=========>" + record.offset());
            System.out.println();
        });


        // Consumption finished: commit the offsets
        ack.acknowledge();
    }
//
//    /**
//     * @param messages
//     */
//    @KafkaListener(id = "batchConsumer2"
//            , topics = {"#{__listener.topic}"}
//            , containerFactory = "batchFactory"
//            , idIsGroup = false
//            , concurrency = "${listen.concurrency:3}"
//            , groupId = "batchConsumer-1")
//    public void listMsg(List<Message<String>> messages) {
//        //
//    }

    /**
     * Batch listener with manual ack acknowledgment
     *
     * @param messages
     * @param ack
     */
    public void listMsgAck(List<Message<String>> messages, Acknowledgment ack) {

    }

    /**
     * @param messages
     * @param ack
     * @param consumer
     */

    public void listMsgAckConsumer(List<Message<String>> messages, Acknowledgment ack,
                                   Consumer<Integer, String> consumer) {
        //
    }

    /**
     * When ConsumerRecord is used, the method parameter must be the list of
     * ConsumerRecord (plus an Acknowledgment if the ack mechanism is enabled)
     * @param list
     */
    public void listCRs(List<ConsumerRecord<Integer, String>> list) {
    }

}
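Because every @KafkaListener has an id, its container can also be stopped and started at runtime through the KafkaListenerEndpointRegistry; a minimal sketch (the id "batchConsumer" is the one used above, the wrapper class is hypothetical):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

@Component
public class ListenerLifecycle {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Stop the container created for @KafkaListener(id = "batchConsumer")
    public void stopBatchConsumer() {
        registry.getListenerContainer("batchConsumer").stop();
    }

    // Start it again
    public void startBatchConsumer() {
        registry.getListenerContainer("batchConsumer").start();
    }
}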

Test results: (screenshot omitted)

Demo project on GitHub: https://github.com/fangyuan94/kafkaDemo