Kafka API in Practice

  1) Start the ZooKeeper and Kafka clusters, and open a console consumer on the Kafka cluster:

[hadoop1 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop1:2181 --topic first

(On Kafka 2.0 and later the console consumer no longer accepts --zookeeper; use --bootstrap-server hadoop1:9092 instead.)

  2) Add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
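
Note: the deprecated examples below use the old Scala clients (kafka.javaapi.producer.Producer, kafka.javaapi.consumer.ConsumerConnector, kafka.javaapi.consumer.SimpleConsumer), which ship in the Kafka core artifact rather than in kafka-clients. To compile them you would also need a dependency along these lines (the Scala suffix and the pre-2.0 version are assumptions; match them to your broker):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>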

  3) Producer example (deprecated API):

package com.libt.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "hadoop1:9092");
        properties.put("request.required.acks", "1");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));

        KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
        producer.send(message);
    }
}

  4) Producer example (new API):

package com.libt.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {

        // configuration the producer needs
        Properties props = new Properties();
        // host name and port of a broker in the Kafka cluster
        props.put("bootstrap.servers", "hadoop2:9092");
        // wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // how long to wait before sending a batch
        props.put("linger.ms", 1);
        // total memory of the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }

        producer.close();
    }
}

  5) Producer with a callback (new API):

package com.libt.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // host name and port of a broker in the Kafka cluster
        props.put("bootstrap.servers", "hadoop2:9092");
        // wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // how long to wait before sending a batch
        props.put("linger.ms", 1);
        // total memory of the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {

            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {
                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    } else {
                        // the send failed; metadata is null and the cause is in exception
                        exception.printStackTrace();
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

Custom partitioner
0) Requirement: write every record to partition 0 of the topic.
1) Define a class that implements the Partitioner interface and override its method (deprecated API):

package com.libt.kafka;
import java.util.Map;
import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // route every record to partition 0
        return 0;
    }
}

2) Custom partitioner (new API):

package com.libt.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // route every record to partition 0
        return 0;
    }

    @Override
    public void close() {

    }
}

3) Use it from a producer:

package com.libt.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // host name and port of a broker in the Kafka cluster
        props.put("bootstrap.servers", "hadoop2:9092");
        // wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // maximum number of send retries
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // how long to wait before sending a batch
        props.put("linger.ms", 1);
        // total memory of the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // custom partitioner
        props.put("partitioner.class", "com.libt.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }
}

4) Test:

(1) On hadoop1, watch the .log files of the three partitions of topic first under /home/bigdata/kafka/logs/ for changes:

[hadoop1 first-0]$ tail -f 00000000000000000000.log
[hadoop1 first-1]$ tail -f 00000000000000000000.log
[hadoop1 first-2]$ tail -f 00000000000000000000.log

(2) The data all ends up in the specified partition (first-0).
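
Besides tailing the log files, the result can also be checked from code. The minimal sketch below uses the new consumer API and reads only partition 0 of first; the class name PartitionCheckConsumer is made up for this example, and the broker address matches the producer examples above:

package com.libt.kafka;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionCheckConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop2:9092");
        // no offsets are committed here, so no group management is needed
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // read only partition 0 of topic "first", starting from the beginning
        TopicPartition partition0 = new TopicPartition("first", 0);
        consumer.assign(Collections.singletonList(partition0));
        consumer.seekToBeginning(Collections.singletonList(partition0));

        for (int i = 0; i < 10; i++) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, value = %s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
        consumer.close();
    }
}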

Consumer examples
0) Start a console producer:

[hadoop1 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop1:9092 --topic first

hello world

1) Create a consumer (deprecated API):

package com.libt.kafka.consume;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put("zookeeper.connect", "hadoop1:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

2) Official example (offsets committed automatically) (new API):

package com.libt.kafka.consume;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka broker addresses; not every broker in the cluster has to be listed
        props.put("bootstrap.servers", "hadoop1:9092");
        // consumer group
        props.put("group.id", "test");
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        // interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // topics to subscribe to; several can be subscribed at once
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // poll for data, waiting at most 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
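
The example above lets the client commit offsets automatically (enable.auto.commit=true). As a variation, the sketch below commits offsets manually with commitSync() after each processed batch; the class name ManualCommitConsumer and the group id are made up, everything else mirrors the configuration above:

package com.libt.kafka.consume;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092");
        props.put("group.id", "test-manual");
        // disable automatic offset commits; we commit explicitly below
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // commit offsets only after the whole batch has been processed
            consumer.commitSync();
        }
    }
}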

Read data from a given topic, partition, and offset with the low-level consumer API.
1) The main steps a low-level-API consumer goes through (see the overview below):

Step 1: find the leader replica of the given partition from the topic metadata.
Step 2: get the latest offset of the partition.
Step 3: fetch the partition's messages from the leader replica.
Step 4: detect leader changes and retry.

2) Method overview:

findLeader(): send a topic-metadata request to the seed brokers and record the partition's replica brokers as backups.
getLastOffset(): send an offset request and get the partition's most recent offset.
run(): the main fetch loop of the low-level consumer.
findNewLeader(): when the partition's leader broker fails, locate the new leader.

3) Code:

package com.libt;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    private List<String> m_replicaBrokers = new ArrayList<>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // topic to read from
        String topic = "test1";
        // partition to read from
        int partition = Integer.parseInt("0");
        // broker IP addresses (seed brokers)
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.9.102");
        seeds.add("192.168.9.103");
        seeds.add("192.168.9.104");
        // broker port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata of the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

}
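
For reference, the SimpleConsumer used above belongs to the old Scala clients. With the kafka-clients dependency from step 2, the same "given topic, partition and offset" read can be sketched with assign() and seek(); the class name AssignSeekExample, the broker address, topic and starting offset below are placeholders:

package com.libt;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekExample {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.9.102:9092");
        // no group management or offset commits are needed when assigning partitions manually
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // read partition 0 of topic "test1", starting at offset 0
        TopicPartition tp = new TopicPartition("test1", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, 0L);

        long maxReads = 3;  // stop after a few records, like the example above
        while (maxReads > 0) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
                if (--maxReads == 0) {
                    break;
                }
            }
        }
        consumer.close();
    }
}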