『 Spark 』使用Spark Streaming处理流式数据

在上篇文章mac下Spark的安装与使用的一个示例中,我们使用RDD的Map和Reduce操作对一个文本中的文字做了简单的字数检查。本篇文章的目的是讲述Spark 中的流数据处理。结构分为三个部分:第一部分简单对Spark进行介绍,第二部分介绍Spark Streaming,最后一部分通过一个简单的Demo讲讲Spark是如何通过Spark Streaming进行流式数据的处理。

1.简单介绍Spark

查看Spark官方文档,发现Spark除了Map和Reduce操作之外,它还支持SQL查询(Spark SQL),流数据(Spark Streaming),机器学习(Spark MLlib)和图表数据处理(Spark GraphX),也就是Spark的一些Api。开发者可以在一个数据管道用例中单独使用某一能力或者将这些能力结合在一起使用。

1.1Spark生态系统

除了Spark核心API之外,Spark生态系统中还包括其他附加库,可以在大数据分析和机器学习领域提供更多的能力。这些库的介绍如下:

  • Spark Streaming:
    • Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。
  • Spark SQL:
    • Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。
  • Spark MLlib:
    • MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。
  • Spark GraphX:
    • GraphX是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和一个经过优化的Pregel API变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。

除了这些库以外,还有一些其他的库,如BlinkDB和Tachyon。

  • BlinkDB:
    • BlinkDB是一个近似查询引擎,用于在海量数据上执行交互式SQL查询。BlinkDB可以通过牺牲数据精度来提升查询响应时间。通过在数据样本上执行查询并展示包含有意义的错误线注解的结果,操作大数据集合。
  • Tachyon:
    • Tachyon是一个以内存为中心的分布式文件系统,能够提供内存级别速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它将工作集文件缓存在内存中,从而避免到磁盘中加载需要经常读取的数据集。通过这一机制,不同的作业/查询和框架可以以内存级的速度访问缓存的文件。
      此外,还有一些用于与其他产品集成的适配器,如Cassandra(Spark Cassandra 连接器)和R(SparkR)。Cassandra Connector可用于访问存储在Cassandra数据库中的数据并在这些数据上执行数据分析。

下图展示了在Spark生态系统中,这些不同的库之间的相互关联:

本篇文章只进行Spark Streaming的介绍。

1.2Spark体系架构

Spark体系架构包括如下三个主要组件:

  • 1.数据存储。
    • Spark用HDFS文件系统存储数据(这也就是我们在上问中提到为什么安装Spark要在安装Hadoop之后进行)。它可用于存储任何兼容于Hadoop的数据源,包括一些内置数据源如HDFS、TCP sockets和一些外置数据源如Twitter、Kafka等。
  • 2.API。
    • 利用API,应用开发者可以用标准的API接口创建基于Spark的应用。Spark提供Scala,Java和Python三种程序设计语言的API。
  • 3.资源管理。
    • Spark既可以部署在一个单独的服务器也可以部署在像Mesos或YARN这样的分布式计算框架之上。

如上对Spark进行了简单的介绍,接下来我们就基于Spark对其中的库Spark Streaming来进行介绍。

2.Spark Streaming的简单介绍

上篇文章中的demo中我们对一个文件中的字数基于批处理模式下进行了静态信息处理,比如作为一个按小时或天运行的任务。但若是在数据驱动的业务决策场景下,当需要飞快地分析实时数据流以执行分析并创建决策支持时,又该如何呢?

使用流式数据处理,一旦数据到达计算就会被实时完成,而非作为批处理任务。实时数据处理与分析正在变为大多数组织的大数据战略中至关重要的一个组件。接下来,我们将会学习到如何使用Apache Spark中一个被称为Spark流的库进行实时数据分析。

2.1流数据分析

流数据基本上是一组连续的数据记录,它们通常产生于诸如传感器、服务器流量与在线搜索等数据源。常见的流数据的例子有网站上的用户行为、监控数据、服务器日志与其他事件数据。

流数据处理应用会有助于现场面板、实时在线推荐与即时诈骗检测。
如果我们正在构建一个实时收集、处理与分析流数据的应用,我们需要按照与批处理数据应用不同的设计视角进行考虑。

下面列出了三种不同的流数据处理框架:

  • Apache Samza
  • Storm
  • Spark流
    在本文中我们将专注于Spark流,即Spark Streaming。

2.2Spark流

Spark流是核心Spark API的扩展。Spark流使得基于实时数据流构建容错性处理变得更加简单。下图(就是上面介绍到的Spark生态系统图)展示了Spark流是如何融入到整个Apache Spark生态系统中:

Spark流工作的方式是将数据流按照预先定义的间隔(N秒)划分为批(称微批次)然后将每批数据视为一个弹性分布式数据集(Resilient Distributed Datasets,RDDs)。随后我们就可以使用诸如map、reduce、reduceByKey、join和window这样的操作来处理这些RDDs。这些RDD操作的结果会以批的形式返回。通常我们会将这些结果保存到数据存储中以供未来分析并生成报表与面板,或是发送基于事件的预警。

为Spark流决定时间间隔是很重要的,这需要基于你的用例与数据处理要求。如果值N太低,那么在分析阶段微批次就没有足够的数据以给出有意义的结果。

与Spark流相比,其他流处理框架是基于每个事件而非一个微批次来处理数据流的。用微批次的方法,我们可以在同一应用下使用Spark流API来应用其他Spark库(比如核心、机器学习等)。

流数据可以来源于许多不同的数据源,一些内置数据源如HDFS、TCP sockets和一些外置数据源如Twitter、Kafka等。

使用诸如Apache Spark这种大数据处理框架的另外一个优势就是我们可以在同一系统中组合批处理与流处理。我们也可以在数据流上应用Spark的机器学习与图处理算法,Spark流结构如下图所示:

2.3Spark流用例

Spark流正在变为实现实时数据处理与分析方案的首选平台,这些实时数据往往来源于物联网(Internet of Things,IoT)和传感器。它被用于各种用例与商业应用。下面是一些最有趣的Spark流用例:

  • Uber:车驾共享服务背后的公司,在他们的持续流式ETL管道中使用了Spark流以每天从其移动用户处收集TB级的事件数据来进行实时遥测分析。
  • Pinterest:可视化书签工具背后的公司,使用Spark流、MemSQL与Apache Kafka技术以实时地深入了解他们全球的用户是怎样使用Pins的。
  • Netflix:使用Kafka与Spark流来构建一个实时在线电影推荐与数据监控解决方案,该方案每天要处理来自于不同数据源的数十亿条事件。

Spark流其他现实世界的样例还包括:

  • 供应链分析
  • 实时安全情报操作以寻找威胁
  • 广告竞价平台
  • 实时视频分析,以帮助观看者实现个性化与互动体验

2.4Spark流架构与API

让我们看一下Spark流的架构与API方法。若要编写Spark流程序,我们需要知晓两个组件:DStream与流上下文。

2.4.1DStream

Dstream(离散流,Discretized Stream,的缩写)是Spark流中最基本的抽象,它描述了一个持续的数据流。DStream既可以从诸如Kafka、Flume与Kinesis这样的数据源中创建,也可以对其他DStream实施操作。在内部,一个DStream被描述为一个RDD对象的序列。与RDDs上的转换与动作操作类似,DStream支持的操作有:map、flatMap、filter、count、reduce、countByValue、reduceByKey、join、updateStateByKey。

2.4.2流上下文

与Spark中的Spark上下文(SparkContext)相似,流上下文(StreamingContext)是所有流功能的主入口。

流上下文拥有内置方法可以将流数据接收到Spark流程序中。使用该上下文,我们可以创建一个描述基于TCP数据源的流数据的DStream,可以用主机名与端口号指定TCP数据源。比如,如果我们使用像netcat这样的工具来测试Spark流程序的话,我们将会从运行netcat的机器(比如localhost)的9999端口上接收到数据流。(下面的demo中我们以HDFS上的数据作为数据源)

当代码被执行,在启动时,Spark流仅是设置将要执行的计算,此时还没有进行实时处理。在所有的转换都被设置完毕后,为了启动处理,我们最终会调用start()方法来启动计算,还有awaitTermination()方法来等待计算终结。

2.4.3Spark流Api

由于我是搞java开发的,所以这里就只附上Spark提供的关于Java Api的官方链接啦,小伙伴自行查看吧。

2.5Spark编程的步骤

在展现出用demo应用之前,先来看看Spark流编程中与众不同的步骤:

  • 1.Spark流上下文被用于处理实时数据流。因此,第一步就是用两个参数初始化流上下文对象,Spark上下文和切片间隔时间。切片间隔设置了流中我们处理输入数据的更新窗口。一旦上下文被初始化,就无法再向已经存在的上下文中定义或添加新的计算。并且,在同一时间只有一个流上下文对象可以被激活。
  • 2.当Spark流上下文被定义后,我们通过创建输入DStreams来指定输入数据源。在我们的样例应用中,输入数据源是一个使用了Apache Kafka分布式数据库和消息系统的日志消息生成器。日志生成器程序创建随机日志消息以模拟网络服务器的运行时环境,作为各种网络应用服务用户而产生的流量,日志消息被持续不断地生成。
  • 3.使用map和reduce这样的Spark流变换API为DStreams定义计算。
    当流计算逻辑被定义好后,我们可以使用先前创建的流上下文对象中的start方法来开始接收并处理数据。
  • 4.最终,我们使用流上下文对象的awaitTermination方法等待流数据处理完毕并停止它。

好了,上述讲了这么多大道理,你不烦我都烦了,接下来通过上述介绍的编程步骤直接附上我的demo吧。

3.Spark处理流数据demo

这里我们使用IDEA创建一个maven项目,在pom.xml文件中添加上Spark相关API jar包的坐标如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>cn.codingxiaxw.spark</groupId>
<artifactId>spark-mvn</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-mvn Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>

<build>
<finalName>spark-mvn</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

然后创建一个WordCountStreaming.java,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package cn.codingxiaxw.test;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* Created by codingBoy on 16/12/8.
*/

public class WorldCountStreaming
{

public static void main(String[] args) throws InterruptedException {

//创建RDD
SparkConf sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWorldCount");

/**
*
*
* 创建StreamingContext对象: 同Spark初始化需要创建SparkContext对象一样,
* 使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,
* 包括指明Master,设定名称(如NetworkWordCount)。需要注意的是参数Seconds(1),Spark Streaming需要指定处理数据的时间间隔,
* 如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置;
*
* 与Spark中的Spark上下文(SparkContext)相似,流上下文(StreamingContext)是所有流功能的主入口。
*/

JavaStreamingContext jsc=new JavaStreamingContext(sparkConf,Durations.seconds(1));

/**
* 创建InputDStream:如同Storm的Spout,Spark Streaming需要指明数据源。如上例所示的socketTextStream,Spark Streaming以
* socket连接作为数据源读取数据。当然Spark Streaming支持多种不同的数据源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter
* 等数据源
*/

JavaReceiverInputDStream<String> lines=jsc.socketTextStream("localhost",8020);
//
// JavaReceiverInputDStream<String> lines= (JavaReceiverInputDStream<String>) jsc.textFileStream("hdfs://localhost:8020/directory");

/**
* 操作DStream:对于从数据源得到的DStream,用户可以在其基础上进行各种操作,
* 对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用Map和ReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;
*/

JavaDStream<String> words=lines.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
}
);

/**
* 启动Spark Streaming之前所作的所有步骤只是创建了执行流程,程序没有真正连接上数据源,
* 也没有对数据进行任何操作,只是设定好了所有的执行计划,当ssc.start()启动后程序才真正进行所有预期的操作。
*/

JavaPairDStream<String,Integer> pairs=words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
}
);

JavaPairDStream<String,Integer> wordCounts=pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}
);

wordCounts.print();

jsc.start();
jsc.awaitTermination();

}
}

通过运行main函数实现Spark对流式数据的处理,代码中我们指定数据源为localhost上的文件(有疑惑?听我慢慢道来),计算远程主机上某个文件内容的字数,发现控制台每隔1秒就输出如下图内容(即Spark每隔一秒就处理数据源的数据):

有人就有疑惑了,为什么控制台只输出了当前处理的时间而没有输出对数据源的操作信息(如这里我们是对数据源中的内容进行计数,为什么没有计数信息)呢?因为上述代码中我们没有指定正确的数据源,使用JavaReceiverInputDStream<String> lines= (JavaReceiverInputDStream<String>) jsc.textFileStream("hdfs://localhost:8020/directory");代替掉代码JavaReceiverInputDStream<String> lines=jsc.socketTextStream("localhost",8020);,括号中输入正确的数据源,这样就能实现实时监听数据源中变化的操作咯。至于数据源,你想用内置数据源,还是外置数据源就看你的实际需要咯!

到此,我们便完成了如果通过Spark来处理流式数据的demo。

参考:用Apache Spark进行大数据处理——第三部分:Spark流

2018.3.19更

欢迎加入我的Java交流1群:659957958。

2018.4.21更:如果群1已满或者无法加入,请加Java学习交流2群:305335626

4.联系

If you have some questions after you see this article,you can tell your doubts in the comments area or you can find some info by clicking these links.

记得扫一扫领一下红包再走哦