『 Spark 』mac下Spark的安装与使用

每次接触一个新的知识之前我都抱有恐惧之心,因为总认为自己没有接触到的知识都很高大上,比如上篇介绍到的Hadoop的安装与使用与本篇要介绍的Spark,其实在自己真正琢磨以后才发现本以为高大上的知识其实也不过如此。

由于Spark是最新火起来的处理大数据的框架,国内教程资源少之甚少,所以本篇文章是本人在看了Spark官网的快速入门教程后总结下来的经验,由于Spark同Hadoop一样可以运行在多种模式下,而本人又比较穷只有一台电脑,所以本篇文章为大家介绍如何在mac系统的本地模式下安装Spark以及安装后如何用Spark来进行交互式分析。

本文结构:前部分介绍Spark的一点点(详情介绍请自行google)基础概念以及安装过程,后部分通过一个demo让大家快速学会使用Spark基本Api。

1.Spark的运行模式

在正式安装Spark之前,先给大家介绍下Spark可以在哪几种模式下运行。同上篇Hadoop的安装与使用中介绍的Hadoop可以运行在其3种模式中的任意一种模式之上,Spark也可以运行在多种模式之上,主要有以下4种运行模式:

  • 1.local: 本地单进程模式,用于本地开发测试Spark代码。
  • 2.standalone:分布式集群模式,Master-Worker架构,Master负责调度,Worker负责具体Task的执行。
  • 3.on yarn/mesos:运行在yarn/mesos等资源管理框架之上,yarn/mesos提供资源管理,spark提供计算调度,并可与其他计算框架(如MapReduce/MPI/Storm)共同运行在同一个集群之上。
  • 4.on cloud(EC2): 运行在AWS的EC2之上

由于博主比较穷,所以下面为大家介绍本地模式下Spark的安装与使用。

2.Spark的安装

2.1准备工作

第一步:安装Java JDK 1.7及以上版本,并配置好环境变量。本电脑安装的jdk是1.7.0_79版本的。

第二步:安装Hadoop。本电脑安装的Hadoop是2.7.3版本的。

疑惑:上篇文章说到可以不学Hadoop直接学习Spark,那为什么还要安装Hadoop?亲,我的意思是不用学习Hadoop的相关知识例如它的API啥的,但是没说不用先搭建Hadoop的环境呀!
合理解释:Spark会用到HDFS与YARN,因此请先安装Hadoop,关于Hadoop的安装请参考我的上篇博文mac下Hadoop的安装与使用,在此就不再复述。

第三步:安装Scala 2.9.3以上版本。这里介绍下Scala在mac下的安装与环境变量的配置。点击链接进入scala官方网站的下载页,下载2.11.8版本(第一次操作的时候我下载了最新版2.12.1,后来测试spark-shell命令时发现最新版本的scala与1.7版本的jdk不兼容,所以后来换成了2.11.8版本)的Scala:

点击图上下载链接会自动将scala下载到Dowmloads目录下,文件名为:scala-2.12.1.tgz,还有一种下载方法就是直接在命令行使用homebrew命令(作为一个Linux开发人员我建议使用这种方式)进行下载:brew install scala,该命令会自动帮你把scala下载到/usr/local目录下。使用在官网点击链接下载的方式的话,我们也要将该scala文件加到/usr/local目录下,你可以直接拷贝过去,当然作为一个Linux开发人员你可以直接使用一条命令完成将该压缩包进行解压移动/usr/local/目录下:sudo tar -zxf ~/downloads/scala-2.12.1.tgz -C /usr/local/,然后使用命令cd /usr/local进入到该目录下,由于解压后的文件名为:scala-2.12.1,所以为了之后配置的方便我们使用命令:sudo mv ./scala-2.12.1 ./scala将文件名修改为scala。因为该目录属于管理员级别的目录所以如果当前用户不是管理员的话应该在命令前面使用sudo关键字表示使用管理员权限。

这样scala的安装便完成,但是还要配置scala的环境变量,使用命令:sudo vim /etc/profile打开系统中配置环境变量的文件,在里面添加如下内容:

1
2
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

然后:wq!保存并退出该文件,输入命令使该文件的内容立刻生效:source /etc/profile,接下来在(根目录下)命令行输入:scala并敲击回车,看到控制台打印如下信息说明我们的scala安装并成功配置了环境变量:

图中信息即表明我们使用的scala版本为2.11.8,jdk版本为1.7.0_79。

使用命令行快捷键control+c或者:quit退出scala shell环境(网上教程有说使用exit命令可以退出scala shell环境,我试了但貌似不行)。

疑惑:既然Spark提供了Scala、Python、Java三种程序设计语言的API,那么我直接用java不就好了,为什么还要下载Scala呢?是因为等会我们会使用Spark shell连接到Spark引擎进行交互式数据分析,而Spark shell只支持Scala和Python两种语言。Java不支持交互式的Shell,因此这一功能暂未在Java语言中实现(当然你也可以不使用shell编程,直接在IDE中用java编程语言连接到Spark引擎进行交互式数据分析也是可以的)。所以建议大家还是老老实实在电脑上面下载好scala并配置好环境变量,反正也占不了多大空间啊,而且万一哪天用到这东西了呢?所以下面我都是采用的scala支持的shell来配置的Spark,之后我也会使用scala运行spark-shell进行交互式数据分析的一个小示例带大家快速入门。

准备好如上环境后,接下来就可以进行Spark的安装与相关配置操作了。

2.2安装Spark并配置

接下来才是正题,进入Apache Spark官方网站进行Spark的下载,看到如下页面:

第2条你要是选择的是Hadoop2.7的话,你要保证你之前安装的Hadoop版本也是2.7版本。选择第4条的下载链接即可(当然你也可以直接用Homebrew命令进行下载),系统会将下好的文件放在Dowmloads文件目录下,文件名为:spark-2.0.2-bin-hadoop2.7.tgz,同scala的安装方法一样,我们使用命令:sudo tar -zxf ~/Dowmloads/spark-2.0.2-bin-hadoop2.7.tgz -C /usr/local/直接将该压缩包解压并移动到/usr/local/目录下,然后我们cd /usr/local进入到/usr/local目录下,使用命令更改该目录下的spark文件名:sudo mv ./spark-2.0.2-bin-hadoop2.7 ./spark将文件名改为spark

经过上述步骤从官网下载到Spark的文件,这样我们便完成了Spark的安装,但是Spark也是要进行相应的环境变量配置的,所以接下来我们进行Spark环境变量的配置。

使用命令:sudo vim /etc/profile,在文件中加入Spark的环境变量:

1
2
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

然后我们进入到Spark目录的conf配置文件中:cd /usr/local/spark/conf,执行命令:cp spark-env.sh.template spark-env.sh将spark-env.sh.template拷贝一份,然后打开拷贝后的spark-env.sh文件:vim spark-env.sh,在里面加入如下内容:

1
2
3
4
5
export SCALA_HOME=/usr/local/scala

export SPARK_MASTER_IP=localhost

export SPARK_WORKER_MEMORY=4g

这样我们便完成了Spark环境变量的配置,接下来测试测试一下Spark,在根目录(因为我们配置了spark环境变量,所以可以直接在根目录)下输入命令:spark-shell,看到控制台输出如下信息:

恭喜你,尽情享受Spark吧。

3.安装过程出现的问题分析

1.运行spark-shell命令时控制台出现:

1
..我忘了是啥报错了.connection out.中间报错信息是这个..

的错误,说明没有配置SSH,配置SSH请参考我上篇文章中Hadoop安装的配置过程。

2.运行命令:scala时控制台出现:

的错误信息,表示scala没有成功安装,或者安装的scala与jdk不兼容,所以这里我建议你们就按本教程的2.11.8scala版本与1.7jdk版本来操作吧。这些坑我都试过了,所以才能为你们总结经验。(大哭脸)

3.其他错误,有以下原因,你们一定要一一进行检查:

  • 1.关于JDK:JDK版本不对,所以我建议大家用1.7;或者是JDK版本正确但是没有成功配置它的环境变量,我配置时更改了两个文件的环境变量:一个是/etc/profile目录下的,一个是.bash_profile文件中的,配置环境变量信息如下:

    1
    2
    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home
    export PATH=$JAVA_HOME/bin:$PATH
  • 2.关于scala:Scala版本不对,所以我建议大家用2.11.8;或者是没有成功配置scala的环境变量,配置环境变量按照文中介绍的即可。

  • 3.Hadoop版本与Spark版本不兼容:所以大家在Spark官网下载Spark的时候一定要注意下载Spark时选择的第二条信息的Hadoop版本要与电脑上面已经安装的Hadoop一致才行。

4.快速入门Spark基础Api

这里我介绍两种使用Spark基础Api的方式,一种是在spark-shell中进行简单的测试,一种是在开发工具IDEA中进行代码的编写来教大家快速学习Spark基础API。

4.1使用spark-shell完成单词统计功能

由于spark-shell只支持scala和python两种语言的编写,不支持Java,所以我在spark-shell中通过scala的语法来进行简单测试。

在配置好Spark环境变量之后,我们打开命令行,直接在当前用户目录下输入命令spark-shell进入scala编写环境(当然前提是你首先使用命令start-all.sh命令开启了Spark):

我们从 /usr/local/spark/README.md 文件新建一个 RDD,代码如下(本文出现的 Spark 交互式命令代码中,第一行为代码及其解释,第二行及以后是控制台返回的结果):

1
2
scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
>textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24

代码中通过 file:// 前缀或者不加 file:// 前缀表示指定读取本地文件。如果你这里传入的路径写的是HDFS上的文件路径,例如hdfs://远程主机名:Hadoop端口号我/文件名代表你要是读取的是 HDFS 中的文件,你需要先上传文件到 HDFS 中(至于如何上传,后面的demo中我们会进行讲解),否则会有org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md的错误。这里我们就以读取本地文件进行讲解。

RDDs 支持两种类型的操作:1.actions: 在数据集上运行计算后返回值。2.transformations: 转换, 从现有数据集上创建一个新的数据集。

使用上述命令创建好的RDD对象,下面我们就来通过该对象演示 count() 和 first() 操作:

1
2
3
4
5
textFile.count()  // RDD 中的 item 数量,对于文本文件,就是总行数
>res0: Long = 95

textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容
>res1: String = # Apache Spark

接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下:

1
2
3
4
5
val linesWithSpark = textFile.filter(line => line.contains("Spark"))   // 筛选出包含 Spark 的行
>linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

linesWithSpark.count() // 统计行数
>res4: Long = 17

上述我们完成了RDD的简单计算,而RDD 的 actions 和 transformations 其实可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词:

1
2
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
>res1: Int = 22

代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce() 中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解:

1
2
3
4
import java.lang.Math //先导入Math函数

textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
>res6: Int = 14

Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount):

1
2
3
4
5
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)   // 实现单词统计
>wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

wordCounts.collect() // 输出单词统计结果
>res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)

上述我们通过spark-shell完成单词的统计简单对我们spark的基础Api进行了熟悉,采用的是scala语言,由于我是一个java开发人员,所以接下来就在开发工具IDEA中通过编写Java代码来实现HDFS中某个路径下文件内容中单词的统计功能。

4.2在IDEA中编写Java代码完成HDFS中某个文件中的单词统计功能

既然要统计HDFS中某个文件中的单词,那么我们首先要将文件上传到HDFS上吧!如何上传?听我慢慢道来。

使用命令:quit退出scala命令环境,首先在本地电脑的当前用户目录下创建一个文件,我这里创建了一个叫hello的txt文件,里面写上内容hello world hello you hello hello ,内容可以随便打啦,然后输入hadoop的命令(关于Hadoop的更多命令请自行google):hadoop fs -put ~/hello /,实现将本机目录下的hello文件推至远程主机的根目录下,然后便可以开始编写我们的java代码了。

使用IDEA创建一个Maven项目(便于管理我们的jar包嘛!),在pom.xml中添加上Spark相应jar包坐标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.codingxiaxw.spark</groupId>
<artifactId>spark-mvn</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-mvn Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>

</dependencies>


<build>
<finalName>spark-mvn</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

然后创建一个WordCount.java文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Simple
{

private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {

// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }


//创建一个RDD对象
SparkConf conf=new SparkConf().setAppName("Simple").setMaster("local");

//创建spark上下文对象,是数据的入口
JavaSparkContext spark=new JavaSparkContext(conf);

//获取数据源
JavaRDD<String> lines = spark.textFile("hdfs://localhost:8020/hello");

/**
* 对于从数据源得到的DStream,用户可以在其基础上进行各种操作,
* 对于当前时间窗口内从数据源得到的数据首先进行分割,
* 然后利用Map和ReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;
*/

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
return Arrays.asList(SPACE.split(s)).iterator();
}
});


//使用RDD的map和reduce方法进行计算
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});


JavaPairRDD<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
//输出计算结果
System.out.println(tuple._1() + ": " + tuple._2());
}


spark.stop();
}
}

各行代码意思见代码旁的注释,上述代码都是从官方文档抄的,但是貌似要注释掉官方文档的:

1
2
3
4
//        if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }

然后运行程序,控制台输出结果如下图:

成功统计出HDFS上文件内容的单词个数。到此,我们便简单熟悉了Spark的相关API,下篇文章我将介绍通过Spark的Streaming Api实现对流式数据的处理为大家介绍Spark中Streming库的相关Api操作。

2018.3.19更

欢迎加入我的Java交流1群:659957958。

2018.4.21更:如果群1已满或者无法加入,请加Java学习交流2群:305335626

5.联系

If you have some questions after you see this article,you can tell your doubts in the comments area or you can find some info by clicking these links.

坚持原创技术分享,您的支持将鼓励我继续创作!