Kafka客户端Producer与Consumer
阅读原文时间:2023年07月10日阅读:1

Kafka客户端Producer与Consumer

一、pom.xml

 <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>${project.artifactId}-${project.version}</finalName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>META-INF/LICENSE*</exclude>
                                        <exclude>META-INF/NOTICE*</exclude>
                                        <exclude>license/*</exclude>
                                        <exclude>LICENSE*</exclude>
                                        <exclude>NOTICE*</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.bigData.DataProducer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

二、相关配置文件

#acks=1
bootstrap.servers=alary001:9092,alary002:9092,alary003:9092
retries=2
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer


# Output pattern : date [thread] priority category - message   FATAL 0  ERROR 3  WARN 4  INFO 6  DEBUG 7
log4j.rootLogger=INFO, Console

#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d %-5p [%c{5}] - %m%n



topic=Data_Server

三、Producer客户端

在集群上启动zookeeper

zkServer.sh start

查看zookeeper的状态

zkServer.sh status

启动kafka集群:

kafka-server-start.sh config/server.properties &

创建新的topic

kafka-topics.sh --create --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

查看topic副本信息

kafka-topics.sh --describe alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --replication-factor 3 --partitions 3 --topic Data_Server

查看已经创建的topic信息

kafka-topics.sh --list --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0

测试生产者发送消息

bin/kafka-console-producer.sh --broker-list alary001:9092,alary002:9092,alary003:9092 --topic Data_Server

测试消费者消费消息

kafka-console-consumer.sh --bootstrap-server alary001:9092,alary002:9092,alary003:9092 --from-beginning --topic Data_Server

删除topic

bin/kafka-topics.sh --zookeeper alary001:2181/home/hadoop/app/kafka_2.12-2.2.0,alary002:2181/home/hadoop/app/kafka_2.12-2.2.0,alary003:2181/home/hadoop/app/kafka_2.12-2.2.0 --delete --topic Data_Server

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

停止Kafka服务

kafka-server-stop.sh stop

停止zookeeper集群

zkServer.sh stop

package com.zlkj.producer;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class DataTransmission {

    private static final Logger logger = LoggerFactory.getLogger(com.zlkj.producer.DataTransmission.class);

    public static void main(String[] args) {
        Properties baseConfiguration = new Properties();
        Properties producerConfiguration = new Properties();
        try {
            baseConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/base.properties"));
            if (args != null && args.length > 0 && StringUtils.isNoneBlank(args[0])) {
                producerConfiguration.load(new FileInputStream(args[0]));
            } else {
                producerConfiguration.load(com.zlkj.producer.DataTransmission.class.getResourceAsStream("/producer.properties"));
            }
        } catch (IOException e) {
            logger.error("=================加载配置异常=================");
        }

        //发送消息
        Producer producer = new KafkaProducer<String, String>(producerConfiguration);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            logger.info("发送的消息: {}", value);
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(baseConfiguration.getProperty("topic"), value);
            producer.send(msg);
        }
        producer.close();
    }
}