本文目录一览:
- 1、求助,spark 提交任务到集群报错
- 2、如何构建第一个Spark项目代码
- 3、Spark对硬件的要求
- 4、Spark性能调优-数据本地化调优
- 5、Spark连接到MySQL并执行查询为什么速度会快
- 6、spark2.0 可以安装到 hadoop2.5上吗
求助,spark 提交任务到集群报错
这里是结合Hadoop2.0使用的1,download:根据下载的spark的README中的描述下载合适的版本3,安装其实就是解压,配置/etc/profile环境变量exportSPARK_HOME=/data1/spark/sparkexportSCALA_HOME=/data1/spark/scala-2.9.3exportPATH=$PATH:$SPARK_HOME/bin:$SCALA_HOME/bin配置spark的conf下的spark-env.shexportJAVA_HOME=/usr/java/defaultexportSCALA_HOME=/data1/spark/scala-2.9.3exportSPARK_MASTER_IP=192.168.0.1exportSPARK_MASTER_WEBUI_PORT=8080exportSPARK_WORKER_WEBUI_PORT=8000exportYARN_CONF_DIR=/data/hadoop/hadoop-2.0/etc/hadoop配置slaves(ip根据需要修改)192.168.0.2192.168.0.3分发spark目录和scala目录到几台服务器相同路径下4,启动进入主节点的spark目录的bin下stop-all.sh是停掉集群,start-all.sh启动集群,jps可以在主节点看到master进程,slave节点看到worker进程5,运行程序,运行例子进入spark目录下分布式运行./run-exampleorg.apache.spark.examples.SparkPispark://192.168.0.1:7077./run-exampleorg.apache.spark.examples.SparkLRspark://192.168.0.1:7077本地运行./run-exampleorg.apache.spark.examples.SparkPilocal./run-exampleorg.apache.spark.examples.SparkLRlocal
如何构建第一个Spark项目代码
操作系统
Window7/Mac
IDE
IntelliJ IDEA Community Edition 14.1.6
下载地址
JDK 1.8.0_65
下载地址
Scala 2.11.7
下载地址
其它环境
Spark:1.4.1
下载地址
Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2
IDE项目创建
新建一个项目
New Project
使用Maven模型创建一个Scala项目
填写自己企业级spark服务器配置优惠的GroupId、ArtifactId,Version不需要修改企业级spark服务器配置优惠,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮企业级spark服务器配置优惠你创建Scala项目
项目创建完成后,目录结构如下
4.为项目添加JDK以及Scala SDK
点击File-Project Structure,在SDKS和Global Libraries中为项目配置环境。
至此整个项目结构、项目环境都搭建好了
编写主函数
主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现
MyRouteBuild
MyRouteMain
这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下
package org.apache.spark.examples.streaming
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" - brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。
修改pom.xml,为项目打包做准备
pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.4.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming-kafka_2.10/artifactId
version1.4.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.4.1/version
/dependency
!-- scala --
dependency
groupIdorg.scala-lang/groupId
artifactIdscala-library/artifactId
version2.10.4/version
/dependency
除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:
plugins
!-- Plugin to create a single jar that includes all dependencies --
plugin
artifactIdmaven-assembly-plugin/artifactId
version2.4/version
configuration
descriptorRefs
descriptorRefjar-with-dependencies/descriptorRef
/descriptorRefs
/configuration
executions
execution
idmake-assembly/id
phasepackage/phase
goals
goalsingle/goal
/goals
/execution
/executions
/plugin
plugin
groupIdorg.apache.maven.plugins/groupId
artifactIdmaven-compiler-plugin/artifactId
version2.0.2/version
configuration
source1.7/source
target1.7/target
/configuration
/plugin
plugin
groupIdnet.alchim31.maven/groupId
artifactIdscala-maven-plugin/artifactId
executions
execution
idscala-compile-first/id
phaseprocess-resources/phase
goals
goaladd-source/goal
goalcompile/goal
/goals
/execution
execution
idscala-test-compile/id
phaseprocess-test-resources/phase
goals
goaltestCompile/goal
/goals
/execution
/executions
/plugin
/plugins
pom.xml文件修改完成后,即可开始maven打包,操作如图:
点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package
Spark作业提交
在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
将jar包导到Spark服务器,运行Spark作业,运行操作如下
../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
利用spark-submit把任务提交到Yarn集群,即可看到运行结果。
Spark对硬件的要求
Spark对硬件的要求
估计所有的spark开发者都很关心spark的硬件要求。恰当的硬件配置需要具体情况具体分析,在这里给出以下建议。主要译自官网
一,存储系统
因为大多数Spark工作可能需要从外部存储系统(例如Hadoop文件系统或HBase)中读取输入数据,所以将spark尽可能部署到靠近存储系统很重要。所以,有如下建议:
1,如果可能,在与HDFS相同的节点上运行Spark。最简单的方式是将spark的Standalone集群和hadoop集群安装在相同的节点,同时配置好Spark和hadoop的内存使用,避免相互干扰(对于hadoop,每个task的内存配置参数是mapred.child.java.opts;mapreduce.tasktracker.map.tasks.maximum 和mapreduce.tasktracker.reduce.tasks.maximum决定了task的数目)。也可以将hadoop和spark运行在共同的集群管理器上,如mesos和 yarn。
2,如果不可能,请在与HDFS相同的局域网中的不同节点上运行Spark。
3,对于低延迟数据存储(如HBase),可能优先在与存储系统不同的节点上运行计算任务以避免干扰。
二,本地磁盘
虽然Spark可以在内存中执行大量的计算,但它仍然使用本地磁盘来存储不适合RAM的数据,以及在stage之间,也即shuffle的中间结果。建议每个节点至少有4-8块磁盘,并且不需要RAID,仅仅是独立的磁盘挂在节点。在Linux中,使用noatime选项安装磁盘,以减少不必要的写入。在spark任务中,spark.local.dir配置可以十多个磁盘目录,以逗号分开。如果运行在hdfs上,与hdfs保持一致就很好。
使用noatime选项安装磁盘,要求当挂载文件系统时,可以指定标准Linux安装选项(noatime),这将禁用该文件系统上的atime更新。磁盘挂在命令:
mount -t gfs BlockDevice MountPoint -onoatime
BlockDevice 指定GFS文件系统驻留的块设备。
MountPoint 指定GFS文件系统应安装的目录。
例子:
mount -t gfs /dev/vg01/lvol0 /gfs1 -onoatime
三,内存
单台机器内存从8GB到数百GB,spark都能运行良好。在所有情况下,建议仅为Spark分配最多75%的内存;留下其余的操作系统和缓冲区缓存。
需要多少内存取决于你的应用程序。要确定你的应用的特定数据集需要多大内存,请加载部分数据集到内存,然后在Spark UI的Storage界面去看它的内存占用量。
请注意,内存使用受到存储级别和序列化格式的极大影响 - 有关如何减少内存使用的技巧,请参阅另一篇调优的文章。
最后,请注意,对于超过200GB的内存的机器JAVA VM运行状态并不一直表现良好。如果买的机器内存超过了200GB,那么可以在一个节点上运行多个worker。Spark Standalone模式下,可以在配置文件 conf/spark-env.sh中设置SPARK_WORKER_INSTANCES的值来设置单节点worker的数目。也可以设置SPARK_WORKER_CORES参数来设置每个Worker的cpu数目。
四,网络
根据以往的经验,假如数据是在内存中,那么spark的应用的瓶颈往往就在网络。用10 Gigabit或者更高的网络,是使spark应用跑的最更快的最佳方式。特别是针对“distributed reduce”应用,如group-bys,reduce-bys和sql joins,就表现的更加明显。在任何给定的应用程序中,可以通过spark ui查看spark shuffle过程夸网络传输了多少数据。
五, cpu
对于每台机器几十个cpu的机器,spark也可以很好的扩展,因为他在线程之间执行最小的共享cpu。应该每台机器至少配置8-16个内核。根据cpu负载,可能需要更多的cpu:一旦数据在内存中,大多数应用程序的瓶颈就在CPU和网络。
推荐阅读:
面试必备|spark 高层通用调优
Spark Adaptive Execution调研
Spark 的硬件配置
从MapReduce的兴起,就带来一种思路,就是希望通过大量廉价的机器来处理以前需要耗费昂贵资源的海量数据。这种方式事实上是一种架构的水平伸缩模式——真正的以量取胜。毕竟,以现在的硬件发展来看,CPU的核数、内存的容量以及海量存储硬盘,都慢慢变得低廉而高效。然而,对于商业应用的海量数据挖掘或分析来看,硬件成本依旧是开发商非常关注的。当然最好的结果是:既要马儿跑得快,还要马儿少吃草。
\\
Spark相对于Hadoop的MapReduce而言,确乎要跑得迅捷许多。然而,Spark这种In-Memory的计算模式,是否在硬件资源尤其是内存资源的消耗上,要求更高呢?我既找不到这么多机器,也无法租用多台虚拟instance,再没法测评的情况下,只要寻求Spark的官方网站,又或者通过Google搜索。从Spark官方网站,Databricks公司Patrick Wendell的演讲以及Matei Zaharia的Spark论文,找到了一些关于Spark硬件配置的支撑数据。
\\
Spark 与存储系统
\\
如果Spark使用HDFS作为存储系统,则可以有效地运用Spark的standalone mode cluster,让Spark与HDFS部署在同一台机器上。这种模式的部署非常简单,且读取文件的性能更高。当然,Spark对内存的使用是有要求的,需要合理分配它与HDFS的资源。因此,需要配置Spark和HDFS的环境变量,为各自的任务分配内存和CPU资源,避免相互之间的资源争用。
\\
若HDFS的机器足够好,这种部署可以优先考虑。若数据处理的执行效率要求非常高,那么还是需要采用分离的部署模式,例如部署在Hadoop YARN集群上。
\\
Spark 对磁盘的要求
\\
Spark是in memory的迭代式运算平台,因此它对磁盘的要求不高。Spark官方推荐为每个节点配置4-8块磁盘,且并不需要配置为RAID(即将磁盘作为单独的mount point)。然后,通过配置spark.local.dir来指定磁盘列表。
\\
Spark 对内存的要求
\\
Spark虽然是in memory的运算平台,但从官方资料看,似乎本身对内存的要求并不是特别苛刻。官方网站只是要求内存在8GB之上即可(Impala要求机器配置在128GB)。当然,真正要高效处理,仍然是内存越大越好。若内存超过200GB,则需要当心,因为JVM对超过200GB的内存管理存在问题,需要特别的配置。
\\
内存容量足够大,还得真正分给了Spark才行。Spark建议需要提供至少75%的内存空间分配给Spark,至于其余的内存空间,则分配给操作系统与buffer cache。这就需要部署Spark的机器足够干净。
\\
考虑内存消耗问题,倘若我们要处理的数据仅仅是进行一次处理,用完即丢弃,就应该避免使用cache或persist,从而降低对内存的损耗。若确实需要将数据加载到内存中,而内存又不足以加载,则可以设置Storage Level。0.9版本的Spark提供了三种Storage Level:MEMORY_ONLY(这是默认值),MEMORY_AND_DISK,以及DISK_ONLY。
\\
关于数据的持久化,Spark默认是持久化到内存中。但它也提供了三种持久化RDD的存储方式:
\\
• \\t
in-memory storage as deserialized Javaobjects
\\t\\t
• \\t
in-memory storage as serialised data
\\t\\t
• \\t
on-disk storage
\\t\
第一种存储方式性能最优,第二种方式则对RDD的展现方式(Representing)提供了扩展,第三种方式则用于内存不足时。
\\
然而,在最新版(V1.0.2)的Spark中,提供了更多的Storage Level选择。一个值得注意的选项是OFF_HEAP,它能够将RDD以序列化格式存储到Tachyon中。相比MEMORY_ONLY_SER,这一选项能够减少执行垃圾回收,使Spark的执行器(executor)更小,并能共享内存池。Tachyon是一个基于内存的分布式文件系统,性能远超HDFS。Tachyon与Spark同源同宗,都烙有伯克利AMPLab的印记。目前,Tachyon的版本为0.5.0,还处于实验阶段。
\\
注意,RDDs是Lazy的,在执行Transformation操作如map、filter时,并不会提交Job,只有在执行Action操作如count、first时,才会执行Job,此时才会进行数据的加载。当然,对于一些shuffle操作,例如reduceByKey,虽然仅是Transformation操作,但它在执行时会将一些中间数据进行持久化,而无需显式调用persist()函数。这是为了应对当节点出现故障时,能够避免针对大量数据进行重计算。要计算Spark加载的Dataset大小,可以通过Spark提供的Web UI Monitoring工具来帮助分析与判断。
\\
Spark的RDD是具有分区(partition)的,Spark并非是将整个RDD一次性加载到内存中。Spark针对partition提供了eviction
policy,这一Policy采用了LRU(Least Recently Used)机制。当一个新的RDD分区需要计算时,如果没有合适的空间存储,就会根据LRU策略,将最少访问的RDD分区弹出,除非这个新分区与最少访问的分区属于同一个RDD。这也在一定程度上缓和了对内存的消耗。
\\
Spark对内存的消耗主要分为三部分:
\\
1. \\t
数据集中对象的大小;
\\t\\t
2. \\t
访问这些对象的内存消耗;
\\t\\t
3. \\t
垃圾回收GC的消耗。
\\t\
一个通常的内存消耗计算方法是:内存消耗大小= 对象字段中原生数据 * (2~5)。这是因为Spark运行在JVM之上,操作的Java对象都有定义的“object header”,而数据结构(如Map,LinkedList)对象自身也需要占用内存空间。此外,对于存储在数据结构中的基本类型,还需要装箱(Boxing)。Spark也提供了一些内存调优机制,例如执行对象的序列化,可以释放一部分内存空间。还可以通过为JVM设置flag来标记存放的字节数(选择4个字节而非8个字节)。在JDK 7下,还可以做更多优化,例如对字符编码的设置。这些配置都可以在spark-env.sh中设置。
\\
Spark 对网络的要求
\\
Spark属于网络绑定型系统,因而建议使用10G及以上的网络带宽。
\\
Spark 对 CPU 的要求
\\
Spark可以支持一台机器扩展至数十个CPU
core,它实现的是线程之间最小共享。若内存足够大,则制约运算性能的就是网络带宽与CPU数。
\\
Spark官方利用Amazon EC2的环境对Spark进行了基准测评。例如,在交互方式下进行数据挖掘(Interative Data Mining),租用Amazon EC2的100个实例,配置为8核、68GB的内存。对1TB的维基百科页面查阅日志(维基百科两年的数据)进行数据挖掘。在查询时,针对整个输入数据进行全扫描,只需要耗费5-7秒的时间。如下图所示:
在Matei Zaharia的Spark论文中还给出了一些使用Spark的真实案例。视频处理公司Conviva,使用Spark将数据子集加载到RDD中。报道说明,对于200GB压缩过的数据进行查询和聚合操作,并运行在两台Spark机器上,占用内存为96GB,执行完全部操作需要耗费30分钟左右的时间。同比情况下,Hadoop需要耗费20小时。注意:之所以200GB的压缩数据只占用96GB内存,是因为RDD的处理方式,使得我们可以只加载匹配客户过滤的行和列,而非所有压缩数据。`
Spark集群硬件配置推荐
计算与存储:
大多数Spark作业可能需要从外部存储系统(例如 :Cassandra、Hadoop文件系统或HBase)读取输入数据,所以要让Spark计算引擎尽可能靠近数据持久层。如果使用HDFS作为数据存储集群,可以在相同的集群上部署Spark集群,并配置Spark和Hadoop的内存和CPU使用率以避免干扰。我们的生产存储使用的是Cassandra集群,spark
master 服务单独部署,其它节点同时部署:Cassandra
+ spark worker,保证spark
worker 节点可以快速从本地读取数据进行计算汇总。
磁盘:
虽然Spark可以在内存中执行大量的计算,但它仍然可能会使用本地磁盘来存储不适用于RAM的数据,建议每个节点配置4-8个磁盘,不需要配置RAID(磁盘阵列),磁盘成本越来越低,可以考虑配置ssd硬盘,可以大幅提升性能。另外;在Linux中,使用noatime选项挂载磁盘,以减少不必要的写入操作。 在Spark中,可以将spark.local.dir变量配置为多个本地磁盘的地址,多个地址之间以逗号分隔。
内存
建议为Spark分配的内存容量不大于机器总内存容量的75%;确保为操作系统和缓冲区留下足够的内存。根据业务特点评估需要多少内存。请注意,当内存容量超过200GB时Java 虚拟机的性能表现会不稳定。如果您购买的RAM大于200G,则可以为每个节点运行多个worker
JVM。在Spark的standalone模式下,您可以通过conf/spark-env.sh中的SPARK_WORKER_INSTANCES变量设置每个节点运行的worker进程数,以及通过SPARK_WORKER_CORES变量设置每个worker可用的cpu核心数。
网络
当数据已经存储在内存中时,很多Spark应用程序的性能瓶颈在于网络的传输速率。推荐最低使用10G的网络。
CPU
Spark运行汇总计算任务比较多,推荐配置更多的cpu核数,性能提升还是比较明显,推荐:每台机器至少配置8-16个核。可以根据Spark作业的CPU负载情况,进行配置调整。一旦数据已经在内存中,大多数应用程序的性能瓶颈在于CPU和网络。
参考文档
Spark性能调优-数据本地化调优
比如计算需要的数据在node01这台服务器中的Executor1这个进程中企业级spark服务器配置优惠,那么TaskScheduler会把TaskSet发往Executor1进程中执行企业级spark服务器配置优惠,此时的数据本地化级别时PROCESS_LOCAL,Executor1是最佳的计算位置,如果发送的task在等待了3秒,重试了5次之后仍然没有执行,那么TaskScheduler就认为Executor1的资源不充足,不足以支撑计算,那么降低数据本地化级别,把task发往node01的另外一个进程Executor2中,这时的数据本地化级别为NODE_LOCAL,如果还无法执行,降低为RACK_LOCAL,ANY,直到Task可以开始计算
增加等待时间,默认3s,可以成倍数提高,按照6s,12s,24s…这样的方式来修改,这样可以快速找到最佳值,配置参数:
在默认情况下,最初的数据本地化级别为PROCESS_LOCAL,如果等待了3s,重试5次后还没有开始执行task,那么会降低级别,再尝试开始执行task,比如,我们就想让task的数据本地化级别为PROCESS_LOCAL,那么把 spark.locality.wait.process 修改为一个很大的值,那么这个task会一直等待,直到本机的executor中已经加载过来了需要的数据,当然,我们不会这么做。
Spark连接到MySQL并执行查询为什么速度会快
在已有的 MySQL 服务器之上使用 Apache Spark (无需将数据导出到 Spark 或者 Hadoop 平台上),这样至少可以提升 10 倍的查询性能。使用多个 MySQL 服务器(复制或者 Percona XtraDB Cluster)可以让企业级spark服务器配置优惠我们在某些查询上得到额外的性能提升。企业级spark服务器配置优惠你也可以使用 Spark 的缓存功能来缓存整个 MySQL 查询结果表。
思路很简单:Spark 可以通过 JDBC 读取 MySQL 上的数据,也可以执行 SQL 查询,因此企业级spark服务器配置优惠我们可以直接连接到 MySQL 并执行查询。那么为什么速度会快呢?对一些需要运行很长时间的查询(如报表或者BI),由于 Spark 是一个大规模并行系统,因此查询会非常的快。MySQL 只能为每一个查询分配一个 CPU 核来处理,而 Spark 可以使用所有集群节点的所有核。在下面的例子中,我们会在 Spark 中执行 MySQL 查询,这个查询速度比直接在 MySQL 上执行速度要快 5 到 10 倍。
另外,Spark 可以增加“集群”级别的并行机制,在使用 MySQL 复制或者 Percona XtraDB Cluster 的情况下,Spark 可以把查询变成一组更小的查询(有点像使用企业级spark服务器配置优惠了分区表时可以在每个分区都执行一个查询),然后在多个 Percona XtraDB Cluster 节点的多个从服务器上并行的执行这些小查询。最后它会使用 map/reduce 方式将每个节点返回的结果聚合在一起行程完整的结果。
这篇文章跟我之前文章 “Airlines On-Time Performance” 所使用的数据库是相同的。瓦迪姆创建了一些脚本可以方便的下载这些数据并上传到 MySQL 数据库。脚本的下载地址请看 这里。同时我们这次使用的是 2016年7月26日发布的Apache Spark 2.0。
spark2.0 可以安装到 hadoop2.5上吗
SSH
Hadoop2.5
Ubuntu14
VMware10
JDK1.7
方法/步骤
一、SSH配置:
1、首先在三台服务器上安装SSH,服务器IP地址为:
192.168.217.128;
192.168.217.129;
192.168.217.130
sudo apt-get install openssh-server openssh-client
2、然后分别在三台服务器上执行以下命令,配置SSH免秘钥:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa $ cat ~/.ssh/id_dsa.pub ~/.ssh/authorized_keys
输入命令:ssh localhost
第一次需要输入密码,然后再次输入:ssh localhost
能无密码登陆,说明配置成功
3、手动复制ubuntu2,ubuntu3 ~/.ssh/id_dsa.pub 文件内容添加到ubuntu1的~/.ssh/authorized_keys 文件中;
或者分别执行以下代码(注:zhou是用户名):
在ubuntu2中执行
$ scp ~/.ssh/id_dsa.pub zhou@ubuntu1:~/.ssh/authorized_keys
在ubuntu3中执行
$ scp ~/.ssh/id_dsa.pub zhou@ubuntu1:~/.ssh/authorized_keys