Getting Started with Spark: Writing Scala Scripts in IDEA

I. Install the Scala Plugin

1. File -> Settings

2. Plugins -> Marketplace -> search for Scala and install it

(Alternatively, download a Scala plugin version that matches your IDEA yourself; see the tutorial: 自己给idea下载Scala插件 - 我试试这个昵称好使不 - 博客园 (cnblogs.com).)

3. Restart IDEA

II. Create a Scala Project

1. Create a Maven project: File -> New -> Project

2. pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>  
<artifactId>hello_spark</artifactId>
<version>1.0-SNAPSHOT</version>

<repositories>  
    <repository>  
        <id>aliyun</id>  
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>  
    </repository>  
    <repository>  
        <id>apache</id>  
        <url>https://repository.apache.org/content/repositories/snapshots/</url>  
    </repository>  
    <repository>  
        <id>cloudera</id>  
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    </repository>  
</repositories>  
<properties>  
    <encoding>UTF-8</encoding>  
    <maven.compiler.source>1.8</maven.compiler.source>  
    <maven.compiler.target>1.8</maven.compiler.target>  
    <scala.version>2.12.11</scala.version>  
    <spark.version>3.0.1</spark.version>  
    <hadoop.version>2.7.5</hadoop.version>  
</properties>  
<dependencies>  
    <!-- Scala language dependency -->
    <dependency>  
        <groupId>org.scala-lang</groupId>  
        <artifactId>scala-library</artifactId>  
        <version>${scala.version}</version>  
    </dependency>

    <!-- Spark Core dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- spark-streaming-->  
    <dependency>  
        <groupId>org.apache.spark</groupId>  
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- Spark Streaming + Kafka dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- Spark SQL + Hive dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.spark</groupId>  
        <artifactId>spark-hive-thriftserver_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- Structured Streaming + Kafka dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <!-- Spark MLlib machine learning module (includes the ALS recommendation algorithm) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.12</artifactId>
        <version>${spark.version}</version>  
    </dependency>

    <dependency>  
        <groupId>org.apache.hadoop</groupId>  
        <artifactId>hadoop-client</artifactId>  
        <version>${hadoop.version}</version>
    </dependency>

    <dependency>  
        <groupId>com.hankcs</groupId>  
        <artifactId>hanlp</artifactId>  
        <version>portable-1.7.7</version>  
    </dependency>

    <dependency>  
        <groupId>mysql</groupId>  
        <artifactId>mysql-connector-java</artifactId>  
        <version>5.1.38</version>  
    </dependency>

    <dependency>  
        <groupId>redis.clients</groupId>  
        <artifactId>jedis</artifactId>  
        <version>2.9.0</version>  
    </dependency>

    <dependency>  
        <groupId>com.alibaba</groupId>  
        <artifactId>fastjson</artifactId>  
        <version>1.2.47</version>  
    </dependency>

    <dependency>  
        <groupId>org.projectlombok</groupId>  
        <artifactId>lombok</artifactId>  
        <version>1.18.2</version>  
        <scope>provided</scope>  
    </dependency>  
</dependencies>

<build>  
    <sourceDirectory>src/main/scala</sourceDirectory>  
    <plugins>  
        <!-- Plugin for compiling Java -->
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <version>3.5.1</version>  
        </plugin>  
        <!-- Plugin for compiling Scala -->
        <plugin>  
            <groupId>net.alchim31.maven</groupId>  
            <artifactId>scala-maven-plugin</artifactId>  
            <version>3.2.2</version>  
            <executions>  
                <execution>  
                    <goals>  
                        <goal>compile</goal>  
                        <goal>testCompile</goal>  
                    </goals>  
                    <configuration>  
                        <args>  
                            <arg>-dependencyfile</arg>  
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>  
                    </configuration>  
                </execution>  
            </executions>  
        </plugin>  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-surefire-plugin</artifactId>  
            <version>2.18.1</version>  
            <configuration>  
                <useFile>false</useFile>  
                <disableXmlReport>true</disableXmlReport>  
                <includes>  
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>  
            </configuration>  
        </plugin>  
        <plugin>  
            <groupId>org.apache.maven.plugins</groupId>  
            <artifactId>maven-shade-plugin</artifactId>  
            <version>2.3</version>  
            <executions>  
                <execution>  
                    <phase>package</phase>  
                    <goals>  
                        <goal>shade</goal>  
                    </goals>  
                    <configuration>  
                        <filters>  
                            <filter>  
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>  
                            </filter>  
                        </filters>  
                        <transformers>  
                            <transformer  
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">  
                                <mainClass></mainClass>  
                            </transformer>  
                        </transformers>  
                    </configuration>  
                </execution>  
            </executions>  
        </plugin>  
    </plugins>  
</build>
</project>

3. The src directory looks like this (the data directory can be ignored); a sketch of the layout follows:
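A sketch of the expected source layout (the package path follows the WordCount example in the next step; the data directory is omitted):

src
└── main
    └── scala
        └── org
            └── example
                └── hello
                    └── WordCount.scala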

4. Create WordCount.scala

package org.example.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Author: itcast
 * Desc: Spark getting-started example - WordCount
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Please specify the input and output paths")
      System.exit(1) // a non-zero exit code indicates abnormal termination
    }
    // TODO 1. env: prepare sc / SparkContext / the Spark execution environment
    val conf: SparkConf = new SparkConf().setAppName("wc")//.setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // TODO 2. source: read the data
    // RDD: A Resilient Distributed Dataset - think of it as a distributed collection that is as easy to use as an ordinary one!
    // RDD[each element is one line of text]
    val lines: RDD[String] = sc.textFile(args(0)) // the input path must be supplied when submitting the job

    // TODO 3. transformation: process the data
    // split: RDD[individual words]
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // map each word to 1: RDD[(word, 1)]
    val wordAndOnes: RDD[(String, Int)] = words.map((_, 1))
    // group and aggregate: groupBy + mapValues(_.map(_._2).reduce(_ + _)) ===> in Spark, grouping + aggregation is one step: reduceByKey
    val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_ + _)

    // TODO 4. sink: output
    // print directly
    //result.foreach(println)
    // collect to a local collection, then print
    //println(result.collect().toBuffer)
    // write to the given path (file or directory)
    // if HDFS permissions prevent writing, run:
    // hadoop fs -chmod -R 777  /
    // and add the following line
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    result.repartition(1).saveAsTextFile(args(1)) // the output path must be supplied when submitting the job

    // sleep for a while so the Web UI can be inspected
    //Thread.sleep(1000 * 60)

    // TODO 5. release resources
    sc.stop()
  }
}

III. Package and Upload

After packaging, find the jar output path (by default under the project's target directory).

Upload the jar to the virtual machine (a sketch of the commands is shown below).
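A sketch of the packaging and upload commands; the node01 hostname and the /data/wc.jar target path come from the spark-submit step below, and the root user is an assumption:

# On the development machine: build the jar (the shade plugin runs during the package phase)
mvn clean package -DskipTests

# Copy the jar to the VM, renamed to match the path used by spark-submit below
scp target/hello_spark-1.0-SNAPSHOT.jar root@node01:/data/wc.jar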

IV. On the Virtual Machine

1. Create words.txt

vim /data/words.txt

hello me you her
hello me you
hello me
hello

2. Create an HDFS directory and upload words.txt

hadoop fs -mkdir -p /wordcount/input

hadoop fs -put /data/words.txt /wordcount/input/words.txt
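An optional quick check that the file landed in HDFS:

hadoop fs -ls /wordcount/input
hadoop fs -cat /wordcount/input/words.txt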

3. Submit the job

SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--class org.example.hello.WordCount \
/data/wc.jar \
hdfs://node01:8020/wordcount/input/words.txt \
hdfs://node01:8020/wordcount/output47_3

4. Check the job status (YARN web UI)

http://node01:8088
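The same information is available from the command line via the YARN CLI (assuming the yarn client is configured on the node):

yarn application -list -appStates ALL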

5. View the results

http://node01:50070/explorer.html#/wordcount/output47_3
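The result can also be read directly from HDFS; the output directory name comes from the spark-submit command above, and the expected counts follow from words.txt:

hadoop fs -cat /wordcount/output47_3/part-*
# Expected output (line order may vary):
# (hello,4)
# (me,3)
# (you,2)
# (her,1)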
