Spring Boot 2.0 + Kafka: Concurrent and Batch Consumption from Kafka


Kafka Overview

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation; it is a high-throughput distributed publish-subscribe messaging system.
Its main components are:

  • Topic: a message topic, the publishing endpoint for a given class of messages. Each Topic can be split into several Partitions so that messages can be sent concurrently.
  • Producer: the publisher of messages; a producer may target specific Partitions when publishing.
  • Consumer: the user of messages. Within one consumer Group, the number of consumers should preferably not exceed the number of Partitions; for a partitioned Topic, a consumer must specify the partition numbers it reads when partitions are assigned manually (as the listener in section c below does).
  • Broker: the service broker, i.e. a Kafka server node.
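The topic used throughout this article, byteArray_topic1, is assumed to have 3 partitions (the producer below computes key % 3 as the partition, and the listener reads partitions 0, 1, and 2). A minimal sketch for creating it with Kafka's Java AdminClient, which the spring-kafka dependency brings in transitively; the class name TopicSetup is illustrative, and the broker address is the one used in the configs below:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.169.0.109:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (single-broker test environment)
            admin.createTopics(Collections.singletonList(
                    new NewTopic("byteArray_topic1", 3, (short) 1))).all().get();
        }
    }
}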

Integrating Kafka with Spring Boot

The Spring Boot version used here is 2.0.2.RELEASE; the build tool is Maven.

Producer

a. POM dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafkatest</groupId>
    <artifactId>producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-producer</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

b. Java code

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerTest {
    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;
    private final String topic = "byteArray_topic1";

    public void sendMessage(int key, String value) {
        // key % 3 explicitly picks one of the topic's 3 partitions
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic,
                key % 3, String.valueOf(key), value.getBytes());
        kafkaTemplate.send(record);
    }
}
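To exercise the producer, here is a sketch of a possible entry point; the class name ProducerApplication, the message count, and the message contents are illustrative, not part of the original project:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    // Send ten test messages, spread across the topic's 3 partitions via key % 3
    @Bean
    CommandLineRunner demo(KafkaProducerTest producer) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                producer.sendMessage(i, "message-" + i);
            }
        };
    }
}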

c. Configuration file (YML)

spring:
  kafka:
    producer:
      bootstrap-servers: 172.169.0.109:9092
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

One pitfall deserves special attention here: the serializer class path is org.apache.kafka.common.serialization.StringSerializer,
not
org.apache.kafka.config.serialization.StringSerializer.
With the wrong path, startup fails with the following error:

2019-01-31 11:35:14.794 [main] WARN  o.s.c.a.AnnotationConfigApplicationContext -
                Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaProducerTest': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration': Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties': Could not bind properties to 'KafkaProperties' : prefix=spring.kafka, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is org.springframework.boot.context.properties.bind.BindException: Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>
2019-01-31 11:35:14.810 [main] ERROR o.s.b.d.LoggingFailureAnalysisReporter -


***************************
APPLICATION FAILED TO START
***************************

Description:

Failed to bind properties under 'spring.kafka.producer.key-serializer' to java.lang.Class<?>:

    Property: spring.kafka.producer.key-serializer
    Value: org.apache.kafka.config.serialization.StringSerializer
    Origin: class path resource [application.yml]:8:25
    Reason: No converter found capable of converting from type [java.lang.String] to type [java.lang.Class<?>]

Action:

Update your application's configuration
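One way to rule out this class of typo entirely (a sketch, not part of the original article; the class name KafkaProducerConfig is illustrative) is to configure the producer in Java and pass serializer Class objects instead of string paths, so a wrong package fails at compile time rather than at startup:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaProducerConfig {
    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.169.0.109:9092");
        // Class literals instead of strings: a misspelled package no longer compiles
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}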

Consumer

If you do not need concurrent or batch fetching, the consumer code is very simple.

a. POM file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafkatest</groupId>
    <artifactId>consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <joda-time.version>2.3</joda-time.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

b1. Java code (no concurrency, no batch fetching)

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class Listener {
    // static final so the constant can be referenced in the annotation
    private static final String topic = "byteArray_topic1";

    // Without @KafkaListener this method would never be invoked
    @KafkaListener(topics = topic)
    public void listen(ConsumerRecord<String, byte[]> record) {
        log.info("kafka key: " + record.key());
        log.info("kafka value: " + new String(record.value()));
    }
}

b2. Configuration file

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: gridMonitorGroup
      auto-commit-interval: 1000
      auto-offset-reset: latest
      bootstrap-servers: "172.169.0.109:9092"
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

c. Java code (concurrent, batch fetching)

  1. Kafka consumer configuration class
    Key lines for batch fetching:
    ① factory.setBatchListener(true);
    ② propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
    Key line for concurrent fetching:
    factory.setConcurrency(concurrency);

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
        @Value("${kafka.consumer.bootstrap-servers}")
        private String servers;
        @Value("${kafka.consumer.enable-auto-commit}")
        private boolean enableAutoCommit;
        @Value("${kafka.consumer.auto-commit-interval}")
        private String autoCommitInterval;
        @Value("${kafka.consumer.group-id}")
        private String groupId;
        @Value("${kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        @Value("${kafka.consumer.concurrency}")
        private int concurrency;

        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Number of concurrent listener containers
            factory.setConcurrency(concurrency);
            // Deliver records to the listener in batches
            factory.setBatchListener(true);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }

        public ConsumerFactory<String, byte[]> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            // Fetch at most 50 records per poll
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
            return propsMap;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }
    }

  2. Kafka consumer Listener

    import java.util.List;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.TopicPartition;
    import org.springframework.stereotype.Service;

    @Service
    @Slf4j
    public class Listener {
        // static final so the constant can be referenced inside the annotation
        private static final String topic = "byteArray_topic1";

        @KafkaListener(id = "myListener",
                topicPartitions = {@TopicPartition(topic = topic, partitions = {"0", "1", "2"})})
        public void listen(List<ConsumerRecord<String, byte[]>> recordList) {
            recordList.forEach((record) -> {
                log.info("kafka key: " + record.key());
                log.info("kafka value: " + new String(record.value()));
            });
        }
    }
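    Because the container factory calls setBatchListener(true), the listener method's parameter must be a List: each invocation receives everything returned by a single poll, capped by the MAX_POLL_RECORDS_CONFIG value of 50 set above.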

  3. Configuration file

    kafka:
      consumer:
        enable-auto-commit: true
        group-id: gridMonitorGroup
        auto-commit-interval: 1000
        auto-offset-reset: latest
        bootstrap-servers: "172.169.0.109:9092"
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        concurrency: 3
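    Note that this block sits at the top level (kafka:, not spring.kafka:) because KafkaConsumerConfig reads it through @Value("${kafka.consumer.*}") placeholders rather than Spring Boot's auto-configuration. With 3 partitions and concurrency: 3, the explicitly assigned partitions are distributed across the three listener containers, so each container handles one partition.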