本文作者:KTV免费预定

封装spark成服务(spark 闭包)

KTV免费预定 2024年08月14日 18:29:10 1

本文目录一览:

Spark从入门到精通3:Spark全分布模式的安装和配置

Spark的安装模式一般分为三种:1.伪分布模式:即在一个节点上模拟一个分布式环境,master和worker共用一个节点,这种模式一般用于开发和测试Spark程序;2.全分布模式:即真正的集群模式,master和worker部署在不同的节点之上,一般至少需要3个节点(1个master和2个worker),这种模式一般用于实际的生产环境;3.HA集群模式:即高可用集群模式,一般至少需要4台机器(1个主master,1个备master,2个worker),这种模式的优点是在主master宕机之后,备master会立即启动担任master的职责,可以保证集群高效稳定的运行,这种模式就是实际生产环境中多采用的模式。本小节来介绍Spark的全分布模式的安装和配置。

安装介质:

jdk-8u162-linux-x64.tar.gz 提取码:2bh8

hadoop-2.7.3.tar.gz 提取码:d4g2

scala-2.12.6.tgz 提取码:s2ly

spark-2.1.0-bin-hadoop2.7.tgz 提取码:5kcf

准备3台Linux主机,按照下面的步骤在每台主机上执行一遍,设置成如下结果:

安装Linux操作系统比较简单,这里不再详细。参考:《 Linux从入门到精通1:使用 VMware Workstation 14 Pro 安装 CentOS 7 详细图文教程 》

编辑hosts配置文件:# vi /etc/hosts,追加3行:

测试主机名是否可用:

(1)使用ssh-keygen工具生成秘钥对:

(2)将生成的公钥发给三台主机:master、slave1、slave2:

(3)测试秘钥认证是否成功:

由于各个主机上的时间可能不一致,会导致执行Spark程序出现异常,因此需要同步各个主机的时间。在实际生成环境中,一般使用时间服务器来同步时间,但是搭建时间服务器相对较为复杂。这里介绍一种简单的方法来快速同步每台主机主机的时间。我们知道,使用date命令可以设置主机的时间,因此这里使用putty的插件MTPuTTY来同时向每一台主机发送date命令,以到达同步时间的目的。

(1)使用MTPuTTY工具连接三台主机,点击MTPuTTY工具的Tools菜单下的“Send script…”子菜单,打开发送脚本工具窗口。

(2)输入命令:date -s 2018-05-28,然后回车(注意:一定要回车,否则只发送不执行),在下面服务器列表中选择要同步的主机,然后点击“Send script”,即可将时间同步为2018-05-28 00:00:00。

使用winscp工具将JDK安装包 jdk-8u144-linux-x64.tar.gz 上传到/root/tools/目录中,该目录是事先创建的。

进入/root/tools/目录,将jdk安装包解压到/root/training/目录中,该目录也是事先创建的。

使用winscp工具将Hadoop安装包 hadoop-2.7.3.tar.gz 上传到master节点的/root/tools/目录中,该目录是事先创建的。

进入/root/tools/目录,将hadoop安装包解压到/root/training/目录中,该目录也是事先创建的。

进入Hadoop配置文件目录:

(1) 配置hadoop-env.sh文件:

(2) 配置hdfs-site.xml文件:

(3) 配置core-site.xml文件:

(4) 配置mapred-site.xml文件:

将模板文件mapred-site.xml.template拷贝一份重命名为mapred-site.xml然后编辑:

(5) 配置yarn-site.xml文件:

(6) 配置slaves文件:

将master上配置好的Hadoop安装目录分别复制给两个从节点slave1和slave2,并验证是否成功。

第一次启动需要输入yes继续。

启动成功后,使用jps命令查看各个节点上开启的进程:

使用命令行查看HDFS的状态:

使用浏览器查看HDFS的状态:

使用浏览器查看YARN的状态:

(1) 在HDFS上创建输入目录/input:

(2) 将本地数据文件data.txt上传至该目录:

(3) 进入到Hadoop的示例程序目录:

(4) 执行示例程序中的Wordcount程序,以HDFS上的/input/data.txt作为输入数据,输出结果存放到HDFS上的/out/wc目录下:

(5) 查看进度和结果:

可以通过终端打印出来的日志信息知道执行进度:

执行结束后可以在HDFS上的/out/wc目录下查看是否有_SUCCESS标志文件来判断是否执行成功。

如果执行成功,可以在输出目录下看到_SUCCESS标志文件,且可以在part-r-00000文件中查看到wordcount程序的结果:

由于Scala只是一个应用软件,只需要安装在master节点即可。

使用winscp工具将Scala安装包上传到master节点的/root/tools目录下:

进入/root/tools目录,将Scala安装包解压到安装目录/root/training/:

将Scala的家目录加入到环境变量PATH中:

使环境变量生效:

输入scala命令,如下进入scala环境,则证明scala安装成功:

我们先在master节点上配置好参数,再分发给两个从节点slave1和slave2。

使用winscp工具将Spark安装包上传到master节点的/root/tools目录下:

进入/root/tools目录,将Spark安装包解压到安装目录/root/training/下:

注意:由于Spark的命令脚本和Hadoop的命令脚本有冲突(比如都有start-all.sh和stop-all.sh等),

所以这里需要注释掉Hadoop的环境变量,添加Spark的环境变量:

按Esc:wq保存退出,使用source命令使配置文件立即生效:

进入Spark的配置文件目录下:

(1) 配置spark-env.sh文件:

(2) 配置slaves文件:

将master上配置好的Spark安装目录分别复制给两个从节点slave1和slave2,并验证是否成功。

启动后查看每个节点上的进程:

使用浏览器监控Spark的状态:

使用spark-shell命令进入SparkContext(即Scala环境):

启动了spark-shell之后,可以使用4040端口访问其Web控制台页面(注意:如果一台机器上启动了多个spark-shell,即运行了多个SparkContext,那么端口会自动连续递增,如4041,4042,4043等等):

注意:由于我们将Hadoop从环境变量中注释掉了,这时只能手动进入到Hadoop的sbin目录停止Hadoop:

Spark中常用的端口总结:

spark几种部署模式,每种模式特点及搭建

下面对集中部署模式进行详细介绍

该模式运行任务不会提交在集群中,只在本节点执行,有两种情况

运行该模式非常简单,只需要把Spark的安装包解压后,改一些常用的配置即可使用,而不用启动Spark的Master、Worker守护进程( 只有集群的Standalone方式时,才需要这两个角色),也不用启动Hadoop的各服务(除非你要用到HDFS)。

Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地单机模式分三类:

搭建步骤:

(中间有报错:raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)

pyspark.sql.utils.IllegalArgumentException: u'Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.',网上提示查看jdk版本,发现ubuntu 18.04默认是openjdk-11-jdk包(java -version提示10.0.1)。重新安装openjdk-8-jdk版本不报错)

运行:

使用spark-shell、spark-submit、pyspark

例如使用spark-shell:

local:单机、单核运行

local[k]:启动k个executor

local[ ]:启动跟cpu数目相同的 executor*

上述情况中,local[N]与local[*]相当于用单机的多个线程来模拟spark分布式计算,通常用来检验开发出来的程序逻辑上有没有问题。

其中N代表可以使用N个线程,每个线程拥有一个core。

这些任务的线程,共享在一个进程中,可以开到,在程序的执行过程中只会产生一个进程,这个进程揽下了所有的任务,既是客户提交任务的client进程,又是spark的driver程序,还是spark执行task的executor

这种运行模式,和Local[N]很像,不同的是,它会在单机启动多个进程来模拟集群下的分布式场景,而不像Local[N]这种多个线程只能在一个进程下委屈求全的共享资源。通常也是用来验证开发出来的应用程序逻辑上有没有问题,或者想使用Spark的计算框架而没有太多资源。

用法:提交应用程序时使用local-cluster[x,y,z]参数:x代表要生成的executor数,y和z分别代表每个executor所拥有的core和memory数。

上面这条命令代表会使用2个executor进程,每个进程分配3个core和1G的内存,来运行应用程序。可以看到,在程序执行过程中,会生成如下几个进程:

如何在本地安装运行Spark?

2.1.2 在Windows上安装与配置Spark

本节介绍在Windows系统上安装Spark的过程。在Windows环境下需要安装Cygwin模拟Linux的命令行环境来安装Spark。

(1)安装JDK

相对于Linux、Windows的JDK安装更加自动化封装spark成服务,用户可以下载安装Oracle JDK或者OpenJDK。只安装JRE是不够的封装spark成服务,用户应该下载整个JDK。

安装过程十分简单封装spark成服务,运行二进制可执行文件即可,程序会自动配置环境变量。

(2)安装Cygwin

Cygwin是在Windows平台下模拟Linux环境的一个非常有用的工具,只有通过它才可以在Windows环境下安装Hadoop和Spark。具体安装步骤如下。

1)运行安装程序,选择install from internet。

2)选择网络最好的下载源进行下载。

3)进入Select Packages界面(见图2-2),然后进入Net,选择openssl及openssh。因为之后还是会用到ssh无密钥登录的。

另外应该安装“Editors Category”下面的“vim”。这样就可以在Cygwin上方便地修改配置文件。

最后需要配置环境变量,依次选择“我的电脑”→“属性”→“高级系统设置”→“环境变量”命令,更新环境变量中的path设置,在其后添加Cygwin的bin目录和Cygwin的usr\bin两个目录。

(3)安装sshd并配置免密码登录

1)双击桌面上的Cygwin图标,启动Cygwin,执行ssh-host-config -y命令,出现如图2-3所示的界面。

2)执行后,提示输入密码,否则会退出该配置,此时输入密码和确认密码,按回车键。最后出现Host configuration finished.Have fun!表示安装成功。

3)输入net start sshd,启动服务。或者在系统的服务中找到并启动Cygwin sshd服务。

注意,如果是Windows 8操作系统,启动Cygwin时,需要以管理员身份运行(右击图标,选择以管理员身份运行),否则会因为权限问题,提示“发生系统错误5”。

(4)配置SSH免密码登录

1)执行ssh-keygen命令生成密钥文件,如图2-4所示。

2)执行此命令后,在封装spark成服务你的Cygwin\home\用户名路径下面会生成.ssh文件夹,可以通过命令ls -a /home/用户名 查看,通过ssh -version命令查看版本。

3)执行完ssh-keygen命令后,再执行下面命令,生成authorized_keys文件。

cd ~/.ssh/

cp id_dsa.pub authorized_keys

这样就配置好封装spark成服务了sshd服务。

(5)配置Hadoop

修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里不再赘述。

(6)配置Spark

修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里不再赘述。

(7)运行Spark

1)Spark的启动与关闭

①在Spark根目录启动Spark。

./sbin/start-all.sh

②关闭Spark。

./sbin/stop-all.sh

2)Hadoop的启动与关闭

①在Hadoop根目录启动Hadoop。

./sbin/start-all.sh

②关闭Hadoop。

./sbin/stop-all.sh

3)检测是否安装成功

正常状态下会出现如下内容。

-bash-4.1# jps

23526 Jps

2127 Master

7396 NameNode

7594 SecondaryNameNode

7681 ResourceManager

1053 DataNode

31935 NodeManager

1405 Worker

如缺少进程请到logs文件夹下查看相应日志,针对具体问题进行解决。

spark安装与运行模式

Spark 的运行模式有 Local(也称单节点模式),Standalone(集群模式),Spark on Yarn(运行在Yarn上),Mesos以及K8s等常用模式,本文介绍前三种模式。

Spark-shell 参数

Spark-shell 是以一种交互式命令行方式将Spark应用程序跑在指定模式上,也可以通过Spark-submit提交指定运用程序,Spark-shell 底层调用的是Spark-submit,二者的使用参数一致的,通过- -help 查看参数:

sparkconf的传入有三种方式:

1.通过在spark应用程序开发的时候用set()方法进行指定

2.通过在spark应用程序提交的时候用过以上参数指定,一般使用此种方式,因为使用较为灵活

3.通过配置spark-default.conf,spark-env.sh文件进行指定,此种方式较shell方式级别低

Local模式

Local 模式是最简单的一种Spark运行方式,它采用单节点多线程(cpu)方式运行,local模式是一种OOTB(开箱即用)的方式,只需要在spark-env.sh导出JAVA_HOME,无需其他任何配置即可使用,因而常用于开发和学习

方式:./spark-shell - -master local[n] ,n代表线程数

Standalone模式

Spark on Yarn

on Yarn的俩种模式

客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和excutor,另外ApplicationMaster和executor都装在在container里运行,container默认的内存是1g,ApplicationMaster分配的内存是driver-memory,executor分配的内存是executor-memory.同时,因为Driver在客户端,所以程序的运行结果可以在客户端显示,Driver以进程名为SparkSubmit的形式存在。

Cluster 模式

1.由client向ResourceManager提交请求,并上传Jar到HDFS上

这期间包括四个步骤:

a).连接到RM

b).从RM ASM(applicationsManager)中获得metric,queue和resource等信息。

c).upload app jar and spark-assembly jar

d).设置运行环境和container上下文

2.ResourceManager向NodeManager申请资源,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationManager)

3.NodeManager启动Spark App Master,并向ResourceManager ASM注册

4.Spark ApplicationMaster从HDFS中找到jar文件,启动DAGScheduler和YARN Cluster Scheduler

5.ResourceManager向ResourceManager ASM注册申请container资源(INFO YarnClientImpl: Submitted application)

6.ResourceManager通知NodeManager分配Container,这是可以收到来自ASM关于container的报告。(每个container的对应一个executor)

7.Spark ApplicationMaster直接和container(executor)进行交互,完成这个分布式任务。

进入spark安装目录下的conf文件夹

[atguigu@hadoop102 module] mv slaves.template slaves

[atguigu@hadoop102 conf] vim slaves

hadoop102

hadoop103

hadoop104

4)修改spark-env.sh文件,添加如下配置:

[atguigu@hadoop102 conf]$ vim spark-env.sh

SPARK_MASTER_HOST=hadoop102

SPARK_MASTER_PORT=7077

5)分发spark包

[atguigu@hadoop102 module] sbin/start-all.sh

注意:如果遇到 “JAVA_HOME not set” 异常,可以在sbin目录下的spark-config.sh 文件中加入如下配置:

export JAVA_HOME=XXXX

官方求PI案例

spark-submit

--class org.apache.spark.examples.SparkPi

--master spark://server-2:7077

--executor-memory 1G

--total-executor-cores 2

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

spark-shell

--master spark://server-2:7077

--executor-memory 1g

--total-executor-cores 2

spark-shell --master spark://server-2:7077 --executor-memory 1g --total-executor-cores 2

参数:--master spark://server-2:7077 指定要连接的集群的master

Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出

yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。

安装使用

1)修改hadoop配置文件yarn-site.xml,添加如下内容:

2)修改spark-env.sh,添加如下配置:

[atguigu@hadoop102 conf]$ vi spark-env.sh

YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

3)分发配置文件

[atguigu@hadoop102 conf] xsync spark-env.sh

4)执行一个程序

spark-submit

--class org.apache.spark.examples.SparkPi

--master yarn

--deploy-mode client

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

注意:在提交任务之前需启动HDFS以及YARN集群。

日志查看

修改配置文件spark-defaults.conf

添加如下内容:

spark.yarn.historyServer.address=server-2:18080

spark.history.ui.port=18080

2)重启spark历史服务

[atguigu@hadoop102 spark] sbin/start-history-server.sh

starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out

3)提交任务到Yarn执行

spark-submit

--class org.apache.spark.examples.SparkPi

--master yarn

--deploy-mode client

/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar

100

Spark内存管理详解(下)——内存管理

弹性分布式数据集(RDD)作为Spark最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的RDD上执行转换(Transformation)操作产生一个新的RDD。转换后的RDD与原始的RDD之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark保证了每一个RDD都可以被重新恢复。但RDD的所有转换都是惰性的,即只有当一个返回结果给Driver的行动(Action)发生时,Spark才会创建任务读取RDD,然后真正触发转换的执行。

Task在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查Checkpoint或按照血统重新计算。所以如果一个RDD上要执行多次行动,可以在第一次行动中使用persist或cache方法,在内存或磁盘中持久化或缓存这个RDD,从而在后面的行动时提升计算速度。事实上,cache方法是使用默认的MEMORY_ONLY的存储级别将RDD持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存RDD时使用的内存做统一的规划和管理 (存储内存的其他应用场景,如缓存broadcast数据,暂时不在本文的讨论范围之内)。

RDD的持久化由Spark的Storage模块 [1] 负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和Executor端的Storage模块构成了主从式的架构,即Driver端的BlockManager为Master,Executor端的BlockManager为Slave。Storage模块在逻辑上以Block为基本存储单位,RDD的每个Partition经过处理后唯一对应一个Block(BlockId的格式为 rdd_RDD-ID_PARTITION-ID )。Master负责整个Spark应用程序的Block的元数据信息的管理和维护,而Slave需要将Block的更新等状态上报到Master,同时接收Master的命令,例如新增或删除一个RDD。

在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK等7种不同的 存储级别 ,而存储级别是以下5个变量的组合 [2] :

通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的Partition(同时也就是Block)的存储方式:

RDD在缓存到存储内存之前,Partition中的数据一般以迭代器( Iterator )的数据结构来访问,这是Scala语言中一种遍历数据集合的方法。通过Iterator可以获取分区中每一条序列化或者非序列化的数据项(Record),这些Record的对象实例在逻辑上占用了JVM堆内内存的other部分的空间,同一Partition的不同Record的空间并不连续。

RDD在缓存到存储内存之后,Partition被转换成Block,Record在堆内或堆外存储内存中占用一块连续的空间。 将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为“展开”(Unroll) 。Block有序列化和非序列化两种存储格式,具体以哪种方式取决于该RDD的存储级别。非序列化的Block以一种DeserializedMemoryEntry的数据结构定义,用一个数组存储所有的Java对象,序列化的Block则以SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个Executor的Storage模块用一个链式Map结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的Block对象的实例 [6] ,对这个LinkedHashMap新增和删除间接记录了内存的申请和释放。

因为不能保证存储空间可以一次容纳Iterator中的所有数据,当前的计算任务在Unroll时要向MemoryManager申请足够的Unroll空间来临时占位,空间不足则Unroll失败,空间足够时可以继续进行。对于序列化的Partition,其所需的Unroll空间可以直接累加计算,一次申请。而非序列化的Partition则要在遍历Record的过程中依次申请,即每读取一条Record,采样估算其所需的Unroll空间并进行申请,空间不足时可以中断,释放已占用的Unroll空间。如果最终Unroll成功,当前Partition所占用的Unroll空间被转换为正常的缓存RDD的存储空间,如下图2所示。

在 《Spark内存管理详解(上)——内存分配》 的图3和图5中可以看到,在静态内存管理时,Spark在存储内存中专门划分了一块Unroll空间,其大小是固定的,统一内存管理时则没有对Unroll空间进行特别区分,当存储空间不足是会根据动态占用机制进行处理。

由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的Block需要缓存但是剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。

存储内存的淘汰规则为:

落盘的流程则比较简单,如果其存储级别符合 _useDisk 为true的条件,再根据其 _deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在Storage模块中更新其信息。

Executor内运行的任务同样共享执行内存,Spark用一个HashMap结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N ,其中N为当前Executor内正在运行的任务的个数。每个任务在启动之时,要向MemoryManager请求申请最少为1/2N的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。

执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两阶段对执行内存的使用:

在ExternalSorter和Aggregator中,Spark会使用一种叫AppendOnlyMap的哈希表在堆内执行内存中存储数据,但在Shuffle过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从MemoryManager申请到新的执行内存时,Spark就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

Shuffle Write阶段中用到的Tungsten是Databricks公司提出的对Spark优化内存和CPU使用的计划 [4] ,解决了一些JVM在性能上的限制和弊端。Spark会根据Shuffle的情况来自动选择是否采用Tungsten排序。Tungsten采用的页式内存管理机制建立在MemoryManager之上,即Tungsten对执行内存的使用进行了一步的抽象,这样在Shuffle过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的MemoryBlock是以long型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用, offset 是long型数组的在JVM中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的MemoryBlock是直接申请到的内存块,其 obj 为null, offset 是这个内存块在系统内存中的64位绝对地址。Spark用MemoryBlock巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个Task申请到的内存页。

Tungsten页式管理下的所有内存用64位的逻辑地址表示,由页号和页内偏移量组成:

有了统一的寻址方式,Spark可以用64位逻辑地址的指针定位到堆内或堆外的内存,整个Shuffle Write排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和CPU使用效率带来了明显的提升 [5] 。

Spark的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的RDD的Partition转化而成;而对于执行内存,Spark用AppendOnlyMap来存储Shuffle过程中的数据,在Tungsten排序中甚至抽象成为页式内存管理,开辟了全新的JVM内存管理机制。

Spark的内存管理是一套复杂的机制,且Spark的版本更新比较快,笔者水平有限,难免有叙述不清、错误的地方,若读者有好的建议和更深的理解,还望不吝赐教。

如何使用OpenStack,Docker和Spark打造一个云服务

蘑菇街基于 OpenStack 和 Docker 的私有云实践

本次主要想分享一下过去一年时间里,我们在建设基于Docker的私有云实践过程中,曾经遇到过的问题,如何解决的经验,还有我们的体会和思考,与大家共勉。

在生产环境中使用Docker有一些经历和经验。私有云项目是2014年圣诞节期间上线的,从无到有,经过了半年多的发展,经历了3次大促,已经逐渐形成了一定的规模。

架构

集群管理

大家知道,Docker自身的集群管理能力在当时条件下还很不成熟,因此我们没有选择刚出现的 Swarm,而是用了业界最成熟的OpenStack,这样能同时管理Docker和KVM。我们把Docker当成虚拟机来跑,是为了能满足业务上对虚拟化的需求。今后的思路是微服务化,把应用进行拆分,变成一个个微服务,实现PaaS基于应用的部署和发布。

通过OpenStack如何管理Docker?我们采用的是OpenStack+nova-docker+Docker的架构模式。nova- docker是StackForge上一个开源项目,它做为nova的一个插件,通过调用Docker的RESTful接口来控制容器的启停等动作。

我们在IaaS基础上自研了编排调度等组件,支持应用的弹性伸缩、灰度升级等功能,并支持一定的调度策略,从而实现了PaaS层的主要功能。

同时,基于Docker和Jenkins实现了持续集成(CI)。Git中的项目如果发生了git push等动作,便会触发Jenkins Job进行自动构建,如果构建成功便会生成Docker Image并push到镜像仓库。基于CI生成的Docker Image,可以通过PaaS的API或界面,进行开发测试环境的实例更新,并最终进行生产环境的实例更新,从而实现持续集成和持续交付。

网络和存储

网络方面,我们没有采用Docker默认提供的NAT网络模式,NAT会造成一定的性能损失。通过OpenStack,我们支持Linux bridge和Open vSwitch,不需要启动iptables,Docker的性能接近物理机的95%。

容器的监控

监控方面,我们自研了container tools,实现了容器load值的计算,替换了原有的top、free、iostat、uptime等命令。这样业务方在容器内使用常用命令时看到的是容器的值,而不是整个物理机的。目前我们正在移植Lxcfs到我们的平台上。

我们还在宿主机上增加了多个阈值监控和报警,比如关键进程监控、日志监控、实时pid数量、网络连接跟踪数、容器oom报警等等。

冗灾和隔离性

冗灾和隔离性方面,我们做了大量的冗灾预案和技术准备。我们能够在不启动docker daemon的情况下,离线恢复Docker中的数据。同时,我们支持Docker的跨物理机冷迁移,支持动态的CPU扩容/缩容,网络IO磁盘IO的限速。

遇到的问题及解决方法

接近一年不到的产品化和实际使用中我们遇到过各种的问题,使用Docker的过程也是不断优化Docker、不断定位问题、解决问题的过程。

我们现在的生产环境用的是CentOS 6.5。曾经有个业务方误以为他用的Docker容器是物理机,在Docker容器里面又装了一个Docker,瞬间导致内核crash,影响了同一台物理机的其他Docker容器。

经过事后分析是2.6.32-431版本的内核对network namespace支持不好引起的,在Docker内创建bridge会导致内核crash。upstream修复了这个bug,从2.6.32-431升级到2.6.32-504后问题解决。

还有一个用户写的程序有bug,创建的线程没有及时回收,容器中产生了大量的线程,最后在宿主机上都无法执行命令或者ssh登陆,报的错是"bash: fork: Cannot allocate memory",但通过free看空闲的内存却是足够的。

经过分析,发现是内核对pid的隔离性支持不完善,pid_max(/proc/sys/kernel/pid_max)是全局共享的。当一个容器中的pid数目达到上限32768,会导致宿主机和其他容器无法创建新的进程。最新的4.3-rc1才支持对每个容器进行pid_max限制。

我们还观察到docker的宿主机内核日志中会产生乱序的问题。经过分析后发现是由于内核中只有一个log_buf缓冲区,所有printk打印的日志先放到这个缓冲区中,docker host以及container上的rsyslogd都会通过syslog从kernel的log_buf缓冲区中取日志,导致日志混乱。通过修改 container里的rsyslog配置,只让宿主机去读kernel日志,就能解决这个问题。

除此之外,我们还解决了device mapper的dm-thin discard导致内核crash等问题。

体会和思考

最后分享一下我们的体会和思考,相比KVM比较成熟的虚拟化技术,容器目前还有很多不完善的地方,除了集群管理、网络和存储,最重要的还是稳定性。影响稳定性的主要还是隔离性的不完善造成的,一个容器内引起的问题可能会影响整个系统。

容器的memcg无法回收slab cache,也不对dirty cache量进行限制,更容易发生OOM问题。还有,procfs上的一些文件接口还无法做到per-container,比如pid_max。

另外一点是对容器下的运维手段和运维经验的冲击。有些系统维护工具,比如ss,free,df等在容器中无法使用了,或者使用的结果跟物理机不一致,因为系统维护工具一般都会访问procfs下的文件,而这些工具或是需要改造,或是需要进行适配。

虽然容器还不完善,但是我们还是十分坚定的看好容器未来的发展。Kubernetes、Mesos、Hyper、CRIU、runC等容器相关的开源软件,都是我们关注的重点。

QA

Q:请问容器间的负载均衡是如何做的?

A: 容器间的负载均衡,更多是PaaS和SaaS层面的。我们的P层支持4层和7层的动态路由,通过域名的方式,或者名字服务来暴露出对外的接口。我们能够做到基于容器的灰度升级,和弹性伸缩。

Q:请问你们的OpenStack是运行在CentOS 6.5上的吗?

A: 是的,但是我们针对OpenStack和Docker依赖的包进行了升级。我们维护了内部的yum源。

Q:请问容器IP是静态编排还是动态获取的?

A: 这个跟运维所管理的网络模式有关,我们内部的网络没有DHCP服务,因此对于IaaS层,容器的IP是静态分配的。对于PaaS层来说,如果有DHCP服务,容器的App所暴露出来IP和端口就可以做到动态的。

Q:请问你们当时部署的时候有没有尝试过用Ubuntu,有没有研究过两个系统间的区别,另外请问你们在OpenStack上是怎样对这些虚拟机监控的?

A: 我们没有尝试过Ubuntu,因为公司生产环境上用的是CentOS。我们的中间件团队负责公司机器的监控,我们和监控团队配合,将监控的agent程序部署到宿主机和每个容器里,这样就可以当成虚拟机来进行监控。

当然,容器的数据是需要从cgroups里来取,这部分提取数据的工作,是我们来实现的。

Q:容器间的网络选型有什么建议,据说采用虚拟网卡比物理网卡有不小的性能损失,Docker自带的weaves和ovs能胜任吗?

A: 容器的网络不建议用默认的NAT方式,因为NAT会造成一定的性能损失。之前我的分享中提到过,不需要启动iptables,Docker的性能接近物理机的95%。Docker的weaves底层应该还是采用了网桥或者Open vSwitch。建议可以看一下nova-docker的源码,这样会比较容易理解。

Q:静态IP通过LXC实现的吗?

A: 静态IP的实现是在nova-docker的novadocker/virt/docker/vifs.py中实现的。实现的原理就是通过ip命令添加 veth pair,然后用ip link set/ip netns exec等一系列命令来实现的,设置的原理和weaves类似。

Q:容器内的进程gdb你们怎么弄的,把gdb打包到容器内吗?

A: 容器内的gdb不会有问题的,可以直接yum install gdb。

Q:共享存储能直接mount到容器里吗?

A: 虽然没试过,但这个通过docker -v的方式应该没什么问题。

Q:不启动Docker Daemon的情况下,离线恢复Docker中的数据是咋做到的?

A: 离线恢复的原理是用dmsetup create命令创建一个临时的dm设备,映射到Docker实例所用的dm设备号,通过mount这个临时设备,就可以恢复出原来的数据。

Q:Docker的跨物理机冷迁移,支持动态的CPU扩容/缩容,网络IO磁盘IO的限速,是怎么实现的,能具体说说吗?

A:Docker的冷迁移是通过修改nova-docker,来实现OpenStack迁移的接口,具体来说,就是在两台物理机间通过docker commit,docker push到内部的registry,然后docker pull snapshot来完成的。

动态的CPU扩容/缩容,网络IO磁盘IO的限速主要是通过novadocker来修改cgroups中的cpuset、iops、bps还有TC的参数来实现的。

Q:请问你们未来会不会考虑使用Magnum项目,还是会选择Swarm?

A:这些都是我们备选的方案,可能会考虑Swarm。因为Magnum底层还是调用了Kubernetes这样的集群管理方案,与其用Magnum,不如直接选择Swarm或者是Kubernetes。当然,这只是我个人的看法。

Q:你们的业务是基于同一个镜像么,如果是不同的镜像,那么计算节点如何保证容器能够快速启动?

A:运维会维护一套统一的基础镜像。其他业务的镜像会基于这个镜像来制作。我们在初始化计算节点的时候就会通过docker pull把基础镜像拉到本地,这也是很多公司通用的做法,据我了解,腾讯、360都是类似的做法。

Q:做热迁移,有没有考虑继续使用传统共享存储的来做?

A: 分布式存储和共享存储都在考虑范围内,我们下一步,就计划做容器的热迁移。

Q:请问你们是直接将公网IP绑定到容器吗,还是通过其他方式映射到容器的私有IP,如果是映射如何解决原本二层的VLAN隔离?

A:因为我们是私有云,不涉及floating ip的问题,所以你可以认为是公网IP。VLAN的二层隔离完全可以在交换机上作。我们用Open vSwitch划分不同的VLAN,就实现了Docker容器和物理机的网络隔离。

Q:Device mapper dm-thin discard问题能说的详细些吗?

A:4月份的时候,有两台宿主机经常无故重启。首先想到的是查看/var/log/messages日志,但是在重启时间点附近没有找到与重启相关的信息。而后在/var/crash目录下,找到了内核crash的日志vmcore-dmesg.txt。日志的生成时间与宿主机重启时间一致,可以说明宿主机是发生了kernel crash然后导致的自动重启。“kernel BUG at drivers/md/persistent-data/dm-btree-remove.c:181!”。 从堆栈可以看出在做dm-thin的discard操作(process prepared discard),虽然不知道引起bug的根本原因,但是直接原因是discard操作引发的,可以关闭discard support来规避。

我们将所有的宿主机配置都禁用discard功能后,再没有出现过同样的问题。

在今年CNUTCon的大会上,腾讯和大众点评在分享他们使用Docker的时候也提到了这个crash,他们的解决方法和我们完全一样。

Q:阈值监控和告警那块,有高中低多种级别的告警吗,如果当前出现低级告警,是否会采取一些限制用户接入或者砍掉当前用户正在使用的业务,还是任由事态发展?

A:告警这块,运维有专门的PE负责线上业务的稳定性。当出现告警时,业务方和PE会同时收到告警信息。如果是影响单个虚拟机的,PE会告知业务方,如果严重的,甚至可以及时下掉业务。我们会和PE合作,让业务方及时将业务迁移走。

Q:你们自研的container tools有没有开源,GitHub上有没有你们的代码,如何还没开源,后期有望开源吗,关于监控容器的细粒度,你们是如何考虑的?

A:虽然我们目前还没有开源,单我觉得开源出来的是完全没问题的,请大家等我们的好消息。关于监控容器的细粒度,主要想法是在宿主机层面来监控容器的健康状态,而容器内部的监控,是由业务方来做的。

Q:请问容器的layer有关心过层数么,底层的文件系统是ext4么,有优化策略么?

A:当然有关心,我们通过合并镜像层次来优化docker pull镜像的时间。在docker pull时,每一层校验的耗时很长,通过减小层数,不仅大小变小,docker pull时间也大幅缩短。

Q:容器的memcg无法回收slab cache,也不对dirty cache量进行限制,更容易发生OOM问题。----这个缓存问题你们是怎么处理的?

A:我们根据实际的经验值,把一部分的cache当做used内存来计算,尽量逼近真实的使用值。另外针对容器,内存报警阈值适当调低。同时添加容器OOM的告警。如果升级到CentOS 7,还可以配置kmem.limit_in_bytes来做一定的限制。

Q:能详细介绍下你们容器网络的隔离?

A:访问隔离,目前二层隔离我们主要用VLAN,后面也会考虑VXLAN做隔离。 网络流控,我们是就是使用OVS自带的基于port的QoS,底层用的还是TC,后面还会考虑基于flow的流控。

Q:请问你们这一套都是用的CentOS 6.5吗,这样技术的实现。是运维还是开发参与的多?

A:生产环境上稳定性是第一位的。CentOS 6.5主要是运维负责全公司的统一维护。我们会给运维在大版本升级时提建议。同时做好虚拟化本身的稳定性工作。

Q:请问容器和容器直接是怎么通信的?网络怎么设置?

A:你是指同一台物理机上的吗?我们目前还是通过IP方式来进行通信。具体的网络可以采用网桥模式,或者VLAN模式。我们用Open vSwitch支持VLAN模式,可以做到容器间的隔离或者通信。

Q:你们是使用nova-api的方式集成Dcoker吗,Docker的高级特性是否可以使用,如docker-api,另外为什么不使用Heat集成Docker?

A:我们是用nova-docker这个开源软件实现的,nova-docker是StackForge上一个开源项目,它做为nova的一个插件,替换了已有的libvirt,通过调用Docker的RESTful接口来控制容器的启停等动作。

使用Heat还是NOVA来集成Docker业界确实一直存在争议的,我们更多的是考虑我们自身想解决的问题。Heat本身依赖的关系较为复杂,其实业界用的也并不多,否则社区就不会推出Magnum了。

Q:目前你们有没有容器跨DC的实践或类似的方向?

A:我们已经在多个机房部署了多套集群,每个机房有一套独立的集群,在此之上,我们开发了自己的管理平台,能够实现对多集群的统一管理。同时,我们搭建了Docker Registry V1,内部准备升级到Docker Registry V2,能够实现Docker镜像的跨DC mirror功能。

Q:我现在也在推进Docker的持续集成与集群管理,但发现容器多了管理也是个问题,比如容器的弹性管理与资源监控,Kubernetes、Mesos哪个比较好一些,如果用在业务上,那对外的域名解析如何做呢,因为都是通过宿主机来通信,而它只有一个对外IP?

A: 对于Kubernetes和Mesos我们还在预研阶段,我们目前的P层调度是自研的,我们是通过etcd来维护实例的状态,端口等信息。对于7层的可以通过Nginx来解析,对于4层,需要依赖于naming服务。我们内部有自研的naming服务,因此我们可以解决这些问题。对外虽然只有一个IP,但是暴露的端口是不同的。

Q:你们有考虑使用Hyper Hypernetes吗? 实现容器与宿主机内核隔离同时保证启动速度?

A:Hyper我们一直在关注,Hyper是个很不错的想法,未来也不排除会使用Hyper。其实我们最希望Hyper实现的是热迁移,这是目前Docker还做不到的。

Q:你们宿主机一般用的什么配置?独立主机还是云服务器?

A:我们有自己的机房,用的是独立的服务器,物理机。

Q:容器跨host通信使用哪一种解决方案?

A: 容器跨host就必须使用3层来通信,也就是IP,容器可以有独立的IP,或者宿主机IP+端口映射的方式来实现。我们目前用的比较多的还是独立ip的方式,易于管理。

Q:感觉贵公司对Docker的使用比较像虚拟机,为什么不直接考虑从容器的角度来使用,是历史原因么?

A:我们首先考虑的是用户的接受程度和改造的成本。从用户的角度来说,他并不关心业务是跑在容器里,还是虚拟机里,他更关心的是应用的部署效率,对应用本身的稳定性和性能的影响。从容器的角度,一些业务方已有的应用可能需要比较大的改造。比如日志系统,全链路监控等等。当然,最主要的是对已有运维系统的冲击会比较大。容器的管理对运维来说是个挑战,运维的接受是需要一个过程的。

当然,把Docker当成容器来封装应用,来实现PaaS的部署和动态调度,这是我们的目标,事实上我们也在往这个方向努力。这个也需要业务方把应用进行拆分,实现微服务化,这个需要一个过程。

Q:其实我们也想用容器当虚拟机使用。你们用虚拟机跑什么中间件?我们想解决测试关键对大量相对独立环境WebLogic的矛盾?

A:我们跑的业务有很多,从前台的主站Web,到后端的中间件服务。我们的中间件服务是另外团队自研的产品,实现前后台业务逻辑的分离。

Q:贵公司用OpenStack同时管理Docker和KVM是否有自己开发Web配置界面,还是单纯用API管理?

A:我们有自研的Web管理平台,我们希望通过一个平台管理多个集群,并且对接运维、日志、监控等系统,对外暴露统一的API接口。

Q:上面分享的一个案例中,关于2.6内核namespace的bug,这个低版本的内核可以安装Docker环境吗,Docker目前对procfs的隔离还不完善,你们开发的container tools是基于应用层的还是需要修改内核?

A:安装和使用应该没问题,但如果上生产环境,是需要全面的考虑的,主要还是稳定性和隔离性不够,低版本的内核更容易造成系统 crash或者各种严重的问题,有些其实不是bug,而是功能不完善,比如容器内创建网桥会导致crash,就是network namespace内核支持不完善引起的。

我们开发的container tools是基于应用的,不需要修改内核。

Q:关于冗灾方面有没有更详细的介绍,比如离线状态如何实现数据恢复的?

A:离线状态如何实现恢复数据,这个我在之前已经回答过了,具体来说,是用dmsetup create命令创建一个临时的dm设备,映射到docker实例所用的dm设备号,通过mount这个临时设备,就可以恢复出原来的数据。其他的冗灾方案,因为内容比较多,可以再另外组织一次分享了。你可以关注一下,到时候我们会分享出来。

Q:贵公司目前线上容器化的系统,无状态为主还是有状态为主,在场景选择上有什么考虑或难点?

A:互联网公司的应用主要是以无状态的为主。有状态的业务其实从业务层面也可以改造成部分有状态,或者完全不状态的应用。不太明白你说的场景选择,但我们尽量满足业务方的各种需求。

对于一些本身对稳定性要求很高,或对时延IO特别敏感,比如redis业务,无法做到完全隔离或者无状态的,我们不建议他们用容器。

多进程好还是多线程好等等,并不是说因为Spark很火就一定要使用它。在遇到这些问题的时候、图计算,目前我们还在继续这方面的工作:作为当前流行的大数据处理技术? 陈,它能快速创建一个Spark集群供大家使用,我们使用OpenStack? 陈。 问,Hadoop软硬件协同优化,在OpenPOWER架构的服务器上做Spark的性能分析与优化:您在本次演讲中将分享哪些话题。 问。多参与Spark社区的讨论。曾在《程序员》杂志分享过多篇分布式计算、Docker和Spark打造SuperVessel大数据公有云”,给upstrEAM贡献代码都是很好的切入方式、SQL,并拥有八项大数据领域的技术专利,MapReduce性能分析与调优工具。例如还有很多公司在用Impala做数据分析:企业想要拥抱Spark技术,对Swift对象存储的性能优化等等。例如与Docker Container更好的集成,大数据云方向的技术负责人,Spark还是有很多工作可以做的?企业如果想快速应用Spark 应该如何去做,具体的技术选型应该根据自己的业务场景,Docker Container因为在提升云的资源利用率和生产效率方面的优势而备受瞩目,高性能FPGA加速器在大数据平台上应用等项目,再去调整相关的参数去优化这些性能瓶颈,一些公司在用Storm和Samaza做流计算: 相比于MapReduce在性能上得到了很大提升?

阅读
分享