Spark SQL Custom Function Types
Date: 2023-07-08

1. Reading data with Spark

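I have recently been studying Spark JTS, the GeoMesa module that adds user-defined types and functions to Spark SQL. I had a data file to work with, and I read it like this: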

package com.geomesa.spark.SparkCore

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ArrayType, DataTypes, StringType, StructField, StructType}

object test {

  def main(args: Array[String]): Unit = {
    // Brings the implicit withJTS method on SparkSession into scope
    import org.locationtech.geomesa.spark.jts._

    // Local SparkSession with the JTS types and functions registered
    val spark: SparkSession = {
      SparkSession.builder()
        .appName("test")
        .master("local[*]")
        .getOrCreate()
        // requires the spark.jts._ import above
        .withJTS
    }

    // `schema` is the StructType shown in section 2; it must be defined before this point
    val dataFile = this.getClass.getClassLoader.getResource("gsmc.txt").getPath
    val df = spark.read
      .schema(schema)
      .json(dataFile)
    // df.show(5, false)
    // df.printSchema()

  }
}
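
To see where a printed schema like the one in the next section comes from, here is a minimal sketch of reading JSON without an explicit schema so that Spark infers the structure itself. The sample record, the temporary file, and the object name InferSchemaSketch are my own assumptions for illustration; they are not the real gsmc.txt or the original code, and GeoMesa is not needed for this step.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object InferSchemaSketch {

  // Hypothetical one-line sample shaped like a GeoJSON FeatureCollection; not the real data
  val sampleJson: String =
    """{"crs":{"type":"name","properties":{"name":"EPSG:4326"}},"features":[{"id":"f.1","geometry_name":"geom","geometry":{"type":"Polygon","coordinates":[[[0.0,0.0],[1.0,0.0],[1.0,1.0],[0.0,0.0]]]}}]}"""

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("infer-schema-sketch")
      .master("local[*]")
      .getOrCreate()

    // Write the sample to a temporary file so the reader has something to load
    val path = Files.createTempFile("sample", ".json")
    Files.write(path, sampleJson.getBytes(StandardCharsets.UTF_8))

    // No .schema(...) call here: Spark scans the data and infers the structure
    val inferred = spark.read.json(path.toString)
    inferred.printSchema()

    spark.stop()
  }
}
```

The sample is kept on a single line because the JSON reader expects one record per line by default; for a file pretty-printed over several lines, the reader's multiLine option would probably be needed.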

2. Custom schema structure

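The printed schema of the data is shown below. My goal was to build this structure by hand with Spark SQL's type classes (StructType, StructField, ArrayType), mainly the part under features. It took me a long time: at first I did not know how to build it, and then I kept building it wrong. Persistence paid off, though, and I eventually got it working.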

    root
     |-- crs: struct (nullable = true)
     |    |-- properties: struct (nullable = true)
     |    |    |-- name: string (nullable = true)
     |    |-- type: string (nullable = true)
     |-- features: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- geometry: struct (nullable = true)
     |    |    |    |-- coordinates: array (nullable = true)
     |    |    |    |    |-- element: array (containsNull = true)
     |    |    |    |    |    |-- element: array (containsNull = true)
     |    |    |    |    |    |    |-- element: double (containsNull = true)
     |    |    |    |-- type: string (nullable = true)
     |    |    |-- geometry_name: string (nullable = true)
     |    |    |-- id: string (nullable = true)

The custom schema is defined as follows:

    val schema = StructType(Array(
      // crs is read as a plain string here rather than as a nested struct
      StructField("crs", StringType),
      StructField("features", ArrayType(
        StructType(Array(
          StructField("geometry", StructType(Array(
            // three nested arrays of doubles, matching the printed schema
            StructField("coordinates", ArrayType(ArrayType(ArrayType(DataTypes.DoubleType))))
          )))
        ))
      ))
    ))

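Checking with printSchema() shows that the nesting under features matches the features structure printed above (the custom schema only declares geometry.coordinates). Nice.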
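
Since the nesting was the hard part for me, here is a minimal sketch of an alternative way to write the same definition: build it from the innermost type outwards with StructType's add method, giving each level a name. It assumes only spark-sql on the classpath. The object name SchemaBuilderSketch and the intermediate names (coordinatesType, geometryType, featureType) are my own and do not appear in the original code.

```scala
import org.apache.spark.sql.types.{ArrayType, DoubleType, StringType, StructType}

object SchemaBuilderSketch {

  // Innermost level first: three nested arrays of doubles
  val coordinatesType: ArrayType = ArrayType(ArrayType(ArrayType(DoubleType)))

  // geometry: a struct with a single coordinates field
  val geometryType: StructType = new StructType().add("coordinates", coordinatesType)

  // Each element of features: a struct with a single geometry field
  val featureType: StructType = new StructType().add("geometry", geometryType)

  // Top level: crs as a string, features as an array of feature structs
  val schema: StructType = new StructType()
    .add("crs", StringType)
    .add("features", ArrayType(featureType))

  def main(args: Array[String]): Unit = {
    // Prints the schema in the same tree layout that DataFrame.printSchema() uses
    schema.printTreeString()
  }
}
```

Naming each level keeps the bracket counting manageable, and a missing level shows up as a type error on one short line instead of somewhere in a long run of closing parentheses.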

3. The long list of pom dependencies

    <properties>
        <geospark.version>1.2.0</geospark.version>
        <geotools.version>14.1</geotools.version>
        <spark.version>2.3.1</spark.version>
        <encoding>UTF-8</encoding>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.datasyslab</groupId>
            <artifactId>geospark</artifactId>
            <version>${geospark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-geometry</artifactId>
            <version>20.0</version>
        </dependency>
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180813</version>
        </dependency>
        <dependency>
            <groupId>com.esri.geometry</groupId>
            <artifactId>esri-geometry-api</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-geojson</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-api</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-referencing</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.geomesa</groupId>
            <artifactId>geomesa-spark-jts_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.jts.io</groupId>
            <artifactId>jts-io-common</artifactId>
            <version>1.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.jts</groupId>
            <artifactId>jts-core</artifactId>
            <version>1.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <!--redis-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

    </dependencies>