Spark | WordCount with idea
在idea中建立Scala工程有三种方式:普通idea工程,SBT和Maven
用的比较广泛的应该是SBT,打算先从本地运行调试spark代码开始,然后将jar包上传到服务器,再到用idea直接连接服务器调试。
环境配置
- 系统:Win10 64位
- Scala: 2.11.8
- Java: 1.8.0
- hadoop: 2.6.3
- hadoop windows环境下插件 winutils.exe ,hadoop.dll.zip ,密码8rcy
不同版本的Java对Scala的要求不一样,本来我的电脑是java 1.9,需要的是scala 2.12.6,但是为了和服务器的环境一致我就又装了一个java 1.8。
Java, scala和hadoop都要配置环境变量。环境变量的配置可参见这里: java ; scala ; hadoop
普通方式
1)打开idea并安装scala插件(File--Setting--Plugins),重启。
2)新建project,选择scala,IDEA,选择java和scala的路径。
3)在项目根目录放入待统计的txt文件test.txt。
4)src目录右键,新建package命名为spark.wc,在package右键,新建scala class,选择object,命名为WordCount。
5)WordCount代码。
package spark.wc
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordcount")
val sc = new SparkContext(conf)
val rdd = sc.textFile("test.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
//这里只去除了空格,默认的输入文本是经过处理统一大小写并去除了标点的
rdd.foreach(println)
rdd.repartition(1).saveAsTextFile("result")
}
}
6)引入spark依赖包。这里我直接把服务器spark-2.0.0-bin/jars目录里的所有jar包下载到了本地并导入工程。File--Prioject Structure--libraries, 选择下载jar包所在的路径。成功引入后jar包会出现在左侧External Libraries 里面。
7)运行,如果成功,会在consel中和result目录下看到输出。result目录文件中的part-00000为保存结果的文本文件。
8)修改代码中路径
package spark.wc
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("wordcount")
val sc = new SparkContext(conf)
val rdd = sc.textFile("/user/student/fy/test.txt")
.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd.repartition(1).saveAsTextFile("/user/student/fy/word/result")
}
}
9)去掉服务器spark中已经存在的jar包依赖并打包(File--Project Structure--Artifacts,然后Build--Artifacts)。成功后在out/artifacts目录下出现jar包。
10)将jar包上传至服务器,将待统计文件test.txt上传至HDFS。在服务器上配置spark环境运行jar包。
export YARN_CONF_DIR=/opt/cloudera/parcels/CDH/lib/spark
spark-submit \
--master yarn-client \
--class test \
--num-executors 1 \
--driver-memory 1g \
--executor-memory 8g \
--executor-cores 4 \
--conf spark.network.timeout=500 \
--conf spark.akka.timeout=500 \
--conf spark.akka.frameSize=100 \
/home/student/fy/WordCount.jar
11)运行成功,在保存目录下获得word/result文件夹,里面的part-00000文件即为结果。如有需求,将其从HDFS保存到本地即可。
hdfs dfs -get /user/student/fy/word/result /home/student/fy/word
SBT方式
SBT是专门用于为scala打包的工具,在idea中建立sbt工程会根据版本号自动引入所需要的包,第一次引入时间较长。参考自
使用Intellij Idea编写Spark应用程序(Scala+SBT)
。
1)注意scala插件的版本。之前我使用的的2017.2.13版本存在bug,在新建SBT工程后不能成功导入所需的包。最好检查更新将idea整体升级(Help--Check for Updates),我这里插件更新后的版本是2018.1.10,经检测没有问题。
2)新建SBT工程,选择scala和sbt版本号。
3)修改build.sbt,选择Enable auto-import,让idea以后每次遇到build.sbt更新后自动导入依赖包。
name := "SBTTest"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
4)等待“dump project structure from sbt”的操作全部执行结束,在src/main/scala项目目录下新建WordCount.scala文件,粘贴代码执行即可。