本文目录一览:
- 1、关于Spark有哪些大牛们的博客
- 2、spark dataframe可以干什么
- 3、如何构建第一个Spark项目代码
- 4、如何用spark stream 收集电脑运行日志
- 5、如何使用spark将程序提交任务到yarn-Spark-about云开发
- 6、spark处理数据如何用服务器内存
关于Spark有哪些大牛们的博客
自己整理的,方便自己使用吧。
Intel @邵赛赛 的博客 jerryshao.me/ 他是早期Spark contributor之一
盛利:Spark SQL 源码分析系列文章
许鹏:徽沪一郎 - 博客园 博主的新书《Spark源码剖析》快出了吧 :-)
[1]fxjwind - 博客园
[2]张包峰的博客
[3]Spark - anzhsoft的技术专栏
另外有几个业界著名的公司博客
[1]Databricks Blog
[2]Spark Archives
[3]mapr.com/blog/big-data-
spark dataframe可以干什么
DataFrame是Spark SQL的一种编程抽象,它是一张分布式的表,是数据类型为Row的DataSet,可以简单认为:DataFrame是DataSet[Row]的别名。
你说我们得到了一张表可以做些什么呢?那些数据库的操作都可以,比如增删改查,联结操作等等,都是可以的。
推荐你去Spark官网查看官方文档,然后结合官方文档、博客还有相关书籍,这样学起来比较快。
如何构建第一个Spark项目代码
操作系统
Window7/Mac
IDE
IntelliJ IDEA Community Edition 14.1.6
下载地址
JDK 1.8.0_65
下载地址
Scala 2.11.7
下载地址
其它环境
Spark:1.4.1
下载地址
Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2
IDE项目创建
新建一个项目
New Project
使用Maven模型创建一个Scala项目
填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目
项目创建完成后,目录结构如下
4.为项目添加JDK以及Scala SDK
点击File-Project Structure,在SDKS和Global Libraries中为项目配置环境。
至此整个项目结构、项目环境都搭建好了
编写主函数
主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现
MyRouteBuild
MyRouteMain
这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下
package org.apache.spark.examples.streaming
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object DirectKafkaWordCount {
def main(args: Array[String]) {
if (args.length 2) {
System.err.println("...")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" - brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。
修改pom.xml,为项目打包做准备
pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.4.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming-kafka_2.10/artifactId
version1.4.1/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.4.1/version
/dependency
!-- scala --
dependency
groupIdorg.scala-lang/groupId
artifactIdscala-library/artifactId
version2.10.4/version
/dependency
除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:
plugins
!-- Plugin to create a single jar that includes all dependencies --
plugin
artifactIdmaven-assembly-plugin/artifactId
version2.4/version
configuration
descriptorRefs
descriptorRefjar-with-dependencies/descriptorRef
/descriptorRefs
/configuration
executions
execution
idmake-assembly/id
phasepackage/phase
goals
goalsingle/goal
/goals
/execution
/executions
/plugin
plugin
groupIdorg.apache.maven.plugins/groupId
artifactIdmaven-compiler-plugin/artifactId
version2.0.2/version
configuration
source1.7/source
target1.7/target
/configuration
/plugin
plugin
groupIdnet.alchim31.maven/groupId
artifactIdscala-maven-plugin/artifactId
executions
execution
idscala-compile-first/id
phaseprocess-resources/phase
goals
goaladd-source/goal
goalcompile/goal
/goals
/execution
execution
idscala-test-compile/id
phaseprocess-test-resources/phase
goals
goaltestCompile/goal
/goals
/execution
/executions
/plugin
/plugins
pom.xml文件修改完成后,即可开始maven打包,操作如图:
点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package
Spark作业提交
在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
将jar包导到Spark服务器,运行Spark作业,运行操作如下
../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic
利用spark-submit把任务提交到Yarn集群,即可看到运行结果。
如何用spark stream 收集电脑运行日志
如何收集SparkSteaming运行日志实时进入kafka中
我是攻城师
用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。
这里的log分:
(1)Spark本身运行的log
(2)代码里面业务产生的log
spark on yarn模式,如果你的Hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?
能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。
答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。
现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率**提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:****42。
下面会介绍下如何使用:
streaming项目中的log4j使用的是apache log4j
sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。 看下我们log4j文件的内容:
最后看下提交脚本:
注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。
提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出: 执行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log
收集到的log内容如下:
至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。
这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实 我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可 这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。
如何使用spark将程序提交任务到yarn-Spark-about云开发
使用脚本提交
1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。
2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.
3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR
1.yarn-standalone方式提交到yarn
在${SPARK_HOME}下面执行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 2g \
--worker-memory 2g \
--worker-cores 1
复制代码
2. yarn-client 方式提交到yarn
在${SPARK_HOME}下面执行:
SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
./bin/run-example org.apache.spark.examples.SparkPi yarn-client
复制代码
二、使用程序提交
1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报
org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制
复制代码
错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。
2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。
3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR
比如我的设置为向提交任务主机~/.bashrc里面添加:
export SPARK_JAR=
export SPARK_YARN_APP_JAR=
复制代码
file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。
其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。
4.程序中加入hadoop、yarn、依赖。
注意,如果引入了hbase依赖,需要这样配置
dependency
groupIdorg.apache.hbase/groupId
artifactIdhbase-thrift/artifactId
version${hbase.version}/version
exclusions
exclusion
groupIdorg.apache.hadoop/groupId
artifactIdhadoop-mapreduce-client-jobclient/artifactId
/exclusion
exclusion
groupIdorg.apache.hadoop/groupId
artifactIdhadoop-client/artifactId
/exclusion
/exclusions
/dependency
复制代码
然后再加入
dependency
groupIdorg.ow2.asm/groupId
artifactIdasm-all/artifactId
version4.0/version
/dependency
复制代码
否则会报错:
IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class
复制代码
异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar
5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。
6.编写程序
代码示例:
package com.sdyc.ndspark.sys;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* Created with IntelliJ IDEA.
* User: zarchary
* Date: 14-1-19
* Time: 下午6:23
* To change this template use File | Settings | File Templates.
*/
public class ListTest {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("listTest");
//使用yarn模式提交
sparkConf.setMaster("yarn-client");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
ListString listA = new ArrayListString();
listA.add("a");
listA.add("a");
listA.add("b");
listA.add("b");
listA.add("b");
listA.add("c");
listA.add("d");
JavaRDDString letterA = sc.parallelize(listA);
JavaPairRDDString, Integer letterB = letterA.map(new PairFunctionString, String, Integer() {
@Override
public Tuple2String, Integer call(String s) throws Exception {
return new Tuple2String, Integer(s, 1);
}
});
letterB = letterB.reduceByKey(new Function2Integer, Integer, Integer() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
//颠倒顺序
JavaPairRDDInteger, String letterC = letterB.map(new PairFunctionTuple2String, Integer, Integer, String() {
@Override
public Tuple2Integer, String call(Tuple2String, Integer stringIntegerTuple2) throws Exception {
return new Tuple2Integer, String(stringIntegerTuple2._2, stringIntegerTuple2._1);
}
});
JavaPairRDDInteger, ListString letterD = letterC.groupByKey();
// //false说明是降序
JavaPairRDDInteger, ListString letterE = letterD.sortByKey(false);
System.out.println("========" + letterE.collect());
System.exit(0);
}
}
复制代码
代码中master设置为yar-client表明了是使用提交到yarn.
关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。
以上弄完之后就可以运行程序了。
运行后会看到yarn的ui界面出现:
正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:
13247 org.apache.spark.deploy.yarn.WorkerLauncher
复制代码
这是spark的工作进程。
如果接收到异常为:
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
复制代码
出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。
所以请检查主机名和IP是否配置正确。
我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。
spark处理数据如何用服务器内存
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
拓展资料:Spark是一种安全的、经正式定义的编程语言,被设计用来支持一些安全或商业集成为关键因素的应用软件的设计。其通过运行用户定义的main函数,在集群上执行各种并发操作和计算Spark提供的最主要的抽象,Spark的正式和明确的定义使得多种静态分析技术在Spark源代码的应用中成为可能。