Zookeeper+Kafka+Spark streaming单机整合开发
阅读原文时间:2023年07月09日阅读:2

环境准备:

ubuntu 开发环境:

jdk 1.8

scala:2.11.0

spark 2.0

zookeeper 3.4.6

kafka  2.12-0.10.2.0

开始整合:

1 zookeeper的安装,这里我使用的zookeeper版本为3.4.6

a, 下载zookeeper安装包zookeeper-3.4.6.tar.gz

  b, 解压安装文件到/usr/local/ 这是我的安装目录,具体可根据自己的情况而定:

sudo tar -zxvf zookeeper-3.4.6.tar.gz -C /usr/local

c, 进入/usr/local并重命名解压的文件名

  cd /usr/local  
  sudo mv zookeeper-3.4.6/ zookeeper  
  # 为当前用户加入授权  
  sudo chown -R hadoop ./zookeeper  # 我这里的用户名为hadoop,具体情况根据自己的情况写

d,进入zookeeper安装文件夹的配置文件所在文件夹conf

#创建存数据的文件
mkdir /home/hadoop/zookeeper
cd /zookeeper/conf
# 为zookeeper设置配置文件
cp -a zoo_sample.cfg zoo.cfg

修改配置文件

    vim zoo.cfg   

心跳时间,为了确保连接存在的,以毫秒为单位,最小超时时间为两个心跳时间

        tickTime=2000  
        # 多少个心跳时间内,允许其他server连接并初始化数据,如果ZooKeeper管理的数据较大,则应相应增大这个值  
        initLimit=10  
        #  leader 与 follower 之间发送消息,请求和应答时间长度。如果 follower 在设置的时间内不能与 leader 进行通信,那么此 follower 将被丢弃。这里应答的时间长度为5  
        syncLimit=5  
        # 用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里,这里设为之前创建的文件夹  
        dataDir=/home/hadoop/zookeeper  
        # 服务的监听端口  
        clientPort=2181

2 kafka的安装以及配置:我使用的版本为2.12-0.10.2.0

    a,下载kafka安装包kafka_2.12-0.10.2.0.tgz

    b, 解压安装包至指定安装目录,这里我的安装目录是/usr/local/

  sudo tar -zxvf kafka\_2.12-0.10.2.0.tgz -C /usr/local/  
  cd /usr/local  
  sudo mv kafka\_2.12-0.10.2.0/ kafka  
  sudo chwon -R hadoop ./kafka  # 把当前用户授权于kafka文件

c, 修改配置文件

cd config  
# 创建kafka数据存放的地址  
mkdir /home/hadoop/kafka  
vim server.properties

  # 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况,这里由于是伪分布式,直接设为0即可  
  broker.id =0  
  #broker server服务端口  
  port =9092  
  # 配置zookeeper的集群的地址  
  zookeeper.connect=localhost:2181  
  # kafka数据的存放地址  
  log.dirs=/home/hadoop/kafka  
  # 其他的配置不用修改!!

3 为方便在以后的使用,添加环境变量到~/.bashrc中,并创建简单的启动shell

   a,配置环境变量

vim ~/.bashrc

 # 在文件的开头添加  
 # Kafka  
   export KAFKA\_HOME=/usr/local/kafka #这里是我的安装目录,实际情况,根据自己的安装目录来定  
   export PATH=${KAFKA\_HOME}/bin:$PATH  
   export PATH=${KAFKA\_HOME}/config:$PATH  
 # zookeeper  
  export ZOOKEEPER\_HOME=/usr/local/zookeeper  
  export PATH=$PATH:$ZOOKEEPER\_HOME/bin

b,创建简单的启动shell,启动zookeeper和kafka

vim Start-Server.sh
#!/bin/bash
# 启动 zookeeper
/usr/local/zookeeper/bin/zkServer.sh start
# 启动 kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
vim Stop-Server.sh
#!/bin/bash
# 关闭 kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties
# 由于可能关闭kafka时有延迟,暂定1毫秒再执行关闭zookeeper的命令
sleep 1
# 关闭 zookeeper
/usr/local/zookeeper/bin/zkServer.sh stop
# 为写的脚本授权
sudo chmod 777 Start-Server.sh
sudo chmod 777 Stop-Server.sh

4 streaming 和 kafka 的整合使用

   a,使用maven建立kafka所用到包的依赖

org.apache.spark spark-streaming-kafka-0-8_2.11 2.1.1

××Buid Path相关spark下面的jar包

   b,方式一 命令行方式

      创建一个topic test

  kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  
  发送数据  
  kafka-topics.sh --list --zookeeper localhost:2181

方式二 创建scala程序Producer类,读取hdfs上的文件,并发送到kafka消息队列上

import java.util.Properties;
import java.lang.Runnable;
import java.net.URI;
import java.io.*;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
/**
* 向kafka上面发送数据
* 读取hdfs上的文件,并写入本地文件,再发送到kafka消息队列上
* */
class TestKafkaProducer implements Runnable{

/\*\*  
 \* 对于kafka producer的相关配置文件项  
 \*\*/  
public static Properties getProducerCnfig() {  
    Properties props = new Properties();  
    props.put("bootstrap.servers", "localhost:9092");  
    props.put("acks", "all");  
    props.put("retries", 0);  
    props.put("batch.size", 16384);  
    props.put("linger.ms", 1);  
    props.put("buffer.memory", 33554432);  
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    return props;  
}

public void run(){  
        // 获取配置文件  
        Properties props = getProducerCnfig();  
        // 创建生产者  
        Producer<String, String> producer = new KafkaProducer(props);  
        try{  
            //读取保存的文件  
            FileInputStream fis=new FileInputStream("/home/hadoop/text.txt");  
            InputStreamReader isr= new InputStreamReader(fis, "UTF-8");  
            BufferedReader br = new BufferedReader(isr);  
            String line = "";  
            String\[\] arrs = null;  
            while ((line = br.readLine())!=null) {  
                producer.send(new ProducerRecord<String, String>("test", line));  
            }  
            br.close();  
            isr.close();  
            fis.close();  
        }catch (Exception e){e.printStackTrace();}  
}  

}
public class Tess{
public static void main(String[] args) throws IOException{
Tess.getFIle();
new Thread(new TestKafkaProducer()).start();
System.out.println("发送结束");
}
/**
* 获取hdfs上的文件,并保存制定文件夹下
* */
public static void getFIle() throws IOException{
String uri = "hdfs://localhost:9000/test/zookeeper.properties";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);

    InputStream in = null;  
    FileOutputStream fos = new FileOutputStream(new File("/home/hadoop/text.txt"));  
    try {  
        in = fs.open(new Path(uri));  
        in.skip(100);  
        IOUtils.copyBytes(in, fos, 4096, false);  
    }catch (Exception e){  
        e.printStackTrace();  
    }finally {  
        IOUtils.closeStream(in);  
        IOUtils.closeStream(fos);  
    }  
}  

}

**streaming使用打印kafka上的消息,注意在运行这个程序时,需要配置args的参数,第一个为brokers 第二个为topic,

    我这在单机下,配置的args为:localhost:9092 test**

package com.learn

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object KafkaAndPrintInSpark {

//判断设置的时输入参数,是否包含brokers 和 topic 至少参数的长度为2,即单机运行一个test的topic: broker=localhost:9092 topic=test
def main(args: Array[String]) {
if (args.length < 2) { System.err.println(s""" |Usage: DirectKafkaWordCount
| is a list of one or more Kafka brokers
| is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}

//将参数args读入到数组中  
val Array(brokers, topics) = args

// 用2秒批间隔创建上下文  
val sparkConf = new SparkConf().setMaster("local\[2\]").setAppName("DirectKafkaWordCount")  
val ssc = new StreamingContext(sparkConf, Seconds(2))

// 创建kafka流与brokers和topic  
val topicsSet = topics.split(",").toSet  
val kafkaParams = Map\[String, String\]("metadata.broker.list" -> brokers)  
val messages = KafkaUtils.createDirectStream\[String, String, StringDecoder, StringDecoder\](  
  ssc, kafkaParams, topicsSet)

//打印获取到的数据  
val lines = messages.map(\_.\_2)  
lines.print()

// 开始计算  
ssc.start()  
ssc.awaitTermination()  

}
}