博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark编程环境搭建及WordCount实例
阅读量:6614 次
发布时间:2019-06-24

本文共 11096 字,大约阅读时间需要 36 分钟。

 

 

基于Intellij IDEA搭建Spark开发环境搭建

 基于Intellij IDEA搭建Spark开发环境搭——参考文档

  ● 参考文档

  ● 操作步骤

·a)创建maven 项目

·b)引入依赖(Spark 依赖、打包插件等等)

 基于Intellij IDEA搭建Spark开发环境—maven vs sbt

  ● 哪个熟悉用哪个

  ● Maven也可以构建scala项目

 基于Intellij IDEA搭建Spark开发环境搭—maven构建scala项目

  ● 参考文档

  ● 操作步骤

  a)用maven构建scala项目(基于net.alchim31.maven:scala-archetype-simple)

 

 

  b)pom.xml引入依赖(spark依赖、打包插件等等)

  在pom.xml文件中的合适位置添加以下内容:

org.apache.spark
spark-core_2.11
2.2.0
provided
//设置作用域,不将所有依赖文件打包到最终的项目中
org.apache.maven.plugins
maven-shade-plugin
2.4.1
package
shade
false

  进行一次打包操作以测试是否工作正常。

  在Terminal中输入指令:

mvn clean package运行结果如下:D:\Code\JavaCode\sparkMaven>mvn clean package [INFO] Scanning for projects...[INFO][INFO] ---------------------< com.zimo.spark:scala-spark >---------------------[INFO] Building scala-spark 1.0-SNAPSHOT[INFO] --------------------------------[ jar ]---------------------------------[INFO][INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ scala-spark ---[INFO][INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ scala-spark ---[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent![INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\main\resources[INFO][INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ scala-spark ---[INFO] No sources to compile[INFO][INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ scala-spark ---[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent![INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\test\resources[INFO][INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ scala-spark ---[INFO] No sources to compile[INFO][INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ scala-spark ---[INFO] No tests to run.[INFO][INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ scala-spark ---[WARNING] JAR will be empty - no content was marked for inclusion![INFO] Building jar: D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar[INFO][INFO] --- maven-shade-plugin:2.4.1:shade (default) @ scala-spark ---[INFO] Replacing original artifact with shaded artifact.[INFO] Replacing D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar with D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT-shaded.jar[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------[INFO] Total time: 9.675 s[INFO] Finished at: 2018-09-11T15:33:53+08:00[INFO] ------------------------------------------------------------------------

 

  出现了BUILD SUCCESS,表明一切正常。下面给大家演示以下Scala编程的大致流程,以及在该框架下同样用Java进行实现应该如何操作。

 

Scala编程实现WordCount

 

  注意:此处必须选为Object,否则没有main方法!

  然后输入以下代码,执行打包操作

def main(args: Array[String]): Unit = {  println("hello spark")}

 

  

  完成后可以看到项目目录下多出来了一个target目录。这就是使用Scala编程的一个大致流程,下面我们来写一个WordCount程序。(后面也会有Java编程的版本提供给大家)

  首先在集群中创建以下目录和测试文件:

[hadoop@masternode ~]$ cd /home/hadoop/[hadoop@masternode ~]$ lltotal 68drwxr-xr-x. 9 hadoop hadoop  4096 Sep 10 22:15 appdrwxrwxr-x. 6 hadoop hadoop  4096 Aug 17 10:42 datadrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Desktopdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Documentsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Downloadsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Musicdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Picturesdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Publicdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Templatesdrwxrwxr-x. 3 hadoop hadoop  4096 Apr 18 10:11 toolsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Videos-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out[hadoop@masternode ~]$ mkdir testSpark/[hadoop@masternode ~]$ lltotal 72drwxr-xr-x. 9 hadoop hadoop  4096 Sep 10 22:15 appdrwxrwxr-x. 6 hadoop hadoop  4096 Aug 17 10:42 datadrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Desktopdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Documentsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Downloadsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Musicdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Picturesdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Publicdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Templatesdrwxrwxr-x. 2 hadoop hadoop  4096 Sep 12 10:23 testSparkdrwxrwxr-x. 3 hadoop hadoop  4096 Apr 18 10:11 toolsdrwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Videos-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out[hadoop@masternode ~]$ cd testSpark/[hadoop@masternode testSpark]$ vi word.txtapache hadoop spark scalaapache hadoop spark scalaapache hadoop spark scalaapache hadoop spark scala

  WordCount.scala代码如下:(如果右键New下面没有“Scala Class“”选项,请检查IDEA是否添加了scala插件

package com.zimo.sparkimport org.apache.spark.{SparkConf, SparkContext}/**  * Created by Zimo on 2018/9/11  */object MyWordCount {  def main(args: Array[String]): Unit = {    //参数检查    if (args.length < 2) {      System.err.println("Usage: myWordCount  ")      System.exit(1)    }    //获取参数    val input = args(0)    val output = args(1)    //创建Scala版本的SparkContext    val conf = new SparkConf().setAppName("myWordCount")    val sc = new SparkContext(conf)    //读取数据    val lines = sc.textFile(input)    //进行相关计算    lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)     //保存结果    sc.stop()  }}

  从代码可以看出scala的优势就是简洁,但是可读性较差。所以,学习可以与后面的java代码进行对比。

  然后打包

 

 

  打包完成后把上图中的文件上传到spark集群上去,然后执行。

[hadoop@masternode testSpark]$ rz [hadoop@masternode testSpark]$ lltotal 8-rw-r--r--. 1 hadoop hadoop 1936 Sep 12 10:59 scala-spark-1.0-SNAPSHOT.jar-rw-rw-r--. 1 hadoop hadoop  104 Sep 12 10:26 word.txt[hadoop@masternode testSpark]$ cd ../app/spark-2.2.0/[hadoop@masternode spark-2.2.0]$ cd bin/[hadoop@masternode bin]$ lltotal 92-rwxr-xr-x. 1 hadoop hadoop 1089 Jul  1  2017 beeline-rw-r--r--. 1 hadoop hadoop  899 Jul  1  2017 beeline.cmd-rwxr-xr-x. 1 hadoop hadoop 1933 Jul  1  2017 find-spark-home-rw-r--r--. 1 hadoop hadoop 1909 Jul  1  2017 load-spark-env.cmd-rw-r--r--. 1 hadoop hadoop 2133 Jul  1  2017 load-spark-env.sh-rwxr-xr-x. 1 hadoop hadoop 2989 Jul  1  2017 pyspark-rw-r--r--. 1 hadoop hadoop 1493 Jul  1  2017 pyspark2.cmd-rw-r--r--. 1 hadoop hadoop 1002 Jul  1  2017 pyspark.cmd-rwxr-xr-x. 1 hadoop hadoop 1030 Jul  1  2017 run-example-rw-r--r--. 1 hadoop hadoop  988 Jul  1  2017 run-example.cmd-rwxr-xr-x. 1 hadoop hadoop 3196 Jul  1  2017 spark-class-rw-r--r--. 1 hadoop hadoop 2467 Jul  1  2017 spark-class2.cmd-rw-r--r--. 1 hadoop hadoop 1012 Jul  1  2017 spark-class.cmd-rwxr-xr-x. 1 hadoop hadoop 1039 Jul  1  2017 sparkR-rw-r--r--. 1 hadoop hadoop 1014 Jul  1  2017 sparkR2.cmd-rw-r--r--. 1 hadoop hadoop 1000 Jul  1  2017 sparkR.cmd-rwxr-xr-x. 1 hadoop hadoop 3017 Jul  1  2017 spark-shell-rw-r--r--. 1 hadoop hadoop 1530 Jul  1  2017 spark-shell2.cmd-rw-r--r--. 1 hadoop hadoop 1010 Jul  1  2017 spark-shell.cmd-rwxr-xr-x. 1 hadoop hadoop 1065 Jul  1  2017 spark-sql-rwxr-xr-x. 1 hadoop hadoop 1040 Jul  1  2017 spark-submit-rw-r--r--. 1 hadoop hadoop 1128 Jul  1  2017 spark-submit2.cmd-rw-r--r--. 1 hadoop hadoop 1012 Jul  1  2017 spark-submit.cmd
[hadoop@masternode testSpark]$ ./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/

 

  运行结果如下图所示:

 

 

  以上操作是把结果直接打印出来,下面我们尝试一下将结果保存到文本当中去。修改以下代码:

//进行相关计算//lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)val resultRDD = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//保存结果resultRDD.saveAsTextFile(output)

  再次执行:

./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/result //输出目录一定要为不存在的目录!

 

  结果如下:

[hadoop@masternode testSpark]$ lltotal 5460drwxrwxr-x. 2 hadoop hadoop    4096 Sep 12 16:02 result-rw-r--r--. 1 hadoop hadoop 5582827 Sep 12 16:00 scala-spark-1.0-SNAPSHOT.jar-rw-rw-r--. 1 hadoop hadoop     104 Sep 12 15:52 word.txt[hadoop@masternode testSpark]$ cd result/[hadoop@masternode result]$ lltotal 4-rw-r--r--. 1 hadoop hadoop 42 Sep 12 16:02 part-00000-rw-r--r--. 1 hadoop hadoop  0 Sep 12 16:02 _SUCCESS[hadoop@masternode result]$ cat part-00000(scala,4)(spark,4)(hadoop,4)(apache,4)

 

 

Java编程实现WordCount

  在同样目录新建一个java目录,并设置为”Sources Root”。

 

 

  单元测试目录”test”同样需要建一个java文件夹。

 

 

  同理设置为”Test Sources Root”。然后分别再创建resources目录(用于存放配置文件),并分别设置为“Resources Root”和“Test Resources Root”。

 

 

  最后,创建一个“com.zimo.spark”包,并在下面新建一个MyJavaWordCount.Class类(如果右键New下面没有“Java Class”选项请参看博文下的详细讲解),其中的代码为如下:

package com.zimo.spark;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * Created by Zimo on 2018/9/12 */public class MyJavaWordCount {    public static void main(String[] args) {        //参数检查        if (args.length < 2) {            System.err.println("Usage: MyJavaWordCount  ");            System.exit(1);        }        //获取参数        String input = args[0];        String output = args[1];        //创建Java版本的SparkContext        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");        JavaSparkContext sc = new JavaSparkContext(conf);        //读取数据        JavaRDD
inputRDD = sc.textFile(input); //进行相关计算 JavaRDD
words = inputRDD.flatMap(new FlatMapFunction
() { @Override public Iterator
call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairRDD
result = words.mapToPair(new PairFunction
() { @Override public Tuple2
call(String word) throws Exception { return new Tuple2
(word, 1); } }).reduceByKey(new Function2
() { @Override public Integer call(Integer x, Integer y) throws Exception { return x+y; } }); //保存结果 result.saveAsTextFile(output); //关闭sc sc.stop(); }}

  注意:此处要做一点点修改。注释掉pom.xml文件下的此处内容

 

  此处是默认Source ROOT的路径,所以打包时就只能打包Scala下的代码,而我们新建的Java目录则不会被打包,注释之后则会以我们之前的目录配置为主。 

  然后就可以执行打包和集群上的运行操作了。运行和Scala编程一模一样,我在这里就不赘述了,大家参见上面即可!只是需要注意一点:output目录必须为不存在的目录,请记得每次运行前进行修改!

 

以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!

 

转载于:https://www.cnblogs.com/zimo-jing/p/9636235.html

你可能感兴趣的文章
重建索引提高SQL Server性能<转>
查看>>
大公司的流量变现
查看>>
Linux进程管理(2)
查看>>
将eclipse中项目的Text File Encoding设置成为GBK
查看>>
对control file的学习笔记
查看>>
JavaScript与有限状态机
查看>>
Sharepoint 2010 以及Office 2010 RTM
查看>>
php优化
查看>>
jQuery之each方法
查看>>
RequireJS源码初探
查看>>
【hibernate】 hibernate的主键策略
查看>>
单表代替密码原理及算法实现
查看>>
如何让VS检查函数和类Comment的添加情况
查看>>
Linq案例
查看>>
23.3. 身份证校验
查看>>
web开发未解之谜
查看>>
制作播放视频关灯效果
查看>>
【POI】解析xls报错:java.util.zip.ZipException: error in opening zip file
查看>>
我的第一个Node web程序
查看>>
【IntelliJ Idea】idea下hibernate反向生成工具,根据数据表生成实体
查看>>