大数据_Spark入门基础

Spark是什么

Spark的概述

image-20220118093552784


Apache Spark是用于大规模数据(large-scale data)处理的统一(unified)分析引擎。

2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、SparkStreaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。

Spark的特点

Spark 使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合一样轻松的操作分布式数据集。Spark具有运行速度快、易用性好、通用性强和随处运行等特点。

速度快

  • Spark VS Hadoop
Hadoop Spark
类型 基础平台, 包含计算, 存储, 调度 分布式计算工具
场景 大规模数据集上的批处理 迭代计算, 交互式计算, 流计算
价格 对机器要求低, 便宜 对内存有要求, 相对较贵
编程范式 Map+Reduce, API 较为底层, 算法适应性差 RDD组成DAG有向无环图, API 较为顶层, 方便使用
数据存储结构 MapReduce中间计算结果在HDFS磁盘上, 延迟大 RDD中间运算结果在内存中 , 延迟小
运行方式 Task以进程方式维护, 任务启动慢 Task以线程方式维护, 任务启动快

尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop

  • Spark主要用于替代Hadoop中的MapReduce计算模型。存储依然可以使用HDFS,但是中间结果可以存放在内存中;
  • Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。
  • Hadoop的基于进程的计算和Spark基于线程方式优缺点
    • Hadoop中的MR中每个map/reduce task都是一个java进程方式运行,好处在于进程之间是互相独立的,每个task独享进程资源,没有互相干扰,监控方便,但是问题在于task之间不方便共享数据,执行效率比较低。比如多个map task读取不同数据源文件需要将数据源加载到每个map task中,造成重复加载和浪费内存。而基于线程的方式计算是为了数据共享和提高执行效率,Spark采用了线程的最小的执行单位,但缺点是线程之间会有资源竞争。
  • 由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。

    image-20220118104327362

    • Spark处理数据与MapReduce处理数据相比,有如下两个不同点:
      • 其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;
      • 其二、Spark Job调度以DAG方式,并且每个任务Task执行以线程(Thread)方式,并不是像MapReduce以进程(Process)方式执行。

易使用

  • Spark 的版本已经更新到 Spark 3.2.0(截止日期2022.01.15)

  • 为了兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。

  • Spark可以使用Java、Scala、Python、R和SQL快速编写应用程序。

  • Spark提供80多个高级操作,使构建并行应用程序变得容易。可以从Scala、Python、R和SQL shell交互使用它。

通用性强

在 Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,可以在一个应用中无缝地使用这些工具库。

image-20220118105020975

运行在任何地方

Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark 2.3开始支持)上。

对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

image-20220118103148956

Spark框架结构

整个Spark 框架模块包含:Spark Coke、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上

image-20220118103208082

Spark通信框架

整个Spark 框架通信采用模块:Netty

什么是RPC:https://www.jianshu.com/p/2accc2840a1b

Spark 1.6,引入了Netty
Spark 2.0之后,完全使用Netty,并把akka移除了。

Spark的安装

Spark Local 模式搭建

在本地使用单机多线程模拟Spark集群中的各个角色

安装包下载

目前Spark最新稳定版本:

3.2.0系列

https://spark.apache.org/docs/3.2.0/index.html

★注意1:

Spark3.0+基于Scala2.12

http://spark.apache.org/downloads.html

★注意2:

目前企业中使用较多的Spark版本还是Spark2.x,如Spark2.2.0、Spark2.4.5都使用较多,但未来Spark3.X肯定是主流,毕竟官方高版本是对低版本的兼容以及提升

http://spark.apache.org/releases/spark-release-3-0-0.html

将安装包上传并解压

说明: 只需要上传至node1即可, 以下操作都是在node1执行的

1
2
3
4
5
6
7
8
9
10
11
12
cd /export/software
rz 上传
# 解压:
tar -zxf spark-3.1.2-bin-hadoop3.2.tgz -C /export/server/
# 更名: (两种方式二选一即可, 推荐软连接方案)
cd /export/server
# 方式一:
# 软连接方案:
ln -s spark-3.1.2-bin-hadoop3.2 spark
# 方式二:
# 直接重命名:
mv spark-3.1.2-bin-hadoop3.2 spark

image-20220118144350465

测试

Spark的local模式, 开箱即用, 直接启动bin目录下的spark-shell脚本

1
2
cd /export/server/spark/bin
./spark-shell -master local[*]

image-20220118144947390

sc:SparkContext实例对象:
spark:SparkSession实例对象
4040:Web监控页面端口号

  • Spark-shell说明:
    • 1.直接使用./spark-shell
      • 表示使用local 模式启动,在本机启动一个SparkSubmit进程
    • 2.还可指定参数 –master,如:
      • spark-shell –master local[N] 表示在本地模拟N个线程来运行当前任务
      • spark-shell –master local[*] 表示使用当前机器上所有可用的资源
    • 3.不携带参数默认就是
      • spark-shell –master local[*]
    • 4.后续还可以使用–master指定集群地址,表示把任务提交到集群上运行,如
      • ./spark-shell –master spark://node01:7077,node02:7077
    • 5.退出spark-shell 使用 :
      • quit

PySpark的安装

  • PySpark是Python的库, 由Spark官方提供. 专供Python语言使用. 类似Pandas一样,是一个库
  • Spark是一个独立的框架, 包含PySpark的全部功能, 除此之外, Spark框架还包含了对R语言\ Java语言\ Scala语言的支持. 功能更全. 可以认为是通用Spark。

安装Anaconda

下载安装&简单配置

安装版本:https://www.anaconda.com/distribution/#download-section
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
cd /export/software
rz 上传Anaconda脚本环境
bash Anaconda3-2021.05-Linux-x86_64.sh
# 不断输入空格,然后输入yes
cd /export/software
rz 上传Anaconda脚本环境
#执行脚本:
bash Anaconda3-2021.05-Linux-x86_64.sh
# 不断输入空格, 直至出现以下内容, 然后输入yes
# 此时, anaconda需要下载相关的依赖包, 时间比较长, 耐心等待即可....
# 配置anaconda的环境变量:
vim /etc/profile
# 增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
# 重新加载环境变量:
source /etc/profile
# 修改bashrc文件
sudo vim ~/.bashrc
# 添加如下内容:
export PATH=~/anaconda3/bin:$PATH

说明:

  • profile
    • 其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
  • bashrc
    • bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别

使用anaconda之后, 在linux的界面中出现 base 字段内容, 主要的原因是 anaconda在安装之后, 会自动创建一个base的沙箱环境, 同时在启动会话的时候, 自动进入到基础环境中,如何屏蔽呢?

1
2
3
4
5
# 修改.bashrc 配置文件, 在文件默认添加一个: conda deactivate
sudo vim ~/.bashrc
# 在文件的末尾添加:
conda deactivate
# 重新打开会话, 即可解决问题

Anaconda相关组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
安装包:pip install xxx,conda install xxx
卸载包:pip uninstall xxx,conda uninstall xxx
升级包:pip install upgrade xxx,conda update xxx

Jupyter Notebook:启动命令
jupyter notebook
功能如下:
Anaconda自带,无需单独安装
实时查看运行过程
基本的web编辑器(本地)
ipynb 文件分享
可交互式
记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。
IPython:
命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法
Spyder:
命令:spyder,其功能如下
1.Anaconda自带,无需单独安装
2.完全免费,适合熟悉Matlab的用户
3.功能强大,使用简单的图形界面开发环境

conda命令及pip命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# conda管理数据科学环境,conda和pip类似均为安装、卸载或管理Python第三方包。
conda install 包名 pip install 包名
conda uninstall 包名 pip uninstall 包名
conda install -U 包名 pip install -U 包名

# Anaconda设置为国内下载镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

# conda创建虚拟环境
conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境
activate py_env #激活环境
deactivate py_env #退出环境

安装PySpark

所有节点都需要安装pySpark的

三个方法:

  1. 直接pip

    1. pip install pyspark

    2. pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark # 指定清华镜像源

    3. 如果要为特定组件安装额外的依赖项,可以按如下方式安装(此步骤暂不执行,后面Sparksql部分会执行):

      pip install pyspark[sql]

  2. 创建Conda环境安装PySpark

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # 从终端创建新的虚拟环境
    conda create -n pyspark_env python=3.8

    # 创建虚拟环境后,它应该在 Conda 环境列表下可见,可以使用以下命令查看
    conda env list

    # 现在使用以下命令激活新创建的环境:
    source activate pyspark_env
    # 或者
    conda activate pyspark_env
    # 如果报错: CommandNotFoundError: Your shell has not been properly configured to use 'conda deactivate'.切换使用 source activate
    # 可以在新创建的环境中通过使用PyPI安装PySpark来安装pyspark,例如如下。它将pyspark_env在上面创建的新虚拟环境下安装 PySpark。
    pip install pyspark
    # 或者,可以从 Conda 本身安装 PySpark:
    conda install pyspark
  3. 手动下载安装

    将spark对应版本下的python目录下的pyspark复制到anaconda的
    Library/Python3/site-packages/目录下即可。

    请注意,PySpark 需要JAVA_HOME正确设置的Java 8 或更高版本。如果使用 JDK 11,请设置-Dio.netty.tryReflectionSetAccessible=true,Arrow相关功能才可以使用。

    扩展:
    conda虚拟环境命令
    查看所有环境
    conda info --envs
    新建虚拟环境
    conda create -n myenv python=3.6
    删除虚拟环境
    conda remove -n myenv --all
    激活虚拟环境
    conda activate myenv
    source activate base
    退出虚拟环境
    conda deactivate myenv

测试PySpark shell方式

前面的Spark Shell实际上使用的是Scala交互式Shell,实际上 Spark 也提供了一个用 Python 交互式Shell,即Pyspark。

1
bin/pyspark --master local[*]

image-20220118192141745

Spark On Yarn搭建

概述

  • Spark On Yarn的本质?
    • 将Spark任务的pyspark文件,经过Py4J转换,提交到Yarn的JVM中去运行
  • Spark On Yarn需要啥?
    • 1.需要Yarn集群:已经安装了
    • 2.需要一台机器上有提交工具:spark-submit命令–在spark/bin目录
    • 3.需要上面的机器上有被提交的PySpark代码:Spark任务的文件,如spark/examples/src/main/python/pi.py中有示例程序,或我们后续自己开发的Spark任务)
    • 4.需要其他依赖jar:Yarn的JVM运行PySpark的代码经过Py4J转化为字节码,需要Spark的jar包支持!Spark安装目录中有jar包,在spark/jars/中

修改配置

当Spark Application运行到YARN上时,在提交应用时指定master为yarn即可,同时需要告知YARN集群配置信息(比如ResourceManager地址信息),此外需要监控Spark Application,配置历史服务器相关属性。

spark-env.sh
1
2
3
4
5
6
7
8
9
10
cd /export/server/spark/conf
vim /export/server/spark/conf/spark-env.sh
## 添加内容
## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
## 同步
cd /export/server/spark/conf
scp -r spark-env.sh root@node2:$PWD
scp -r spark-env.sh root@node3:$PWD
整合&关闭资源检查

整合历史服务器MRHistoryServer并关闭资源检查


整合Yarn历史服务器并关闭资源检查

在【$HADOOP_HOME/etc/hadoop/yarn-site.xml】配置文件中,指定MRHistoryServer地址信息,

在node1上修改, 修改成下面的XML:

1
2
cd /export/server/hadoop/etc/hadoop
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<configuration>
<!-- 配置yarn主节点的位置 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>node1</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 设置yarn集群的内存分配方案 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>
<!-- 开启日志聚合功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置聚合日志在hdfs上的保存时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<!-- 设置yarn历史服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://node1:19888/jobhistory/logs</value>
</property>
<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>

同步到全部机器

1
2
3
cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2:$PWD
scp -r yarn-site.xml root@node3:$PWD
配置HistoryServer

在【$SPARK_HOME/conf/spark-defaults.conf】文件增加SparkHistoryServer地址信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
## 进入配置目录
cd /export/server/spark/conf
## 修改配置文件名称
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
## 添加内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/sparklog/
spark.eventLog.compress true
spark.yarn.historyServer.address node1:18080

## 设置日志级别
## 进入目录
cd /export/server/spark/conf
## 修改日志属性配置文件名称
mv log4j.properties.template log4j.properties
## 改变日志级别
vim log4j.properties

修改内容如下:

image-20220119160708194

同步到全部机器

1
2
3
4
5
cd /export/server/spark/conf
scp -r spark-defaults.conf root@node2:$PWD
scp -r spark-defaults.conf root@node3:$PWD
scp -r log4j.properties root@node2:$PWD
scp -r log4j.properties root@node3:$PWD
配置依赖Spark Jar包

当Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
## hdfs上创建存储spark相关jar包目录
hadoop fs -mkdir -p /spark/jars/
## 上传$SPARK_HOME/jars所有jar包
hadoop fs -put /export/server/spark/jars/* /spark/jars/
## 在spark-defaults.conf中增加Spark相关jar包位置信息:
## 在node1上操作
vim /export/server/spark/conf/spark-defaults.conf
## 添加内容
spark.yarn.jars hdfs://node1:8020/spark/jars/*
## 同步
cd /export/server/spark/conf
scp -r spark-defaults.conf root@node2:$PWD
scp -r spark-defaults.conf root@node3:$PWD

启动服务

Spark Application运行在YARN上时,上述配置完成

启动服务:HDFS、YARN、MRHistoryServer和Spark HistoryServer

1
2
3
4
5
6
7
8
9
10
## 启动HDFS和YARN服务,在node1执行命令
start-dfs.sh
start-yarn.sh
## 或
/export/server/hadoop/sbin/start-all.sh
## 注意:在onyarn模式下不需要启动start-all.sh(jps查看一下看到worker和master)
## 启动MRHistoryServer服务,在node1执行命令
/export/server/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver
## 启动Spark HistoryServer服务,,在node1执行命令
/export/server/spark/sbin/start-history-server.sh

Spark HistoryServer服务WEB UI页面地址:http://node1:18080/

Pyspark初体验

Pycharm连接SSH

本文使用的pycharm2021专业版, 社区版部分功能没有, 注意.

原理图

image-20220118192500386

连接操作

image-20220118192616578

image-20220118192622251

image-20220118192637741

image-20220118192709409

image-20220118192730283

image-20220118192739050

image-20220118192743432

image-20220118192747281

初始化

image-20220118192856287

image-20220118192902413

image-20220118192910070

image-20220118192914827

运行wordcount程序

image-20220118193449522

原理图

image-20220118193113319

代码

需要等待pycharm初始化索引后操作

测试文本文件
1
2
3
4
Hello World
Hello Spark
Hello Hello
Hello Baby
  1. 本地创建一个words.tx
  2. 往HDFS传一个hdfs dfs -put word.txt /pydata/words.txt hdfs dfs -ls /pydata
本地操作代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# -- coding: utf-8 --
# @Author : Pandolar
# @File : wordcount.py
# @Software: PyCharm
# @Time : 2022/1/18 17:06
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
# 1-首先创建SparkContext上下文环境
conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2-从外部文件数据源读取数据
rdd1 = sc.textFile('file:///tmp/pycharm_project_594/PySpark-SparkBase_3.1.2/data/words.txt')
# 3-执行flatmap执行扁平化操作
rdd2 = rdd1.flatMap(lambda line: line.split(' '))
# 4-执行map转化操作,得到(Word,1)
rdd3 = rdd2.map(lambda word: (word, 1))
# 5-reduceByKey将相同Key的Value数据累加操作
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
# 6-将结果输出到文件系统或打印
list_1 = rdd4.collect()
for i in list_1: print(i)
HDFS操作代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# -- coding: utf-8 --
# @Author : Pandolar
# @File : wordcount.py
# @Software: PyCharm
# @Time : 2022/1/18 17:06
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
# 1-首先创建SparkContext上下文环境
conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2-从外部文件数据源读取数据
rdd1 = sc.textFile('hdfs://node1:8020/pydata/')
# 3-执行flatmap执行扁平化操作
rdd2 = rdd1.flatMap(lambda line: line.split(' '))
# 4-执行map转化操作,得到(Word,1)
rdd3 = rdd2.map(lambda word: (word, 1))
# 5-reduceByKey将相同Key的Value数据累加操作
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
# 6-将结果输出到文件系统或打印
rdd4.saveAsTextFile("hdfs://node1:8020/pydata/output1")
list_1 = rdd4.collect()
for i in list_1: print(i)
spark_submit操作代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -- coding: utf-8 --
# @Author : Pandolar
# @File : wordcount.py
# @Software: PyCharm
# @Time : 2022/1/18 17:06
from pyspark import SparkConf, SparkContext
import sys

if __name__ == '__main__':
# 1-首先创建SparkContext上下文环境
conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2-从外部文件数据源读取数据
rdd1 = sc.textFile(sys.argv[1])
# 3-执行flatmap执行扁平化操作
rdd2 = rdd1.flatMap(lambda line: line.split(' '))
# 4-执行map转化操作,得到(Word,1)
rdd3 = rdd2.map(lambda word: (word, 1))
# 5-reduceByKey将相同Key的Value数据累加操作
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)
# 6-将结果输出到文件系统或打印
# rdd4.saveAsTextFile("hdfs://node1:8020/pydata/output1")
list_1 = rdd4.collect()
for i in list_1: print(i)
1
spark-submit --master local[*] /tmp/pycharm_project_594/PySpark-SparkBase_3.1.2/main/wordcount_spark_submit.py file:///tmp/pycharm_project_594/PySpark-SparkBase_3.1.2/data/words.txt
排错

如果提示错误JAVA HOME is not set

1
2
3
vim  /root/.bashrc
export JAVA_HOME=/export/server/jdk1.8.0_241
export PYSPARK_PYTHON=/root/anaconda3/bin/python

profile文件是在用户登录的时候进行初始化的,/etc/profile和/.profile分别对应所有用户和单个用户。而/.bashrc文件,则是在每次启动一个shell的时候,对环境进行初始化。即etc是全局的,但是远程python只读取.bashrc

Spark On Yarn运行

Spark on Yarn的本质

image-20220119204016157

Spark应用架构

  • 只有Spark应用跑起来才有,如果程序退出,则消失。

  • 就是Spark运行时架构。

  • 1642579917339

  • 从图中可以看到一个Spark Application运行到集群上时,由两部分组成:【一个driver】和【多个Executor】。

    1、Driver Program

    • 是一个JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
    • 一个SparkApplication仅有【1】个;

    2、Executors

    • 运行JVM Process,相当于一个线程池,其中有很多线程,每个线程运行【1】个Task任务,一个Task运行需要【1】 Core CPU,所以Executor中线程数就等于CPU Core核数;
    • 一个Spark Application可以有【多】个;

Driver中如何由job拆分成task的过程

1642580405548

运行模式概述

Spark On Yarn 有两种模式, ClientCluster

Cluster和Client模式最最本质的区别是:Driver程序运行在哪里。

  • client模式:学习测试时使用,开发不用,了解即可

    • Driver运行在Client上的SparkSubmit进程中
    • 应用程序运行结果会在客户端显示
  • cluster模式:生产环境中使用该模式

    • Driver程序在YARN集群中
    • 应用的运行结果不能在客户端client显示

当一个MR应用提交运行到Hadoop YARN上时
包含两个部分:应用管理者AppMaster和运行应用进程Process(如MapReduce程序MapTask和ReduceTask任务)

image-20220119201330780


当一个Spark应用提交运行在集群上时,
应用架构有两部分组成:Driver Program(资源申请和调度Job执行)和Executors(运行Job中Task任务和缓存数据),都是JVM Process进程

image-20220119201454000

而Driver程序运行的位置可以通过–deploy-mode 来指定,
值可以是:
1.client:表示Driver运行在提交应用的Client上(默认)
2.cluster:表示Driver运行在集群中(Standalone:Worker,YARN:NodeManager)

Client运行模式

DeployMode为Client,表示应用Driver Program运行在提交应用Client主机上

image-20220119201640023

跑个代码

1
2
3
4
5
6
7
8
9
10
11
12
13
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${SPARK_HOME}/examples/src/main/python/pi.py \
10

image-20220119204313804

结果查看:http://node1:8088/cluster

image-20220119203741366

Cluster运行模式

DeployMode为Cluster,表示应用Driver Program运行在集群从节点某台机器上

image-20220119201808253

跑个代码

1
2
3
4
5
6
7
8
9
10
11
12
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
${SPARK_HOME}/examples/src/main/python/pi.py \
10

结果查看:http://node1:8088/cluster

Spark相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
(base) [root@node1 bin]# ./spark-submit --help
Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).
--archives ARCHIVES Comma-separated list of archives to be extracted into the
working directory of each executor.

--conf, -c PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.

--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.

--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).

--proxy-user NAME User to impersonate when submitting the application.
This argument does not work with --principal / --keytab.

--help, -h Show this help message and exit.
--verbose, -v Print additional debug output.
--version, Print the version of current Spark.

Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).

Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.

Spark standalone, Mesos or K8s with cluster deploy mode only:
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.

Spark standalone, Mesos and Kubernetes only:
--total-executor-cores NUM Total cores for all executors.

Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in
YARN and K8S modes, or all available cores on the worker
in standalone mode).

Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of
executors will be at least NUM.
--principal PRINCIPAL Principal to be used to login to KDC.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above.

Spark on YARN only:
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").

常用命令:

  • 公共参数

    • –master :Spark使用什么资源管理器来分配【CPU】和【内存】资源,后面可以跟【spark://node1:7077】,【yarn】 ,【local[*]】等,企业中用的最多的是【yarn】

    • –deploy-mode :只有【2】个值,【client】和【cluster】,区别是【Driver】进程运行在哪里

    • –name :给Spark程序起一个名字,后期可以在WEBUI(端口是【4040】)页面上看到

    • -class 包名.类名 需要跟上xx.jar:固定用法,需要配合spark-submit使用的,意思是提交jar包中的类的main方法程序。

      • 比如

        1
        2
        3
        4
        5
        6
        SPARK_HOME=/export/server/spark
        ${SPARK_HOME}/bin/spark-submit \
        --master local[*] \
        --class org.apache.spark.examples.SparkPi \
        ${SPARK_HOME}/examples/jars/spark-examples_2.12-3.1.2.jar \
        10
  • Driver进程相关参数:

    • –driver-memory 申请【Driver】进程的【内存】有大多。
    • –driver-cores :仅当是以–deploy-mode 是【cluster】是才有意义。表示【Driver】进程申请几个【核】
  • Executor进程相关参数

    • –total-executor-cores :仅当用Spark 的【standalone】方式才有意义,表示所有的executor一共申请多少个【核数】
    • –executor-cores :仅当Spark 的【standalone】方式以及【yarn】方式才有意义,表示每个executor进程申请几个【核数】。yarn默认是1个,standalone默认是Worker的全部的core。企业中一般指定为3-5个,通常是4.
    • –num-executors :仅当是【Yarn】时才有意义,表示一共申请多少个【Executor】。
    • –executor-memory 申请的【Executor】的【内存】有多大。Executor进程中的一个core可以支配3-5G的内存,如果是3G,executor-memory 最好设为executor-cores*3
    • –queue 就是用来隔离CPU和内存资源的。每个队列中都包含了指定容量的CPU和内存

常见问题

  • 为什么Spark On Yarn 时通过ApplicationMaster去向ResourceManager申请启动Executors,而不是通过Driver去干这些事情?

    • 因为,回顾在standalone集群模式中,Driver向Master申请Worker的资源用于启动Executor,而同样的,在Yarn集群模式中,Spark应用需要向ResourceManager申请NodeManager的资源用于启动Executor,这个职责可以让ApplicationMaster去做,而不是让Driver去做,可以让Driver更轻松,达到解耦的目的。
  • 为什么用yarn-client方式可以在提交的机器上看到打印的结果,但是用yarn-cluster看不到,需要去yarn的历史日志才能看到?

    • 因为用yarn-client方式,Driver进程就是启动在Client上,打印的结果显示在Client机器上。但是用yarn-cluster运行Spark程序时,Driver是某台随机的NodeManager,它就算打印了,也不会显示在当前的提交机器上,但是yarn的历史日志服务,会收集所有yarn节点的日志,所以可以在yarn的历史日志页面(yarn的node1:8088页面,点击logs,再跳转19888端口页面)中查看到程序的结果。
  • 为什么企业中都使用-yarn-cluster模式?

    • 1、因为这种方式,使得Driver向Executor们的通信都在同一个集群的网段中,通信效率高。而反之yarn-client模式的Driver在提交的网段,Executor们在yarn集群的网段,2个网段可能不一样,通信效率低。
    • 2、一般在企业中提交的机器就那么1-2台,如果大家都在同一台机器上用client模式运行各自的Spark应用,那么会造成所有的Driver进程都启动在这台机器上,造成这台机器的负担很重,造成所有的任务都受到影响。而随机的空闲的WorkerNode分散了各个Driver,负载均衡。