Spring Boot整合Kafka的简单用例(@KafkaListener注解)
阅读原文时间:2021年04月23日阅读:1

第一步、启动zookeeper server和kafka server

启动zookeeper server:bin/zookeeper-server-start.sh config/zookeeper.properties
启动两个kafka server:bin/kafka-server-start.sh config/server-1.properties;
bin/kafka-server-start.sh config/server.properties
Kafka会借助zookeeper在两个broker中选举出一个controller;每个分区的多个副本中,一个作为leader,其余作为follower

第二步、创建一个maven项目

这一篇中修改了Spring Boot的版本为2.0.0,pom.xml如下:

<!-- Import the Spring Boot 2.0.0.RELEASE BOM so Boot-managed dependency versions are aligned. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.0.0.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- No explicit version: it is resolved through the imported BOM above. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring for Apache Kafka, pinned explicitly to 2.1.4.RELEASE. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
</dependencies>

第三步、kafka配置

/**
 * Spring configuration that declares the Kafka producer factory, consumer factory,
 * listener container factory, the {@link MyListener} bean and the {@link KafkaTemplate}.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    // ---------------------------------------------------------------- producer

    /**
     * Property map used to create Kafka producers.
     *
     * @return producer settings: two local bootstrap servers, no retries, and
     *         String serializers for both key and value
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return settings;
    }

    /**
     * @return a producer factory built from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String,String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ---------------------------------------------------------------- consumer

    /**
     * Property map used to create Kafka consumers.
     *
     * @return consumer settings: two local bootstrap servers, group id {@code "0"},
     *         auto-commit enabled with a 100 ms interval, a 15000 ms session timeout,
     *         and String deserializers for both key and value
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return settings;
    }

    /**
     * @return a consumer factory built from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * @return the listener container factory, wired with {@link #consumerFactory()}
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return a new {@link MyListener}
     */
    @Bean
    public MyListener myListener() {
        return new MyListener();
    }

}

第四步、topic的配置

自动创建的topic分区数是1,复制因子是1(即broker的默认配置num.partitions=1、default.replication.factor=1)

@Configuration
@EnableKafka
public class TopicConfig {
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic foo() {
        /第一个是参数是topic名字,第二个参数是分区个数,第三个是topic的复制因子个数
        //当broker个数为1个时会创建topic失败,
        //提示:replication factor: 2 larger than available brokers: 1
        //只有在集群中才能使用kafka的备份功能
        return new NewTopic("foo", 10, (short) 2);
    }

    @Bean
    public NewTopic bar() {
        return new NewTopic("bar", 10, (short) 2);
    }

    @Bean
    public NewTopic topic1(){
        return new NewTopic("topic1", 10, (short) 2);
    }

    @Bean
    public NewTopic topic2(){
        return new NewTopic("topic2", 10, (short) 2);
    }
}

第五步、使用@KafkaListener注解

topicPartitions和topics、topicPattern不能同时使用

/**
 * Kafka message listener with two {@code @KafkaListener} methods; each one prints the
 * topic, key and value of every record it receives to standard output.
 */
public class MyListener {

    /**
     * Listens on explicitly assigned partitions.
     *
     * <p>The {@code id} is the id of the listener container. The assignment is:
     * topic1 partitions 0 and 3; topic2 partition 0, plus topic2 partition 1
     * starting from initial offset 4.
     *
     * @param record the received record
     */
    @KafkaListener(id = "myContainer1",
            topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        // "topic:" label kept consistent with listen2 and with the key/value lines.
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }


    /**
     * Listens on the topics {@code foo} and {@code bar}.
     *
     * @param record the received record
     */
    @KafkaListener(id = "myContainer2",topics = {"foo","bar"})
    public void listen2(ConsumerRecord<?, ?> record){
        System.out.println("topic:" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }
}

第六步、创建发送消息的接口

/**
 * REST endpoint that publishes a fixed test message to a Kafka topic.
 */
@RestController
public class KafkaController {
    private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * Handles {@code GET /{topic}/send} by sending the record key {@code "你"} with
     * value {@code "好"} to the given topic and partition.
     *
     * <p>The send is asynchronous; its outcome is reported through the log rather
     * than through the HTTP response.
     *
     * @param topic     target topic, taken from the request path
     * @param partition target partition, taken from the {@code partition} query
     *                  parameter (defaults to 0)
     */
    @RequestMapping(value = "/{topic}/send",method = RequestMethod.GET)
    public void sendMeessageTotopic1(@PathVariable String topic,@RequestParam(value = "partition",defaultValue = "0") int partition) {
        logger.info("start send message to {}",topic);
        // Attach callbacks so that a failed send is logged instead of being silently dropped.
        kafkaTemplate.send(topic,partition,"你","好").addCallback(
                result -> logger.debug("message sent to {} partition {}", topic, partition),
                ex -> logger.error("failed to send message to {} partition {}", topic, partition, ex));
    }
}

第七步、启动程序、调用接口

消息监听器只监听订阅的topic的特定分区的消息

源码:https://github.com/NapWells/java_framework_learn/tree/master/springkafka2