Spring Boot notes: Kafka batch and concurrent consumption configuration (consumer + producer)

I'm using Spring Boot 2.0.0 with spring-kafka 2.1.4. I ran into a few problems while wiring it up, summarized here:
1. session-timeout: I first set it to 3000 (ms) and kept getting an exception saying the session timeout was outside some allowed bound; changing it to 6000 (ms) fixed it. I never pinned down exactly which bound the message meant (if anyone knows, please reach out!). It is most likely the broker-side group.min.session.timeout.ms / group.max.session.timeout.ms range, whose default minimum is 6000 ms.
P.S. I no longer remember the exact wording of the bound in the error message; blame my English.
2. enable-auto-commit: false. This controls whether offsets are committed automatically; with auto-commit disabled you must commit the offset manually:

    enable-auto-commit: false          #whether to auto-commit offsets
    #auto-commit-interval: 1000        #auto-commit interval; remove the # when auto-commit is enabled


    //how the offset is committed
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);


    //inside the @KafkaListener method
    ack.acknowledge(); //manually commit the offset
3. The producer and the consumer used different JSON frameworks for serialization and deserialization (I happened to use Jackson on one side and fastjson on the other), which made the consumer throw while parsing. Standardize on a single library, as in the sketch below.
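
A minimal sketch of the fix, using fastjson on both ends (it assumes the User and Contants classes shown later in this post, with their Lombok-generated getters and setters):

    import com.alibaba.fastjson.JSON;
    import java.time.LocalDateTime;

    public class JsonRoundTrip {
        public static void main(String[] args) {
            User user = new User();
            user.setUserName("HS");
            user.setCreateTime(LocalDateTime.now());

            // producer side: serialize with fastjson using a fixed date pattern
            String json = JSON.toJSONStringWithDateFormat(user, Contants.DateTimeFormat.DATE_TIME_PATTERN);

            // consumer side: parse with the same library, so the date format written
            // by the producer is exactly what the parser expects
            User back = JSON.parseObject(json, User.class);
            System.out.println(back.getCreateTime());
        }
    }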

The full Spring Boot + Kafka configuration:

  • pom file:

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
     </dependencies>
  • Configuration file (note that these kafka.* keys are custom properties read with @Value in the config classes below, not Spring Boot's spring.kafka.* auto-configuration namespace):

    #kafka settings
    kafka:
      producer:
        bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
        batch-size: 16785 #maximum number of bytes batched into a single send
        retries: 1 #number of retries after a failed send
        buffer-memory: 33554432 #32MB send buffer
        linger: 1 #linger.ms, how long to wait for more records before sending a batch
      consumer:
        bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
        auto-offset-reset: latest #where to start when there is no committed offset: latest, or earliest for the oldest unconsumed offset
        max-poll-records: 3100 #maximum number of records returned by a single poll() in batch consumption
        enable-auto-commit: false #whether to auto-commit offsets
        auto-commit-interval: 1000 #auto-commit interval (ms), only used when auto-commit is enabled
        session-timeout: 20000 #session timeout (ms)
        max-poll-interval: 15000 #maximum allowed gap (ms) between two poll() calls; if processing a batch takes longer, the consumer is kicked out of the group
        max-partition-fetch-bytes: 15728640 #maximum amount of data fetched per partition, 15MB
      listener:
        batch-listener: true #whether to enable batch consumption; true means batches
        concurrencys: 3,6 #consumer thread counts used by the two container factories below
        poll-timeout: 1500 #how long the container's poll() blocks waiting for records (ms)
        topics: test-topic #comma-separated topics consumed by the listeners below (placeholder name; referenced as kafka.listener.topics in the code)

  • Producer configuration:

    /**
     * @Author: hs
     * @Date: 2019/3/6 21:57
     * @Description:
     */
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {

        @Value("${kafka.producer.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${kafka.producer.retries}")
        private Integer retries;

        @Value("${kafka.producer.batch-size}")
        private Integer batchSize;

        @Value("${kafka.producer.buffer-memory}")
        private Integer bufferMemory;

        @Value("${kafka.producer.linger}")
        private Integer linger;

        private Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>(7);
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        private ProducerFactory<String, String> producerFactory() {
            DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
            //uncomment to make the factory transactional:
            //producerFactory.transactionCapable(); producerFactory.setTransactionIdPrefix("hous-");
            return producerFactory;
        }

        /*
        @Bean
        public KafkaTransactionManager transactionManager() {
            return new KafkaTransactionManager(producerFactory());
        }
        */

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

  • KafkaSender:

    /**
     * @Author: hs
     * @Date: 2019/3/6 23:46
     * @Description:
     */
    @Component
    @Slf4j
    public class KafkaSender {

        private final KafkaTemplate<String, String> KAFKA_TEMPLATE;

        @Autowired
        public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
            this.KAFKA_TEMPLATE = kafkaTemplate;
        }

        public void sendMessage(String topic, String message) {

            ListenableFuture<SendResult<String, String>> sender = KAFKA_TEMPLATE.send(new ProducerRecord<>(topic, message));

            // success callback
            // SuccessCallback<SendResult<String, String>> successCallback = result -> log.info("Message sent successfully!");
            // failure callback
            // FailureCallback failureCallback = ex -> log.error("Failed to send message!");

            sender.addCallback(result -> {}, ex -> log.error("Failed to send message!", ex));
        }
    }

  • Consumer configuration:

    /**
     * @Author: hs
     * @Date: 2019/3/5 19:50
     * @Description:
     */
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {

        @Value("${kafka.consumer.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${kafka.consumer.enable-auto-commit}")
        private Boolean autoCommit;

        @Value("${kafka.consumer.auto-commit-interval}")
        private Integer autoCommitInterval;

        @Value("${kafka.consumer.max-poll-records}")
        private Integer maxPollRecords;

        @Value("${kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;

        @Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
        private Integer concurrency3;

        @Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
        private Integer concurrency6;

        @Value("${kafka.listener.poll-timeout}")
        private Long pollTimeout;

        @Value("${kafka.consumer.session-timeout}")
        private String sessionTimeout;

        @Value("${kafka.listener.batch-listener}")
        private Boolean batchListener;

        @Value("${kafka.consumer.max-poll-interval}")
        private Integer maxPollInterval;

        @Value("${kafka.consumer.max-partition-fetch-bytes}")
        private Integer maxPartitionFetchBytes;

        /**
         * Listener container factory with concurrency 6.
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener6")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency6);
            return factory;
        }

        /**
         * Listener container factory with concurrency 3.
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener3")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency3);
            return factory;
        }

        private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            //enable batch consumption
            factory.setBatchListener(batchListener);
            //pollTimeout: how long poll() blocks waiting for records when none are available;
            //the number of records returned per poll is capped by max.poll.records
            factory.getContainerProperties().setPollTimeout(pollTimeout);
            //commit mode: MANUAL_IMMEDIATE commits each time acknowledge() is called;
            //MANUAL queues the acks and commits them together on the next poll
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

        private ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        private Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>(10);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    }
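
    One thing worth calling out about the concurrency settings above: with ConcurrentKafkaListenerContainerFactory, each concurrent consumer is assigned a share of the topic's partitions, so a concurrency of 6 only helps if the topic has at least 6 partitions. A minimal sketch, using the kafka-clients AdminClient with a hypothetical topic name and replication factor, of creating such a topic:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import java.util.Collections;
    import java.util.Properties;

    public class TopicSetup {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667");
            try (AdminClient admin = AdminClient.create(props)) {
                // 6 partitions so that concurrency 6 gives 6 active consumers;
                // "user-topic" and the replication factor of 2 are placeholders
                NewTopic topic = new NewTopic("user-topic", 6, (short) 2);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }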

  • KafkaListener:

    /**
     * @Author: hs
     * @Date: 2019/3/7 00:12
     * @Description:
     */
    @Slf4j
    @Component
    public class KafkaListeners {

        @KafkaListener(containerFactory = "kafkaBatchListener6", topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"})
        public void batchListener(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

            List<User> userList = new ArrayList<>();
            try {
                records.forEach(record -> {
                    User user = JSON.parseObject(record.value(), User.class);
                    user.getCreateTime().format(DateTimeFormatter.ofPattern(Contants.DateTimeFormat.DATE_TIME_PATTERN));
                    userList.add(user);
                });
            } catch (Exception e) {
                log.error("Kafka listener error: " + e.getMessage(), e);
            } finally {
                //manually commit the offset; note that acknowledging in finally commits
                //the batch even when processing failed, so failed records are not redelivered
                ack.acknowledge();
            }
        }
    }

  • Scheduled task (test producer):

    /**
     * @Author: hs
     * @Date: 2019/3/7 00:53
     * @Description:
     */
    @Component
    public class UserTask {

        @Value("#{'${kafka.listener.topics}'.split(',')}")
        private List<String> topics;

        private final MultiService MUlTI_SERVICE;

        private final KafkaSender KAFKA_SENDER;

        @Autowired
        public UserTask(MultiService multiService, KafkaSender kafkaSender) {
            this.MUlTI_SERVICE = multiService;
            this.KAFKA_SENDER = kafkaSender;
        }

        @Scheduled(fixedRate = 10 * 1000)
        public void addUserTask() {
            User user = new User();
            user.setUserName("HS");
            user.setDescription("text");
            user.setCreateTime(LocalDateTime.now());
            String JSONUser = JSON.toJSONStringWithDateFormat(user,
                    Contants.DateTimeFormat.DATE_TIME_PATTERN,   //date format
                    SerializerFeature.PrettyFormat);             //pretty-print the JSON
            for (int i = 0; i < 700; i++) {
                KAFKA_SENDER.sendMessage(topics.get(0), JSONUser);
            }
            MUlTI_SERVICE.addUser(user);
        }
    }

  • Other classes:

    /**
     * @Author: hs
     * @Date: 2019/3/6 20:21
     * @Description:
     */
    public class Contants {

        public static class DateTimeFormat {

            public static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

            public static final String DATE_PATTERN = "yyyy-MM-dd";

            public static final String TIME_PATTERN = "HH:mm:ss";

            public static final String DATE_TIME_STAMP = "yyyyMMddHHmmss";

            public static final String DATE_STAMP = "yyyyMMdd";

            public static final String TIME_STAMP = "HHmmss";
        }
    }

    /**
     * @Author: hs
     * @Date: 2019/2/23 17:53
     * @Description:
     */
    @Data
    public class User {
        private Integer id;
        private String userName;
        private String description;
        //@JSONField(format = "yyyy-MM-dd HH:mm:ss")
        private LocalDateTime createTime;
    }
  • Application entry point:

    @ComponentScan("com.*")
    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    @EnableScheduling
    @EnableAsync
    public class MysqldbApplication implements CommandLineRunner {

        public static void main(String[] args) {
            SpringApplication.run(MysqldbApplication.class, args);
        }

        @Override
        public void run(String... strings) throws Exception {
            //fastjson's global default date format (the field really is spelled DEFFAULT_DATE_FORMAT)
            JSON.DEFFAULT_DATE_FORMAT = Contants.DateTimeFormat.DATE_TIME_PATTERN;
        }
    }

Summary: with auto-commit, starting or stopping the service can leave duplicate data being produced into Kafka; you keep the throughput but weaken the atomicity of delivery. Manual commit preserves atomicity at the cost of some throughput. In practice, choose the mode based on your data volume and how much duplication you can tolerate.
error:

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:721)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:599)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1242)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackImmediate(KafkaMessageListenerContainer.java:789)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processAck(KafkaMessageListenerContainer.java:772)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2400(KafkaMessageListenerContainer.java:314)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerBatchAcknowledgment.acknowledge(KafkaMessageListenerContainer.java:1342)
    at com.staryea.servicelevel.adjustment.kafka.KafkaListeners.spanBatchListener(KafkaListeners.java:66)
    at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
    ... 9 common frames omitted

If tuning session-timeout has no effect, adjust the max-poll-interval parameter instead: if one batch of business processing is not finished before max.poll.interval.ms elapses since the last poll(), the error above is thrown. If data piles up, you can also raise max-partition-fetch-bytes (default 1MB, i.e. 1 * 1024 * 1024); it limits how much is pulled from each partition, and together with max-poll-records it bounds the size of each batch.
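
A quick back-of-the-envelope check, using the values from the configuration above and an assumed per-record processing time, shows why the error appears with these settings:

    public class PollIntervalCheck {
        public static void main(String[] args) {
            long maxPollRecords = 3_100L;     // max-poll-records from the config above
            long maxPollIntervalMs = 15_000L; // max-poll-interval from the config above
            long avgMsPerRecord = 5L;         // assumption only: measure this in your own listener

            long batchMs = maxPollRecords * avgMsPerRecord; // ~15,500 ms for one batch
            System.out.printf("one batch ~%d ms, allowed %d ms%n", batchMs, maxPollIntervalMs);
            if (batchMs >= maxPollIntervalMs) {
                // the consumer would be kicked out of the group mid-batch:
                // either raise max-poll-interval or lower max-poll-records / max-partition-fetch-bytes
            }
        }
    }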

AckMode:
    RECORD
    commit after each record has been processed
    BATCH (default)
    commit once per poll(); the commit frequency therefore depends on how often poll() is called
    TIME
    commit every ackTime milliseconds (unlike auto.commit.interval.ms, these commits are issued by the listener container after the records have been handed to the listener, not by the client in the background)
    COUNT
    commit after ackCount records have been acknowledged
    COUNT_TIME
    commit as soon as either the ackTime or the ackCount condition is met
    MANUAL
    the listener calls acknowledge(), but the acks are still committed in batches behind the scenes
    MANUAL_IMMEDIATE
    the listener calls acknowledge(), and every call triggers an immediate commit
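
For completeness, here is a minimal sketch (not part of the original setup) of how the non-manual modes are wired onto the same kind of factory. It assumes the consumerFactory() from KafkaConsumerConfig above; in spring-kafka 2.1.x, ContainerProperties lives in org.springframework.kafka.listener.config, and setAckCount/setAckTime only take effect for the COUNT, TIME and COUNT_TIME modes:

    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.config.ContainerProperties;

    //intended to sit next to the other factory methods in KafkaConsumerConfig
    private ConcurrentKafkaListenerContainerFactory<String, String> countTimeListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        ContainerProperties props = factory.getContainerProperties();
        props.setAckMode(AbstractMessageListenerContainer.AckMode.COUNT_TIME);
        props.setAckCount(100);   // commit after every 100 records...
        props.setAckTime(5_000L); // ...or every 5 seconds, whichever happens first
        return factory;
    }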