Spark (21) [Spark SQL: Reading from Kudu, Writing to Kafka]

Contents

Spark SQL: read from Kudu, write to Kafka

Background: read a Kudu table with Spark SQL and write the rows to Kafka.

References: 1. Writing data from Spark to Kafka 2. Writing data to Kafka with Spark

1. pom.xml dependencies

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>

        <!-- scalikejdbc_2.11 -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- scalikejdbc-config_2.11 -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- Kudu integration for Spark -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- for executing SQL scripts -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.3</version>
        </dependency>
        <!-- for reading configuration files -->
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.8</version>
        </dependency>

        <!--clickhouse-->
        <dependency>
            <groupId>cc.blynk.clickhouse</groupId>
            <artifactId>clickhouse4j</artifactId>
            <version>1.4.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>

        <!--druid-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- compiles the Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <!-- use JDK 8 here; otherwise Java 8 lambda expressions and the Stream API are not available -->
                    <source>8</source>
                    <target>8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- maven-assembly-plugin cannot package Spring Framework projects;
            use maven-shade-plugin for those instead -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.tal.spark.Kudu2Kafka</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2. Wrap the KafkaProducer in a lazy val to create a KafkaSink

package com.tal.spark

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

  def send(topic: String, partition: Int, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, partition, key, value))
}

object KafkaSink {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        println("KafkaSink close producer")
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
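
A quick local check of the wrapper (a minimal sketch; the broker address localhost:9092 and the topic name test_topic are assumptions, not values from the job). Nothing Kafka-related is created when the sink is constructed: the KafkaProducer only comes into existence on the first send(), which is exactly what makes the wrapper safe to serialize and broadcast.

import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer

object KafkaSinkLocalCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed local broker
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    // Building the sink does not force the lazy val, so no producer exists yet
    val sink = KafkaSink[String, String](props)
    // The first send() creates the real KafkaProducer and returns a Future[RecordMetadata]
    sink.send("test_topic", """{"hello":"kudu"}""").get()
  }
}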

3. Broadcast the KafkaSink to every executor with a broadcast variable

Full code

package com.tal.spark

import java.util.Properties
import com.alibaba.fastjson.JSONObject
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

/**
 * @description: read from Kudu and write to Kafka
 * @author: HaoWu
 * @create: 2021-06-30
 */
object Kudu2Kafka {
  private val logger: Logger = LoggerFactory.getLogger(Kudu2Kafka.getClass)
  // Kudu connection settings
  private val kuduMaster = "hadoop101:7051"
  //private val kuduTableName1 = "impala::dwddb.rt_dwd_rm_distribution_res_data_detail_new"
  private val kuduTableName = "person"

  // Kafka connection settings
  private val bootstrapServers = "hadoop102:9092"
  private val topic = "kudu_to_kafka_test"
  val schema = Array("id", "CompanyId", "WorkId", "Name", "Gender", "Photo")

  def main(args: Array[String]): Unit = {

    // SparkSession builder for running on the cluster (Hive support, no explicit master), kept for reference:
    /*        val spark: SparkSession = SparkSession
              .builder()
              .config("spark.default.parallelism", 200)
              .config("spark.shuffle.file.buffer", "128k")
              .config("spark.reducer.maxSizeInFlight", "96m")
              .config("spark.shuffle.memoryFraction", 0.3)
              .config("spark.storage.memoryFraction", 0.3)
              .enableHiveSupport()
              .getOrCreate()*/
    // 1. Build the SparkSession instance (local mode for testing)
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .config("spark.default.parallelism", 200)
      .config("spark.shuffle.file.buffer", "128k")
      .config("spark.reducer.maxSizeInFlight", "96m")
      .config("spark.shuffle.memoryFraction", 0.3)
      .config("spark.storage.memoryFraction", 0.3)
      .getOrCreate()

    logger.info("加载kudu数据~~")
    val time1 = System.currentTimeMillis()
    val result: DataFrame = getKuduData(spark, kuduMaster, kuduTableName)
    val time2 = System.currentTimeMillis()
    val time = time2 - time1
    logger.info("加载完成~~~:耗时:" + time + " ms!")

    logger.info("数据开始发送到kafka")
    write2Kafka(result, spark, 3,bootstrapServers,topic)
    logger.info("-----> 数据发送到kafka完成完成!!! <-------")

    spark.stop()
  }

  /**
   * Load the Kudu table into a DataFrame
   *
   * @param spark         active SparkSession
   * @param kuduMaster    Kudu master address(es)
   * @param kuduTableName name of the Kudu table to read
   */
  def getKuduData(spark: SparkSession, kuduMaster: String, kuduTableName: String): DataFrame = {
    import spark.implicits._
    // 1. Options map: Kudu master address, table name, and scan tuning
    val options = Map(
      "kudu.master" -> kuduMaster,
      "kudu.table" -> kuduTableName,
      // 400 MB scan batch size
      "kudu.batchSize" -> "419430400",
      // 10 GB split size
      "kudu.splitSizeBytes" -> "10737418240",
      "kudu.keepAlivePeriodMs" -> "36000000",
      "kudu.scanRequestTimeoutMs" -> "36000000",
      "kudu.faultTolerantScan" -> "true",
      "kudu.scanLocality" -> "leader_only" // set this, otherwise a "scanner not found" exception may occur
    )
    // 2. Load the data from the Kudu table
    val kuduDF: DataFrame = spark.read
      .format("kudu")
      .options(options)
      .load()
    // Print the schema and a few rows for debugging:
//    kuduDF.printSchema()
//    kuduDF.show(10, truncate = false)
    val tempView = "person"
    kuduDF.createOrReplaceTempView(tempView)
    val sql =
      s"""
         |select
         |    row_number() over(order by CompanyId) as id,
         |    CompanyId,
         |    WorkId,
         |    Name,
         |    Gender,
         |    Photo
         |from ${tempView}
         |""".stripMargin
    val result: DataFrame = spark.sql(sql)
    result
  }

  /**
   * Write the DataFrame out to Kafka
   *
   * @param result           DataFrame to send
   * @param spark            active SparkSession
   * @param numPartitions    number of partitions to use when writing
   * @param bootstrapServers Kafka bootstrap servers
   * @param topic            target Kafka topic
   */
  def write2Kafka(result: DataFrame, spark: SparkSession, numPartitions: Int, bootstrapServers: String, topic: String): Unit = {

    // Broadcast the KafkaSink wrapper to the executors
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers",bootstrapServers)
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      logger.warn("kafka producer init done!")
      spark.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Write each row to Kafka as a JSON string; numPartitions controls the write parallelism
    try {
      result.repartition(numPartitions).foreach(
        row => {
          val jsonObj = new JSONObject()
          schema.foreach(field => {
            val fieldKey = field
            val fieldValue = row.getAs[AnyRef](fieldKey)
            jsonObj.put(fieldKey, fieldValue)
          })
          kafkaProducer.value.send(topic, jsonObj.toString)
        })
    } catch {
      case ex: Exception =>
        logger.error("Exception while writing to Kafka: " + ex)
        throw ex
    }
  }
}
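
To check that the rows actually arrived, the topic can be read back with a minimal consumer built on the same kafka-clients 1.1.0 dependency from the pom (a sketch; the group id "kudu2kafka-check" is an assumption, while the broker and topic are the ones configured above).

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConversions._

object CheckKuduToKafkaTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop102:9092")
    props.setProperty("group.id", "kudu2kafka-check") // assumed group id
    props.setProperty("auto.offset.reset", "earliest")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("kudu_to_kafka_test"))
    // poll(long) is the pre-2.0 signature available in kafka-clients 1.1.0
    consumer.poll(5000L).foreach(record => println(record.value()))
    consumer.close()
  }
}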