GeoMesa Spark
阅读原文时间:2023年07月09日阅读:3

GeoMesa Spark

geomesa目前支持spark版本2.2.x、2.3.x或2.4.x。

geomesa spark允许使用存储在geomesa中的数据、其他geotools数据存储或geomesa转换器库可读的文件在apache spark上执行作业。该库允许创建spark RDD和数据帧,将spark RDD和数据帧写入geomesa accumulo和其他地理工具数据存储,并使用kryo对简单功能进行序列化。

1、GeoMesa Spark最底层为geomesa-spark-jts模块

2、geomesa-spark-core模块是spark core的扩展,支持支持geotools的Query,生成系列化好的simplefeature类型的rdd

3、geomesa-spark-sql模块允许使用sql方式进行查询,会将sql语句转换为Query对象进行查询

一、Spark JTS

spark JTS模块提供了一组用户定义函数(UDF)和用户定义类型(UDT),这些函数允许在spark中执行SQL查询,从而对地理空间数据类型执行地理空间操作。

geomesa spark sql支持基于spark sql模块中存在的数据集/数据帧API,以提供地理空间功能。这包括自定义地理空间数据类型和函数、从地理工具数据存储创建数据帧的能力,以及改进SQL查询性能的优化。

此功能位于geomesa spark/geomesa spark jts模块中:

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-jts_2.11</artifactId>
  // version, etc.
</dependency>

以下是使用用户定义类型加载数据帧的scala示例:

import org.locationtech.jts.geom._import org.apache.spark.sql.types._import org.locationtech.geomesa.spark.jts._
import spark.implicits._
val schema = StructType(Array(
  StructField("name",StringType, nullable=false),
  StructField("pointText", StringType, nullable=false),
  StructField("polygonText", StringType, nullable=false),
  StructField("latitude", DoubleType, nullable=false),
  StructField("longitude", DoubleType, nullable=false)))
val dataFile = this.getClass.getClassLoader.getResource("jts-example.csv").getPathval df = spark.read
  .schema(schema)
  .option("sep", "-")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .csv(dataFile)
val alteredDF = df
  .withColumn("polygon", st_polygonFromText($"polygonText"))
  .withColumn("point", st_makePoint($"latitude", $"longitude"))

请注意,最初的模式没有userdefinedType,但是在将用户定义的函数应用到相应的列之后,我们将得到一个具有地理空间列类型的数据框架。

还可以从地理空间对象列表中构建数据帧:

import spark.implicits._

val point = new GeometryFactory().createPoint(new Coordinate(3.4, 5.6))
val df = Seq(point).toDF("point")

若要启用此行为,请导入org.locationtech.geomesa.spark.jts.,创建一个sparksession并调用.withjts。这将为这些操作注册UDF和UDT以及一些催化剂优化。或者,可以调用initjts(sqlcontext)。

import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SQLContextimport org.locationtech.geomesa.spark.jts._
val spark: SparkSession = SparkSession.builder() // ... initialize spark session
spark.withJTS

Spple JTS模块需要代表几何对象的几个类(如OGC OpenGISS简单特征访问通用体系结构规范和Java拓扑结构套件实现的),并将它们注册为用户定义类型(UDT)在SparkSQL中。例如,geometry类注册为geometryudt。已注册以下类型:

几何图形 点udt 线形UDT 多边形 多点式

多重删除 多元论 几何集合

spark jts还实现了OGC OpenGIS Simple Feature Access SQL选项规范中描述的作为sparkSQL用户定义函数(UDF)的函数子集。这些功能包括创建几何图形、访问几何图形的属性、将几何图形对象铸造到更具体的子类、以其他格式输出几何图形、测量几何图形之间的空间关系以及处理几何图形。

例如,下面的SQL查询:

select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

使用两个UDF(st_contains和st_makebbox)查找芝加哥数据框中的行,其中列geom包含在指定的边界框中。

UDF还公开用于数据框架API,这意味着上述示例也可以通过以下代码实现:

import org.locationtech.geomesa.spark.jts._
import spark.implicits. _
chicagoDF.where(st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), $"geom"))

spark jts模块还提供了将数据帧导出到geojson字符串的方法。这允许在许多支持geojson输入的前端映射库(如传单或开放层)中快速可视化数据。

要转换数据帧,请导入隐式转换并调用togeojson方法。

import org.locationtech.geomesa.spark.jts.util.GeoJSONExtensions._
val df : DataFrame = // Some data frame
val geojsonDf = df.toGeoJSON

如果只给出模式,转换器可以推断哪个字段保存几何图形,但是在多个几何字段的情况下,它默认为第一个这样的字段。通过在模式中提供所需几何体的索引(从0开始),可以覆盖此行为。例如,如果所需的几何图形是模式的第三个字段,则为df.togeojson(2)。

如果结果可以存储在内存中,那么可以将其收集到驱动程序中并写入文件。如果不是这样,每个执行器都可以写入像hdfs这样的分布式文件系统。

val geoJsonString = geojsonDF.collect.mkString("[",",","]")

NOTE:为了实现这一点,数据帧应该有一个几何字段,这意味着它的模式应该有一个结构字段,它是本模块中提供的JTS几何类型之一。但是,如果某些行的几何图形为空,则可以接受。在这种情况下,空值将作为geojson中几何体的值写入。

通过以下命令,可以独立于geomesa构建和使用此模块:

$ mvn install -pl geomesa-spark/geomesa-spark-jts

二、Spark Core

geomesa spark core用于直接处理geomesa和其他地理空间数据存储中的特征RDD。

以下是通过地理空间查询对geomesa数据存储创建RDD的完整scala示例:

// DataStore params to a hypothetical GeoMesa Accumulo tableval dsParams = Map(
  "accumulo.instance.id"   -> "instance",
  "accumulo.zookeepers"    -> "zoo1,zoo2,zoo3",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "*****",
  "accumulo.catalog"       -> "geomesa_catalog",
  "geomesa.security.auths" -> "USER,ADMIN")
// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)

// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)
resultRDD.collect
// Array[org.opengis.feature.simple.SimpleFeature] = Array(
//    ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
//    ScalaSimpleFeature:7, ScalaSimpleFeature:9)

geomesa spark core通过定义一个名为spaceialrddprovider的接口,提供了一个用于访问spark中地理空间数据的API。此接口的不同实现连接到不同的输入源。这些不同的提供者在下面的用法中有更详细的描述。

geomesa为几个jar提供了依赖项,以简化Spark类路径的设置。要在spark中使用这些库,可以通过–jars选项将适当的阴影jar(例如)传递给spark submit命令:

--jars file://path/to/geomesa-accumulo-spark-runtime_2.11-$VERSION.jar

或者通过笔记本服务器(如Jupyter)中的适当机制传递给Spark(请参见部署geomesa spark with Jupyter笔记本)或Zeppelin。

阴影JAR还应提供转换器RDD提供程序和geotools RDD提供程序所需的依赖项,因此这些JAR也可以简单地添加到–jar中(尽管在后一种情况下,可能需要额外的JAR来实现访问的geotools数据存储)。

要在集群节点之间序列化SimpleFeatures的RDD,spark必须配置geomesa spark core中提供的kryo序列化注册器。

Note:在本地模式下运行spark时不需要配置kryo序列化,因为作业将在单个JVM中执行。

将这两个条目添加到$spark\u home/conf/spark-defaults.conf(或将它们作为–conf参数传递给spark submit):

spark.serializer        org.apache.spark.serializer.KryoSerializerspark.kryo.registrator
org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator

或者,可以在用于创建SparkContext的SparkConf对象中设置这些参数:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)

在笔记本服务器中使用Spark时,需要禁用SparkContext的自动创建。

设置配置选项后,几何空间RDDProvider实现创建的RDD将正确注册到序列化程序提供程序。

geomesa spark core提供的功能的主要入口点是geomesaspark对象:

val spatialRDDProvider = GeoMesaSpark(params)

当类路径中包含适当的JAR时,geomesaspark通过SPI加载一个spaceardProvider实现。geomesaspark返回的实现是根据作为参数传递的参数选择的,如下面的scala代码所示:

// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
  "param1" -> "foo",
  "param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

要保存功能,请使用save()方法:

GeoMesaSpark(params).save(rdd, params, "gdelt")

请注意,某些提供程序可能是只读的。

三、空间RDD提供程序

AccumuloSpatialRDD提供程序是Accumulo数据存储的空间RDD提供程序。核心代码在geomesa accumulo spark模块中,并且geomesa accumulo spark运行时模块中提供带依赖项(包含执行所需的所有依赖项)的带阴影JAR。

此提供程序可以读写geomesa accumulodatastore。配置参数与传递给datastorefinder.getdatastore()的参数相同。有关详细信息,请参阅Accumulo数据存储参数。

要在geomesa中访问的功能类型作为传递给rdd()方法的查询的类型名称传递。例如,要从geomesa accumulo表中加载gdelt类型特征的RDD:

val params = Map(
  "accumulo.instance.id" -> "mycloud",
  "accumulo.user"        -> "user",
  "accumulo.password"    -> "password",
  "accumulo.zookeepers"  -> "zoo1,zoo2,zoo3",
  "accumulo.catalog"     -> "geomesa")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

hbaspatialrddprovider是用于hbase数据存储的空间RDD提供程序。核心代码在geomesa-hbase-spark模块中,并且geomesa-hbase-spark运行时模块中提供带依赖项(包含执行所需的所有依赖项)的带阴影JAR。

此提供程序可以读取和写入geomesa hbaedatastore。配置参数与传递给datastorefinder.getdatastore()的参数相同。有关详细信息,请参阅HBase数据存储参数。

Note:

连接到HBase通常需要在Spark类路径上提供hbase-site.xml文件。这可以通过指定–jars来完成。例如:

$ spark-shell --jars file:///opt/geomesa/dist/spark/geomesa-hbase-spark-runtime_2.11-${VERSION}.jar,file:///usr/lib/hbase/conf/hbase-site.xml

或者,您可以在数据存储参数映射中指定zookee。但是,这可能不适用于每个HBase设置。

要在geomesa中访问的功能类型作为传递给rdd()方法的查询的类型名称传递。例如,要从geomesa hbase表中加载gdelt类型的功能的RDD:

val params = Map("hbase.zookeepers" -> "zoo1,zoo2,zoo3", "hbase.catalog" -> "geomesa")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

filesystemrddprovider是用于geomesa文件系统数据存储的空间RDD提供程序。核心代码在geomesa fs spark模块中,并且geomesa fs spark运行时模块中提供带依赖项(其中包含执行所需的所有依赖项)的带阴影JAR。

此提供程序可以读取和写入geomesa文件系统数据存储区。配置参数与传递给datastorefinder.getdatastore()的参数相同。有关详细信息,请参阅文件系统数据存储参数。

要在geomesa中访问的功能类型作为传递给rdd()方法的查询的类型名称传递。例如,要从S3存储桶加载gdelt类型功能的RDD:

val params = Map("fs.path" -> "s3a://mybucket/geomesa/datastore")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

变矩器空间RDD提供器由几何A火花变矩器模块提供。

ConvertersPatialRDDProvider从一个或多个数据文件中读取几何转换库可读取格式的功能,包括定界和固定宽度的文本、AVRO、JSON和XML文件。它采用以下配置参数:

geomesa.converter-转换器定义为类型安全配置字符串

geomesa.converter.inputs-输入文件路径,逗号分隔

geomeria.sft-simpleFeatureType,作为规范字符串、配置字符串或环境查找名称

geomesa.sft.name-(可选)simpleFeatureType的名称

考虑geomesa convert文档的示例用法部分中描述的示例数据。如果文件example.csv包含示例数据,example.conf包含转换器的类型安全配置文件,则可以使用以下scala代码将此数据加载到RDD中:

val exampleConf = ConfigFactory.load("example.conf").root().render()
val params = Map(
  "geomesa.converter"        -> exampleConf,
  "geomesa.converter.inputs" -> "example.csv",
  "geomesa.sft"              -> "phrase:String,dtg:Date,geom:Point:srid=4326",
  "geomesa.sft.name"         -> "example")
val query = new Query("example")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

也可以通过maven或sbt为公共数据源(gdelt、地名等)加载预打包的转换器。有关详细信息,请参阅预打包转换器定义。

警告:

ConvertSpatialRDDProvider是只读的,不支持向数据文件写入功能。

Geotoolsspatialrddprovider由Geometsa Spark Geotools模块提供。

geotoolspatialrddprovider生成并保存存储在通用geotools数据存储中的功能的RDD。传递的配置参数与传递给datastorefinder.getdatastore()以创建感兴趣的数据存储的配置参数相同,还需要一个名为“geotools”的布尔参数来指示SPI加载geotoolspatialrddprovider。例如,geotools contentdatastore教程中描述的csvdatastore采用一个名为“file”的参数。要将此数据存储与geomesa spark一起使用,请执行以下操作:

val params = Map(
  "geotools" -> "true",
  "file"     -> "locations.csv")
val query = new Query("locations")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)

要在数据存储中访问的功能类型的名称将作为传递给rdd()方法的查询的类型名称传递。在csvdatastore的示例中,这是作为参数传递的文件名的基名称。

警告:

不要将geotools rdd提供程序与具有提供程序实现的geomesa数据存储一起使用。上面描述的提供程序提供了额外的优化以提高读写性能。

如果数据存储支持,请使用save()方法保存功能:

GeoMesaSpark(params).save(rdd, params, "locations")

四、SparkSQL

geomesa spark sql支持基于spark sql模块中存在的数据集/数据帧API,以提供地理空间功能。这包括自定义地理空间数据类型和函数、从地理工具数据存储创建数据帧的能力,以及改进SQL查询性能的优化。

geomesa spark sql代码由geomesa spark sql模块提供:

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-sql_2.11</artifactId>
  // version, etc.
</dependency>

以下是通过sparksql连接到geomesa accumulo的scala示例:

// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
  "accumulo.instance.id"   -> "instance",
  "accumulo.zookeepers"    -> "zoo1,zoo2,zoo3",
  "accumulo.user"          -> "user",
  "accumulo.password"      -> "*****",
  "accumulo.catalog"       -> "geomesa_catalog",
  "geomesa.security.auths" -> "USER,ADMIN")
// Create SparkSession
val sparkSession = SparkSession.builder()
  .appName("testSpark")
  .config("spark.sql.crossJoin.enabled", "true")
  .master("local[*]")
  .getOrCreate()
// Create DataFrame using the "geomesa" format
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "chicago")
  .load()dataFrame.createOrReplaceTempView("chicago")
// Query against the "chicago" schema
val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)
resultDataFrame.show
/*+-------+------+-----------+--------------------+-----------------+
|__fid__|arrest|case_number|                 dtg|             geom|
+-------+------+-----------+--------------------+-----------------+
|      4|  true|          4|2016-01-04 00:00:...|POINT (76.5 38.5)|
|      5|  true|          5|2016-01-05 00:00:...|    POINT (77 38)|
|      6|  true|          6|2016-01-06 00:00:...|    POINT (78 39)|
|      7|  true|          7|2016-01-07 00:00:...|    POINT (20 20)|
|      9|  true|          9|2016-01-09 00:00:...|    POINT (50 50)|
+-------+------+-----------+--------------------+-----------------+*/

由于geomesa sparksql堆栈位于geomesa spark core模块的顶部,因此类路径中必须包含一个或多个spaceardprovider实现。有关设置Spark类路径的详细信息,请参见配置。

注意:

在大多数情况下,在使用sparksql时,不需要设置简单功能序列化中描述的kryo序列化。但是,使用geotools RDD提供程序时可能需要这样做。

如果要将多个数据帧连接在一起,则在创建SparkSession对象时需要添加spark.sql.crossjoin.enabled属性:

val spark = SparkSession.builder().
   // ...
   config("spark.sql.crossJoin.enabled", "true").
   // ...
   getOrCreate()

警告:

交叉连接可能非常、非常低效。注意确保连接的一组或两组数据非常小,并考虑使用broadcast()方法确保至少一个连接的数据帧在内存中。

要使用与特定功能类型对应的数据创建geomesa sparksql启用的数据帧,请执行以下操作:

// dsParams contains the parameters to pass to the data store
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", typeName)
  .load()

具体来说,调用格式(“geomesa”)注册geomesa sparksql数据源,选项(“geomesa.feature”,typename)告诉geomesa使用名为typename的功能类型。这还注册在geomesa sparksql中实现的自定义用户定义类型和函数。

通过将数据帧注册为临时视图,可以在随后的SQL调用中访问此数据帧。例如:

dataFrame.createOrReplaceTempView("chicago")

可以通过别名“chicago”调用此数据帧:

val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)

也可以通过在Spark会话的sqlContext对象上调用sqltypes.init()手动注册用户定义的类型和函数:

SQLTypes.init(sparkSession.sqlContext)

还可以将Spark数据框写入几何表,其中包括:

dataFrame.write.format("geomesa").options(dsParams).option("geomesa.feature", "featureName").save()

这将自动将数据帧的基础RDD[行]转换为RDD[简单功能],并并行写入数据存储。要使其工作,功能类型featurename必须已经存在于数据存储中。

写回功能时,可以通过特殊的_uuu fid_uuuu列指定功能ID:

dataFrame
    .withColumn("__fid__", $"custom_fid")
    .write
    .format("geomesa")
    .options(dsParams)
    .option("geomesa.feature", "featureName")
    .save

GeMeSSa SPARKSQL模块采用几何对象表示的几个类(如OGC OpenGISS简单特征访问通用体系结构规范和Java拓扑结构套件实现的),并将其注册为用户定义类型(UDT)在SparkSQL中。例如,geometry类注册为geometryudt。在geomesa sparksql中,注册了以下类型:

几何图形

点udt

线形UDT

多边形

多点式

多重删除

多元论

几何集合

geomesa sparksql还实现了OGC OpenGIS Simple Feature Access SQL选项规范中描述的作为sparksql用户定义函数(UDF)的函数子集。这些功能包括创建几何图形、访问几何图形的属性、将几何图形对象铸造到更具体的子类、以其他格式输出几何图形、测量几何图形之间的空间关系以及处理几何图形。

例如,下面的SQL查询:

select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

使用两个UDF(st_contains和st_makebbox)查找芝加哥数据框中的行,其中列geom包含在指定的边界框中。

如果您的数据足够小,可以放入执行器的内存中,那么您可以告诉geomesa sparksql在内存中持久化RDD,并利用cqengine作为内存索引数据存储。为此,请在创建数据帧时添加option(“cache”, “true”)。这将对每个属性(不包括FID和几何体)放置索引。要基于几何图形进行索引,请添加option(“indexGeom”, “true”)。对此关系的查询将自动命中缓存的RDD,并查询位于每个分区上的内存中数据存储,这可能会显著加快速度。

考虑到您对数据的一些了解,还可以通过应用初始查询来确保数据适合内存。这可以通过query来完成。例如,

option("query", "dtg AFTER 2016-12-31T23:59:59Z")

还可以通过对数据进行空间分区来实现额外的加速。添加option(“spatial”, “true”)将确保空间上彼此相邻的数据将放置在同一分区上。默认情况下,您的数据将被分区到一个NXN网格中,但总共有4个分区策略,每个分区策略都可以用option(“strategy”, strategyName)的名称指定。

相等-计算数据的边界并将其划分为大小相等的nxn网格,其中N = sqrt(numPartitions)

加权类似于相等,但确保沿每个轴的数据在每个网格单元中的比例相等。

像地球一样平等,但使用整个地球作为边界,而不是根据数据计算它们。

r tree-基于数据样本构造一个R树,并使用边界矩形的子集作为分区信封。

空间分区的优点有两个方面:

1)具有完全位于一个分区中的空间谓词的查询可以直接转到该分区,跳过扫描分区的开销,这些分区肯定不会包含所需的数据。

2)如果两个数据集按相同的方案进行分区,导致两个关系的分区信封相同,那么空间联接可以使用分区信封作为联接中的键。这大大减少了完成连接所需的比较数量。

其他数据帧选项允许更好地控制分区的创建方式。对于需要数据样本(加权和rtree)的策略,可以使用sampleSize和threshold乘数来控制决策过程中使用了多少基础数据,以及rtree信封中允许多少项。

其他有用的选项如下:

option(“partitions”, “n”)—指定基础RDD的分区数(覆盖默认并行度)

option(“bounds”, “POLYGON in WellKnownText”)—限制WEIGHTED和EQUAL策略使用的网格的边界。所有不在这些边界内的数据都将放置在单独的分区中。

option(“cover”, “true”)—由于只有相等和地球分割策略才能保证跨关系的分区信封是相同的,因此具有此选项集的数据帧将强制与其连接的数据帧的分区方案与其自身匹配。

五、sparkSQL函数

下面是由geomesa spark sql模块定义的空间sparksql用户定义函数的列表。

https://www.geomesa.org/documentation/user/spark/sparksql_functions.html

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章