本文目录一览:
- 1、如何学习Spark API
- 2、spark任务的提交流程(yarn)
- 3、如何使用spark将程序提交任务到yarn-Spark-about云开发
- 4、如何正确使用Hadoop YARN Restful api提交spark应用
- 5、几种常见的spark任务提交模式
如何学习Spark API
Spark采用一个统一sparkapi提交服务的技术堆栈解决sparkapi提交服务了云计算大数据的如流处理、图技术、机器学习、NoSQL查询等方面的所有核心问题sparkapi提交服务,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位sparkapi提交服务;
要想成为Spark高手,需要经历一下阶段sparkapi提交服务:
第一阶段:熟练地掌握Scala语言
1, Spark框架是采用Scala语言编写的,精致而优雅。要想成为Spark高手,你就必须阅读Spark的源代码,就必须掌握Scala,;
2, 虽然说现在的Spark可以采用多语言Java、Python等进行应用程序开发,但是最快速的和支持最好的开发API依然并将永远是Scala方式的API,所以你必须掌握Scala来编写复杂的和高性能的Spark分布式程序;
3, 尤其要熟练掌握Scala的trait、apply、函数式编程、泛型、逆变与协变等;
第二阶段:精通Spark平台本身提供给开发者API
1, 掌握Spark中面向RDD的开发模式,掌握各种transformation和action函数的使用;
2, 掌握Spark中的宽依赖和窄依赖以及lineage机制;
3, 掌握RDD的计算流程,例如Stage的划分、Spark应用程序提交给集群的基本过程和Worker节点基础的工作原理等
第三阶段:深入Spark内核
此阶段主要是通过Spark框架的源码研读来深入Spark内核部分:
1, 通过源码掌握Spark的任务提交过程;
2, 通过源码掌握Spark集群的任务调度;
3, 尤其要精通DAGScheduler、TaskScheduler和Worker节点内部的工作的每一步的细节;
第四阶级:掌握基于Spark上的核心框架的使用
Spark作为云计算大数据时代的集大成者,在实时流处理、图技术、机器学习、NoSQL查询等方面具有显著的优势,我们使用Spark的时候大部分时间都是在使用其上的框架例如Shark、Spark Streaming等:
1, Spark Streaming是非常出色的实时流处理框架,要掌握其DStream、transformation和checkpoint等;
2, Spark的离线统计分析功能,Spark 1.0.0版本在Shark的基础上推出了Spark SQL,离线统计分析的功能的效率有显著的提升,需要重点掌握;
3, 对于Spark的机器学习和GraphX等要掌握其原理和用法;
第五阶级:做商业级别的Spark项目
通过一个完整的具有代表性的Spark项目来贯穿Spark的方方面面,包括项目的架构设计、用到的技术的剖析、开发实现、运维等,完整掌握其中的每一个阶段和细节,这样就可以让您以后可以从容面对绝大多数Spark项目。
第六阶级:提供Spark解决方案
1, 彻底掌握Spark框架源码的每一个细节;
spark任务的提交流程(yarn)
spark一般都是部署到yarn上使用sparkapi提交服务的sparkapi提交服务,所以就说y问sparkapi提交服务的最多的就是arn的提交流程sparkapi提交服务,两种模式最大的区别就是driver端的执行位置.
Yarn Client 模式
第一步sparkapi提交服务,Driver端在任务提交的本地机上运行
第二步,Driver启动之后就会和ResourceManager通讯,申请启动一个ApplicationMaster
第三步,ResourceManager就会分配container容器,在合适的nodemanager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
第四步,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
第五步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
第六步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
Yarn Cluster 模式
第一步,在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster
第二步, 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
第三步, Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
第四步,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,
第五步,之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
如何使用spark将程序提交任务到yarn-Spark-about云开发
使用脚本提交
1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。
2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.
3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR
1.yarn-standalone方式提交到yarn
在${SPARK_HOME}下面执行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 2g \
--worker-memory 2g \
--worker-cores 1
复制代码
2. yarn-client 方式提交到yarn
在${SPARK_HOME}下面执行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
./bin/run-example org.apache.spark.examples.SparkPi yarn-client
复制代码
二、使用程序提交
1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报
org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制
复制代码
错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。
2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。
3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR
比如我的设置为向提交任务主机~/.bashrc里面添加:
export SPARK_JAR=
export SPARK_YARN_APP_JAR=
复制代码
file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。
其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。
4.程序中加入hadoop、yarn、依赖。
注意,如果引入了hbase依赖,需要这样配置
dependency
groupIdorg.apache.hbase/groupId
artifactIdhbase-thrift/artifactId
version${hbase.version}/version
exclusions
exclusion
groupIdorg.apache.hadoop/groupId
artifactIdhadoop-mapreduce-client-jobclient/artifactId
/exclusion
exclusion
groupIdorg.apache.hadoop/groupId
artifactIdhadoop-client/artifactId
/exclusion
/exclusions
/dependency
复制代码
然后再加入
dependency
groupIdorg.ow2.asm/groupId
artifactIdasm-all/artifactId
version4.0/version
/dependency
复制代码
否则会报错:
IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class
复制代码
异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar
5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。
6.编写程序
代码示例:
package com.sdyc.ndspark.sys;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* Created with IntelliJ IDEA.
* User: zarchary
* Date: 14-1-19
* Time: 下午6:23
* To change this template use File | Settings | File Templates.
*/
public class ListTest {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("listTest");
//使用yarn模式提交
sparkConf.setMaster("yarn-client");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
ListString listA = new ArrayListString();
listA.add("a");
listA.add("a");
listA.add("b");
listA.add("b");
listA.add("b");
listA.add("c");
listA.add("d");
JavaRDDString letterA = sc.parallelize(listA);
JavaPairRDDString, Integer letterB = letterA.map(new PairFunctionString, String, Integer() {
@Override
public Tuple2String, Integer call(String s) throws Exception {
return new Tuple2String, Integer(s, 1);
}
});
letterB = letterB.reduceByKey(new Function2Integer, Integer, Integer() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
//颠倒顺序
JavaPairRDDInteger, String letterC = letterB.map(new PairFunctionTuple2String, Integer, Integer, String() {
@Override
public Tuple2Integer, String call(Tuple2String, Integer stringIntegerTuple2) throws Exception {
return new Tuple2Integer, String(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
});
JavaPairRDDInteger, ListString letterD = letterC.groupByKey();
// //false说明是降序
JavaPairRDDInteger, ListString letterE = letterD.sortByKey(false);
System.out.println("========" + letterE.collect());
System.exit(0);
}
}
复制代码
代码中master设置为yar-client表明了是使用提交到yarn.
关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。
以上弄完之后就可以运行程序了。
运行后会看到yarn的ui界面出现:
正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:
13247 org.apache.spark.deploy.yarn.WorkerLauncher
复制代码
这是spark的工作进程。
如果接收到异常为:
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
复制代码
出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。
所以请检查主机名和IP是否配置正确。
我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。
如何正确使用Hadoop YARN Restful api提交spark应用
以wordcount为例,在map阶段,map函数在每个单词后面加上一个1;
在reduce阶段,reduce函数将相同单词后面的1都加起来。
其中hadoop框架实现过程中的排序,分配等,当然这些也可以通过自定义的函数来控制。
几种常见的spark任务提交模式
[if !supportLists]2.1.1 [endif] YARN Clu
图2-4 YARN Cluster 模式
在YARN Cluster 模式下,任务提交后会和ResourceManager 通讯申请启动
ApplicationMaster,随后ResourceManager 分配container,在合适sparkapi提交服务的NodeManager
上启动ApplicationMaster,此时sparkapi提交服务的ApplicationMaster 跟Driver在一个NodeManager上,但当有多个App任务时,Driver会分布在多个NodeManager上面,因为Driver要与client通信,Driver在同一个NodeManager上会对网络的要求很高。
Driver 启动后向ResourceManager 申请Executor 内存,ResourceManager 接到
ApplicationMaster 的资源申请后会分配container,然后在合适的NodeManager 上启动Executor 进程,Executor 进程启动后会向Driver 反向注册,Executor 全部注册完成后Driver 开始执行main 函数,之后执行到Action 算子时,触发一个job,并根据宽依赖开始划分stage,每个stage 生成对应的taskSet,之后将task 分发到各个
Executor 上执行。