大数据实战手册-开发篇之IO
阅读原文时间:2023年08月29日阅读:4
  • 2.4 sparkContext IO:读
  • 2.4.1 textFile
 # Load a text file and convert each line to a Row.
 lines = sc.textFile("examples/src/main/resources/people.txt")
  • 2.4.2 hadoopFile
  • 2.4.3 newAPIHadoopFile
parquet_rdd = sc.newAPIHadoopFile(
        path,
        'org.apache.parquet.avro.AvroParquetInputFormat',
        'java.lang.Void',
        'org.apache.avro.generic.IndexedRecord',
        valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
  • 2.4.4 pickleFile

    备注:Load an RDD previously saved using RDD.saveAsPickleFile method.

  • 2.4.5 parallelize

  • 2.4.6 broadcast

  • 2.5 sparkSql IO

  • 2.5.1 DataFrameReader

**parquet**
df = spark.read.parquet("examples/src/main/resources/users.parquet")

**JSON**
peopleDF = spark.read.json("examples/src/main/resources/people.json")

**ORC**
df = spark.read.orc("examples/src/main/resources/users.orc")

**JDBC支持的db**
jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()
  • 2.5.2 DataFrameWriter
**parquet**
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

**JSON**
 (df.write
        .partitionBy("favorite_color")
        .bucketBy(42, "name")
        .saveAsTable("people_partitioned_bucketed"))

**ORC**
(df.write.format("orc")
        .option("orc.bloom.filter.columns", "favorite_color")
        .option("orc.dictionary.key.threshold", "1.0")
        .save("users_with_options.orc"))

**JDBC支持的db**
  jdbcDF.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .save()