
spark 最初由美国加州伯克利大学 AMP 实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。spark 的主要特点是能够在内存中进行计算,并适用于各种各样原先需要使用不同的分布式平台的场景,包括批处理、迭代计算、交互式查询、流处理。通过在一个统一的框架下支持这些不同的计算,Spark 是我们可以简单而低耗地把各种处理流程整合在一起。
1.2 优势特性 1.2.1 运行速度快 Spark 使用先进的 DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流和内存计算,相比于 Hadoop 基于磁盘来进行 MapReduce 运算要快上百倍;
1.2.2 容易使用 Spark 支持使用 Scala、Java、Python语言进行编程,简介的 API 设计有助于用户轻松构建并行程序,且可以通过 Spark Shell 进行交互式编程;
1.2.3 通用性 Spark 提供了完整而强大的技术栈,包括 SQL 查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
1.2.4 运行模式多样 Spark 可运行于独立的集群模式中,或者运行于 Hadoop 中,也可运行于 Amazon EC2 等云环境中,并且可以访问 HDFS、Hbase以及 Hive 等多种数据源
1.3 Spark vs Hadoop 1.3.1 表达能力更丰富 Spark 的计算模式也属于 MapReduce,但不局限于 Map 和 Reduce操作,还提供了多种数据集操作类型,编程模型比 MapReduce 更灵活;
1.3.2 运算效率更优 Spark 提供了内存计算,中间结果直接放到内存中,但 Hadoop 每次在执行 MapReduce 操作时都需要从磁盘读取数据,并在计算完成后又再次将中间结果写入到磁盘中,导致 IO 开销更大,延迟较高;
1.3.3 先进的任务调度机制 Spark 是基于 DAG 的任务调度执行机制,要优于 MapReduce 的迭代执行机制
1.3.4 实际开发更方便 在实际进行开发时,使用 Hadoop 需要编写不少相对底层的代码,不够高效。相对而言,Spark 提供了多种高层次、简洁的 API。更重要的是,Spark 提供了交互式编程环境,可以方便地验证、调整算法。
尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,主要用于替代 Hadoop 中的 MapReduce 计算模型。实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于 HDFS 实现分布式存储。此外,Hadoop 可以使用廉价的、异构的及其来做分布式存储与计算,但是,Spark 对硬件的要求稍高一些,堆内存与CPU有一定的要求。
1.4 Spark 生态系统 在实际的应用中,大数据处理主要包括以下三种类型:
相比于其他框架在功能上的单一性,Spark 能同时支持批处理、交互式查询和流数据处理。其生态系统主要包含的核心组件有 Spark core、Spark SQL、Spark Streaming、Spark MLib和 Spark Graphx 等,各个组件的具体功能如下:
1.4.1 Spark core 包含 Spark 的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等。Spark 建立在统一的抽象 RDD 之上,使其可以以基本一致的方式应对不同的大数据处理场景;
内部定义了 RDDS(弹性分布式数据集);
提供了很多APIs来创建和操作这些 RDDS;
应用场景:为其他组件提供底层的服务;
1.4.2 Spark SQL 支持对 Hive、Hbase等外部数据源的类 SQL 查询,每个数据表被当作一个RDD;
应用场景:企业中用来做报表统计;
1.4.3 Spark Streaming 支持高吞吐量、可容错处理的事实流数据处理,其核心思路是将流式计算分解成一系列短小的批处理作业;
是实时数据流处理组件,类似 Storm;
Spark Streaming 提供了 API 来操作实时流数据;
应用场景:企业中用来从 kafka 接收数据做实时统计;
1.4.4 Spark MLib 提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛;
应用场景:机器学习;
1.4.5 Spark Graphx 支持在 Spark 中进行图计算,可认为是 Pregel 在 Spark 上的重写及优化,GraphX 性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法;
应用场景:图计算;
1.5 相关术语帮助文档
Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。
2.2 Spark 的安装与部署 2.2.1 下载安装 Spark下载地址
将下载后的 Spark(版本为2.4.5),上传至服务器并解压 tar zxvf spark-2.4.5-bin-without-hadoop-scala-2.12.tgz
配置 Spark 的环境变量
vim /etc/profile # 在文件中增加如下内容 # SPARK_HOME export SPARK_HOME=/opt/lagou/servers/spark-2.4.5 export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin # 刷新文件使得环境变量生效 source /etc/profile2.2.2 Spark 的部署
修改配置
# 进入到文件所在的目录 cd $SPARK_HOME/conf # 将所要修改的文件的.template去掉 cp slaves.template slaves # 编辑指定的文件 vim slaves # 对该文件进行如下内容的修改 ## 注释掉原有的 localhost ## 增加对应的节点信息:linux121,linux122,linux123 分别为节点的名称 # localhost linux121 linux122 linux123
# 进入到文件所在的目录 cd $SPARK_HOME/conf # 将所要修改的文件的.template去掉 cp spark-defaults.conf.template spark-defaults.conf # 编辑指定的文件 vim spark-defaults.conf # 在该文件中增加如下内容 spark.master spark://linux121:7077 spark.eventLog.enabled true spark.eventLog.dir hdfs://linux121:9000/spark-eventlog spark.serializer org.apache.spark.serializer.KryoSerializer spark.driver.memory 512m
备注:
- spark.master:定义 master 节点,缺省端口号为 7077
- spark.eventLog.enabled:开启 eventLog
- spark.eventLog.dir: eventlog 的存放位置
- spark.serializer:一个高效的序列化器。不设置会使用默认的 java 的序列化器
- spark.driver.memory:定义 driver内存的大小(缺省 1G)
# 进入到文件所在的目录 cd $SPARK_HOME/conf # 将所要修改的文件的.template去掉 cp spark-env.sh.template spark-env.sh # 编辑指定的文件 vim spark-env.sh # 在该文件中增加如下内容 export JAVA_HOME=/opt/lagou/servers/jdk1.8.0_291 export HADOOP_HOME=/opt/lagou/servers/hadoop-2.9.2 export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.9.2/bin/hadoop classpath) export SPARK_MASTER_HOST=linux121 export SPARK_MASTER_PORT=7077
备注:这里使用的是 spark-2.4.5-bin-without-hadoop,所以要将 Hadoop 相关 jars 的位置告诉Spark
将 Spark 软件分发到集群;修改其他节点的环境变量
cd /opt/lagou/servers/ scp -r spark-2.4.5/ linux122:$PWD scp -r spark-2.4.5/ linux123:$PWD
cd $SPARK_HOME/sbin ./start-all.sh
验证是否启动成功
# 分别在三台服务器上执行 jps 命令 jps [root@linux121 sbin]# jps 29079 DataNode 12313 Master 12427 Worker 12540 Jps 28943 NameNode [root@linux121 sbin]# [root@linux122 ~]# jps 26768 Jps 20741 DataNode 26536 Worker [root@linux122 ~]# [root@linux123 ~]# jps 25346 SecondaryNameNode 21413 Jps 21049 Worker 25243 DataNode [root@linux123 ~]#
可以看到在121上出现了 Master 和 Worker,在 122 和 123 上分别出现了 Worker,且启动过程中没有出现任何的错误信息,则表示启动成功
验证方式二
在浏览器中输入:http://linux121:8080/,若可以看到如下界面即成功
备注: Spark 运行在 Standalone 模式下。
2.2.3 集群测试在$HADOOP_HOME/sbin 及 $SPARK_HOME/sbin 下都有 start-all.sh 和stop-all.sh 文件
在输入 start-all.sh / stop-all.sh 命令时,谁的搜索路径在前面就先执行谁,此时会产生冲突
解决方案:
删除一组 start-all.sh / stop-all.sh 命令,让另外一组命令生效
将其中一组命令重命名。如:将 $HADOOP_HOME/sbin 路径下的命令重命名为:start-all-hadoop.sh / stop-all-hadoop.sh
将其中一个框架的 sbin 路径不放在 PATH 中
测试方式一:run-example 命令 在 121 上执行 run-example SparkPi 10 命令,会看到程序执行,直至执行完成,会看到有结果输出
测试方式二: spark-shell 命令
2.3 Spark 的部署模式 Spark 支持多种部署模式。最简单的就是单机本地模式(Spark 所有进程都运行在一台机器的 JVM 中)、伪分布式模式(在一台机器中模拟集群运行,相关的进程在同一台机器上)、分布式模式,分布式模式包括:Standalone、Yarn、Mesos。
2.3.1 本地模式 本地模式部署在单机上,主要用于测试或实验;最简单的运行模式,所有进程都运行在一台机器的 JVM 中。
本地模式用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题。
这种模式非常简单,只需要把 Spark 的安装包解压后,修改一些常用的配置即可使用。不用启动 Spark 的 Master、Worker 守护进程,也不用启动 Hadoop 的服务(除非用到 HDFS)
local:在本地启动一个线程来运行作业**(默认允许作业失败的次数为1)**
local[N]:启动了 N 个线程**(默认允许作业失败的次数为1)**
local[*]:使用了系统中所有的核**(默认允许作业失败的次数为1)**
local[N,M]:第一个参数表示用到的核;第二个参数表示允许作业失败的次数;
本地模式的测试:
关闭相关的服务
stop-dfs.sh stop-all.sh
启动 Spark 本地运行模式
spark-shell --master local
此时出现了如下错误:
错误原因:因为在 Spark 的配置文件中配置了日志聚合(即用到了 HDFS,但 HDFS 服务关闭了),在配置文件中关闭日志集合的配置信息即可
# 进入到待修改的文件目录 cd $SPARK_HOME/conf # 打开待修改的文件 vim spark-defaults.conf # 修改指定文件,注释掉该文件中的如下两行信息即可 # spark.eventLog.enabled true # spark.eventLog.dir hdfs://linux121:9000/spark-eventlog
再次启动 Spark 本地运行模式
spark-shell --master local
可以发现程序正常启动了
执行测试程序
val lines = sc.textFile("file:///root/a.txt")
lines.count
在一台机器中模拟集群进行,相关的进程在同一台机器上,伪分布式模式不用启动集群资源管理服务。
local-cluster[N,cores,memory] # 参数之间没有空格,memory 也不能加单位 # N 模拟集群的 Slave(或 Worker)节点个数 # cores 模拟集群中各个 Slave 节点上的内核数 # memory 模拟集群的各个 Slave 节点的内存大小
伪分布式测试:
spark-shell --master local-cluster[4,2,1024]
[root@linux121 ~]# jps 17744 CoarseGrainedExecutorBackend # 用来并发执行程序的进程 17747 CoarseGrainedExecutorBackend # 用来并发执行程序的进程 18012 Jps 17740 CoarseGrainedExecutorBackend # 用来并发执行程序的进程 17566 SparkSubmit # 充当全能角色,既是 client 进程,又是 Driver 进程,还有资源管理的作用 17743 CoarseGrainedExecutorBackend # 用来并发执行程序的进程 [root@linux121 ~]#
spark-submit --master local-cluster[4,2,1024] --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.5.jar 10
备注:
Spark 支持 3 种集群部署模式:Standalone、Yarn、Mesos
2.3.4.1 Standalone 模式独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。从一定程度上说,该模式是其他两种的基础
Cluster Manager:Master
Worker Node: Worker
仅支持粗粒度的资源分配方式
Standalone 模式帮助文档
./sbin/start-slave.sh [options]; 启动节点上的 worker 进程,调试中较为常用
| SPARK_WORKER_CORES | Total number of cores to allow Spark applications to use on the machine (default: all available cores). |
|---|---|
| SPARK_WORKER_MEMORY | Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GiB); note that each application’s individual memory is configured using its spark.executor.memory property. |
| SPARK_WORKER_PORT | Start the Spark worker on a specific port (default: random). |
| SPARK_WORKER_WEBUI_PORT | Port for the worker web UI (default: 8081). |
| SPARK_WORKER_DIR | Directory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work). |
测试在 spark-env.sh 中增加参数,分发到集群,重启服务
export SPARK_WORKER_CORES=10 export SPARK_WORKER_MEMORY=20g2.3.4.1.2 运行模式 (cluster/client)
cluster 和 client 的最大的区别在于 Driver 运行在哪里
spark-submit --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.5.jar 1000
# 程序运行中 [root@linux121 ~]# jps 24033 SparkSubmit 23348 Worker # worker 管理节点资源 23237 Master # master 进程做为cluster manager,管理集群资源 24198 Jps # SparkSubmit 做为Client端,运行 Driver 程序。Spark Application执行完成,进程终止 24108 CoarseGrainedExecutorBackend # CoarseGrainedExecutorBackend,运行在Worker上,用来并发执行应用程序 # 程序运行结束 [root@linux121 ~]# jps 23348 Worker 23237 Master 24311 Jps [root@linux121 ~]#
spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.5.jar 1000
- SparkSubmit 进程会在应用程序提交给集群之后就退出
- Master会在集群中选择一个 Worker 进程生成一个子进程 DriverWrapper 来启动 Driver 程序
- Worker节点上会启动 CoarseGrainedExecutorBackend
- DriverWrapper 进程会占用 Worker 进程的一个core(缺省分配1个core,1G内存)
- 应用程序的结果,会在执行 Driver 程序的节点的 stdout 中输出,而不是打印在屏幕上
[root@linux121 ~]# jps 25393 Jps 23348 Worker 25332 SparkSubmit # SparkSubmit 进程会在应用程序提交给集群之后就退出 23237 Master [root@linux121 ~]# jps 25425 Jps 23348 Worker 23237 Master # Master会在集群中选择一个 Worker 进程生成一个子进程 DriverWrapper 来启动 Driver 程序 25406 CoarseGrainedExecutorBackend # Worker节点上会启动 CoarseGrainedExecutorBackend [root@linux121 ~]# [root@linux122 ~]# jps 2674 Worker 4066 CoarseGrainedExecutorBackend # Worker节点上会启动 CoarseGrainedExecutorBackend 4116 Jps 4008 DriverWrapper [root@linux122 ~]# [root@linux123 ~]# jps 31056 Worker 691 CoarseGrainedExecutorBackend # Worker节点上会启动 CoarseGrainedExecutorBackend 783 Jps [root@linux123 ~]#
在启动 DriverWrapper 的节点上,进入 $SPARK_HOME/work/目录,会看到以 driver开头的目录,会发现该目录下有如下文件:
配置文件修改
# spark-defaults.conf spark.eventLog.enabled true spark.eventLog.dir hdfs://node1:8020/spark-eventlog spark.eventLog.compress true # spark-env.sh export SPARK_HISTORY_OPTS= "-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://linux121:9000/spark-eventlog" spark.history.retainedApplications 设置缓存 Cache 中保存的应用程序历史记录的个数(默认50),如果超过这个值,旧的将被删除 缓存文件数不表示实际显示的文件总数,只是表示不在缓存中的文件可能需要从硬盘读取,速度稍有差别
启动 historyserver 服务
启动 historyserver 的前提条件是启动 hdfs 服务,因为将日志写入到了 hdfs 中。
$SPARK_HOME/sbin/start-history-server.sh2.3.4.2 Spark On Yarn 模式
基于 Yarn 模式需要启动的服务有:hdfs 服务、yarn 服务
需要关闭 Standalone 对应的服务(即集群中的Master、Worker进程)
在 Yarn 模式中,Spark 应用程序有两种运行模式:
yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false
备注:
# spark-env.sh 中这一项必须要有 export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop # spark-default.conf(以下是优化) # 与 hadoop historyserver集成 spark.yarn.historyServer.address linux121:18080 # 添加(以下是优化) spark.yarn.jars hdfs:///spark-yarn/jars/*.jar # 将 $SPARK_HOME/jars 下的jar包上传到hdfs hdfs dfs -mkdir -p /spark-yarn/jars/ cd $SPARK_HOME/jars hdfs dfs -put * /spark-yarn/jars/
# client spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.5.jar 2000
在提取 APP 节点上可以看见:SparkSubmit、CoarseGrainedExecutorBackend
在集群的其他节点上可以看见:CoarseGrainedExecutorBackend
在提取App节点上可以看见:程序计算的结果(即可以看见计算返回的结果)
# cluster spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.11-2.4.5.jar 2000
在提取App节点上可以看见:SparkSubmit
在集群的其他节点上可以看见:CoarseGrainedExecutorBackend、ApplicationMaster(Driver运行在此)
在提取App节点上看不见最终的结果
5. 整合 HistoryServer服务
前提:Hadoop的 HDFS、Yarn、HistoryServer 正常;Spark historyserver服务正常;
Hadoop:JobHistoryServer
Spark:HistoryServer
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.5.jar 202.3.4.3 Spark On Mesos 模式
Spark Standalone集群是 Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障的问题。如何解决这个问题,Spark提供了两种解决方案:
2.3.4.4.1 基于 zookeeper的 Standby Master,适用于生产模式。将 Spark 集群连接到 zookeeper,利用 zookeeper 提供的选举和状态保存的功能,一个 Master 处于 Active 状态,其他 Master 处于 Standby 状态;
保证在 Zk 中的元数据是集群的信息,包括:Worker,Driver和Application以及Executors的信息;
如果 Active 的 Master 挂掉了,通过选举产生新的 Active 的 Master,然后执行状态恢复,整个恢复过程可能需要1~2分钟
主要用于开发或者测试环境。将 Spark Application 和 Worker 的注册信息保存在文件中,一旦Master发生故障,就可以重新启动Master进程,将系统恢复到之前的状态
# 注释以下两行!!! # export SPARK_MASTER_HOST=linux121 # export SPARK_MASTER_PORT=7077 # 添加以下内容 export SPARK_DAEMON_JAVA_OPTS=" - Dspark.deploy.recoveryMode=ZOOKEEPER - Dspark.deploy.zookeeper.url=linux121,linux122,linux123 - Dspark.deploy.zookeeper.dir=/spark"
备注:
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/start-master.sh
在浏览器中输入 添加链接描述,此时 Master 的状态为 Standby
6. 杀到linux121上 Master 进程,再观察 linux122 上 Master 状态,由 STANDBY
=> RECOVERING => ALIVE
每个应用程序的运行环境有一个 Driver 和 若干个 Executor 组成,其中,每个 Executor 占用若干资源,内部可运行多个 Task。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
2.4.2 细粒度模式(Fine-grained Mode)鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos 还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,核心思想是按需分配。
2.5 三种集群部署模式如何选择