Spark: Reading from Hive and Writing to HBase
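The Scala job below reads the Hive table jiang.student with Spark SQL and writes each row into the HBase table jiang:student through the classic mapred TableOutputFormat, via saveAsHadoopDataset.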
package com.grady

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkHiveToHbase {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkHiveToHbase")
    // enableHiveSupport() is required so spark.sql can resolve Hive tables
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val rdd: RDD[Student] = readDataFromHive(spark)
    writeDataToHbase(rdd)

    spark.stop()
  }

  def readDataFromHive(spark: SparkSession): RDD[Student] = {
    val dataFrame = spark.sql("select * from jiang.student")
    // Map each Row to a Student by position: id, name, sex, age, department
    dataFrame.rdd.map(r =>
      Student(r(0).toString.toInt, r(1).toString, r(2).toString, r(3).toString.toInt, r(4).toString)
    )
  }

  def writeDataToHbase(rdd: RDD[Student]): Unit = {
    val tablename = "jiang:student"
    // Point the HBase client at the cluster's ZooKeeper quorum
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "10.82.232.64")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("zookeeper.znode.parent", "/hbase")
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    val hbaseRDD = rdd.map(student => {
      // Row key: "hive:" prefix plus the Hive id, e.g. hive:1
      val put = new Put(Bytes.toBytes("hive:" + student.id))
      // Only name and age are written; sex and department are dropped here
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(student.name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(student.age.toString))
      // TableOutputFormat ignores the key; the Put itself carries the row key
      (new ImmutableBytesWritable, put)
    })
    hbaseRDD.saveAsHadoopDataset(jobConf)
  }
}

case class Student(id: Int, name: String, sex: String, age: Int, department: String)
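Before running, both tables must already exist. A minimal setup sketch, assuming a Hive schema that matches the Student case class (the original post does not show the DDL):

Hive:
  CREATE TABLE jiang.student (id INT, name STRING, sex STRING, age INT, department STRING);

HBase shell:
  create_namespace 'jiang'
  create 'jiang:student', 'cf'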

Run: spark-submit --master local[2] --num-executors 10 --class com.grady.SparkHiveToHbase /app/data/appdeploy/usehive1-1.0-SNAPSHOT.jar

Note: --num-executors only takes effect on YARN; with --master local[2] it is ignored.

Verify in the HBase shell:

hbase(main):011:0> scan 'jiang:student'
ROW                          COLUMN+CELL
 1                           column=cf:age, timestamp=2022-02-08T16:27:01.290, value=15
 1                           column=cf:name, timestamp=2022-02-08T16:27:01.290, value=jack
 2                           column=cf:age, timestamp=2022-02-08T16:27:01.290, value=16
 2                           column=cf:name, timestamp=2022-02-08T16:27:01.290, value=Lily
 3                           column=cf:age, timestamp=2022-02-08T16:27:01.290, value=16
 3                           column=cf:name, timestamp=2022-02-08T16:27:01.290, value=mike
 hive:1                      column=cf:age, timestamp=2022-02-08T17:24:24.477, value=10
 hive:1                      column=cf:name, timestamp=2022-02-08T17:24:24.477, value=xiaojiang
 hive:2                      column=cf:age, timestamp=2022-02-08T17:24:24.477, value=10
 hive:2                      column=cf:name, timestamp=2022-02-08T17:24:24.477, value=xiaochen
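Rows 1 to 3 were already in the table before this run (note the earlier timestamps); the rows written by this job carry the hive: prefix that the code prepends to the Hive id (hive:1, hive:2).

For reference, the same write can also go through the newer mapreduce API. A minimal sketch, assuming the same hbaseConf, tablename, and hbaseRDD as in writeDataToHbase above:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// The Job object only carries the output-format configuration
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
// Same (ImmutableBytesWritable, Put) pairs as before
hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)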