Spark | WordCount with idea

在idea中建立Scala工程有三种方式:普通idea工程,SBT和Maven
用的比较广泛的应该是SBT,打算先从本地运行调试spark代码开始,然后将jar包上传到服务器,再到用idea直接连接服务器调试。

环境配置

不同版本的Java对Scala的要求不一样,本来我的电脑是java 1.9,需要的是scala 2.12.6,但是为了和服务器的环境一致我就又装了一个java 1.8。
Java, scala和hadoop都要配置环境变量。环境变量的配置可参见这里: javascalahadoop

普通方式

1)打开idea并安装scala插件(File--Setting--Plugins),重启。
Image
2)新建project,选择scala,IDEA,选择java和scala的路径。
Image
Image
3)在项目根目录放入待统计的txt文件test.txt。
Image
4)src目录右键,新建package命名为spark.wc,在package右键,新建scala class,选择object,命名为WordCount。
Image
Image
5)WordCount代码。

package spark.wc
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("test.txt")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //这里只去除了空格,默认的输入文本是经过处理统一大小写并去除了标点的
    rdd.foreach(println)                
    rdd.repartition(1).saveAsTextFile("result")
  }
}

6)引入spark依赖包。这里我直接把服务器spark-2.0.0-bin/jars目录里的所有jar包下载到了本地并导入工程。File--Prioject Structure--libraries, 选择下载jar包所在的路径。成功引入后jar包会出现在左侧External Libraries 里面。
Image
Image
7)运行,如果成功,会在consel中和result目录下看到输出。result目录文件中的part-00000为保存结果的文本文件。
Image
8)修改代码中路径

package spark.wc
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/user/student/fy/test.txt")
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    rdd.repartition(1).saveAsTextFile("/user/student/fy/word/result")
  }
}

9)去掉服务器spark中已经存在的jar包依赖并打包(File--Project Structure--Artifacts,然后Build--Artifacts)。成功后在out/artifacts目录下出现jar包。
Image
Image
Image
Image
Image
10)将jar包上传至服务器,将待统计文件test.txt上传至HDFS。在服务器上配置spark环境运行jar包。

export YARN_CONF_DIR=/opt/cloudera/parcels/CDH/lib/spark
spark-submit \
--master yarn-client \
--class test \
--num-executors 1 \
--driver-memory 1g \
--executor-memory 8g \
--executor-cores 4 \
--conf spark.network.timeout=500 \
--conf spark.akka.timeout=500 \
--conf spark.akka.frameSize=100 \
/home/student/fy/WordCount.jar

11)运行成功,在保存目录下获得word/result文件夹,里面的part-00000文件即为结果。如有需求,将其从HDFS保存到本地即可。

hdfs dfs -get /user/student/fy/word/result /home/student/fy/word

SBT方式

SBT是专门用于为scala打包的工具,在idea中建立sbt工程会根据版本号自动引入所需要的包,第一次引入时间较长。参考自 使用Intellij Idea编写Spark应用程序(Scala+SBT)
1)注意scala插件的版本。之前我使用的的2017.2.13版本存在bug,在新建SBT工程后不能成功导入所需的包。最好检查更新将idea整体升级(Help--Check for Updates),我这里插件更新后的版本是2018.1.10,经检测没有问题。
Image
Image
2)新建SBT工程,选择scala和sbt版本号。
Image
Image
3)修改build.sbt,选择Enable auto-import,让idea以后每次遇到build.sbt更新后自动导入依赖包。

name := "SBTTest"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

4)等待“dump project structure from sbt”的操作全部执行结束,在src/main/scala项目目录下新建WordCount.scala文件,粘贴代码执行即可。

Comments
Write a Comment