Spark练习之创建RDD(集合、本地文件),RDD持久化及RDD持久化策略
阅读原文时间:2023年07月08日阅读:1

Spark练习之创建RDD(集合、本地文件)

一、创建RDD

二、并行化集合创建RDD

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.Arrays;
import java.util.List;

/**
 * 并行创建RDD
 * eg:计算1-10的累加和
 */
public class JavaParallelizeCollection {
    public static void main(String[] args) {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("JavaParallelizeCollection")
                .setMaster("local");

        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //要通过并行化集合的方式创建RDD,那么就调用SparkContext以及其子类的parallelize()方法
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> integerJavaRDD = sc.parallelize(numbers);

        //执行reduce算子操作
        //相当于,先进行1+2=3;然后在用3+3=6,然后在用6+4=10,以此类推
        int sum = integerJavaRDD.reduce(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer num1, Integer num2) throws Exception {
                return num1 + num2;
            }
        });

        //输出累加和
        System.out.println("1到10的累加和为:" + sum);

        //关闭JavaSparkContext
        sc.close();

    }
}


import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

object ScalaParallelizeCollection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("JavaParallelizeCollection")
      .setMaster("local")

    val sc = new SparkContext(conf)

    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    val numberRDD = sc.parallelize(numbers, 5)

    val sum = numberRDD.reduce(_ + _)

    println("1到10的累加和:" + sum)

  }
}

三、使用本地文件和HDFS创建RDD

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

/**
 * 使用本地文件创建RDD
 * eg:统计文本文件字数
 */
public class LocalFile {
    public static void main(String[] args) {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("LocalFile")
                .setMaster("local");

        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //使用SparkContext以及其子类的textFile()方法,针对本地文件创建RDD
        JavaRDD<String> lines = sc.textFile("C://Users//xxx//Desktop//spark.txt");

        //统计文本文件内的字数
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
            private static final long servialVersionUID = 1L;

            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        });

        int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long servialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        System.out.println("文件总字数是:" + count);

        //关闭JavaSparkContext
        sc.close();

    }

}



import org.apache.spark.{SparkConf, SparkContext}

/**
  * 使用本地文件创建RDD
  * eg:统计文本文件字数
  */
object ScalaLocalFile {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaLocalFile")
      .setMaster("local")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("C://Users//xxx//Desktop//spark.txt", 1)

    val count = lines.map { line => line.length() }.reduce(_ + _)

    println("统计文本文件字数:" + count)
  }

}

四、RDD持久化原理

五、不使用RDD持久化的问题的原理

六、RDD持久化工作的原理

七、RDD持久化策略

八、如何选择RDD持久化策略

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器