目录
背景:通过spark SQL读kudu表,写入到kafka
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<!-- scalikejdbc_2.11 -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>2.5.0</version>
</dependency>
<!-- scalikejdbc-config_2.11 -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.9.0</version>
</dependency>
<!--spark集成kudu-->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<!--执行sql脚本-->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.3</version>
</dependency>
<!--读取配置-->
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.8</version>
</dependency>
<!--clickhouse-->
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<!--druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 这是个编译scala代码的 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 这是个编译java代码的 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<!--注意这里使用jdk8,否则不能使用java8的lambda表达式和流API-->
<source>8</source>
<target>8</target>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--maven-assembly-plugin不能打包spring Framework框架的项目,
可以使用maven-shade-plugin插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<mainClass>com.tal.mysql2kudu.MysqlToKudu_v1</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
package com.tal.spark
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
def send(topic: String, partition: Int, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, partition, key, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
println("KafkaSink close producer")
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
完整代码
package com.tal.spark
import java.util.Properties
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
/**
* @description: TODO 读kudu写入kafka
* @author: HaoWu
* @create: 2021年06月30日
*/
object Kudu2Kafka {
private val logger: Logger = LoggerFactory.getLogger(Kudu2Kafka.getClass)
//kudu配置信息
private val kuduMaster = "hadoop101:7051"
//private val kuduTableName1 = "impala::dwddb.rt_dwd_rm_distribution_res_data_detail_new"
private val kuduTableName = "person"
//kafka 配置信息
private val bootstrapServers = "hadoop102:9092"
private val topic = "kudu_to_kafka_test"
val schema = Array("id", "CompanyId", "WorkId", "Name", "Gender", "Photo")
def main(args: Array[String]): Unit = {
// 构建SparkSession
/* val spark: SparkSession = SparkSession
.builder()
.config("spark.default.parallelism", 200)
.config("spark.shuffle.file.buffer", "128k")
.config("spark.reducer.maxSizeInFlight", "96m")
.config("spark.shuffle.memoryFraction", 0.3)
.config("spark.storage.memoryFraction", 0.3)
.enableHiveSupport()
.getOrCreate()*/
//1. 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2")
.config("spark.default.parallelism", 200)
.config("spark.shuffle.file.buffer", "128k")
.config("spark.reducer.maxSizeInFlight", "96m")
.config("spark.shuffle.memoryFraction", 0.3)
.config("spark.storage.memoryFraction", 0.3)
.getOrCreate()
logger.info("加载kudu数据~~")
val time1 = System.currentTimeMillis()
val result: DataFrame = getKuduData(spark, kuduMaster, kuduTableName)
val time2 = System.currentTimeMillis()
val time = time2 - time1
logger.info("加载完成~~~:耗时:" + time + " ms!")
logger.info("数据开始发送到kafka")
write2Kafka(result, spark, 3,bootstrapServers,topic)
logger.info("-----> 数据发送到kafka完成完成!!! <-------")
spark.stop()
}
/**
* TODO 获取 kudu DF
*
* @param spark
* @param kuduMaster
* @param kuduTableName
*/
def getKuduData(spark: SparkSession, kuduMaster: String, kuduTableName: String): DataFrame = {
import spark.implicits._
// 1. 定义 map 集合,封装 kudu的master地址.表名
val options = Map(
"kudu.master" -> kuduMaster,
"kudu.table" -> kuduTableName,
// 200M
"kudu.batchSize" -> "419430400",
// 10G
"kudu.splitSizeBytes" -> "10737418240",
"kudu.keepAlivePeriodMs" -> "36000000",
"kudu.scanRequestTimeoutMs" -> "36000000",
"kudu.faultTolerantScan" -> "true",
"kudu.scanLocality" -> "leader_only" // 设置, 否则可能出现scanner not found异常
)
// 2. 从Kudu表加载数据
val kuduDF: DataFrame = spark.read
.format("kudu")
.options(options)
.load()
//打印
// kuduDF.printSchema()
// kuduDF.show(10, truncate = false)
val tempView = "person"
kuduDF.createOrReplaceTempView(tempView)
val sql =
s"""
|select
| row_number() over(order by CompanyId) as id,
| CompanyId,
| WorkId,
| Name,
| Gender,
| Photo
|from ${tempView}
|""".stripMargin
val result: DataFrame = spark.sql(sql)
result
}
/**
* TODO 写出 kafka
*
* @param result
* @param spark
* @param numPartitions
*/
def write2Kafka(result: DataFrame, spark: SparkSession, numPartitions: Int,bootstrapServers:String,topic:String): Unit = {
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers",bootstrapServers)
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
logger.warn("kafka producer init done!")
spark.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
//输出到kafka
try {
result.foreach(
row => {
val jsonObj = new JSONObject()
schema.foreach(field => {
val fieldKey = field
val fieldValue = row.getAs[Any](fieldKey)
jsonObj.put(fieldKey, fieldValue)
})
kafkaProducer.value.send(topic,jsonObj.toString)
})
} catch {
case ex:Exception =>
logger.error("写入kafka异常,异常为:"+ex)
throw ex
}
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章