本文作者:admin

spark服务器相关的博客内容(spark服务器相关的博客内容有哪些)

admin 2022年12月12日 09:09:08 3

本文目录一览:

关于Spark有哪些大牛们的博客

自己整理的,方便自己使用吧。

Intel @邵赛赛 的博客 jerryshao.me/ 他是早期Spark contributor之一

盛利:Spark SQL 源码分析系列文章

许鹏:徽沪一郎 - 博客园 博主的新书《Spark源码剖析》快出了吧 :-)

[1]fxjwind - 博客园

[2]张包峰的博客

[3]Spark - anzhsoft的技术专栏

另外有几个业界著名的公司博客

[1]Databricks Blog

[2]Spark Archives

[3]mapr.com/blog/big-data-

spark dataframe可以干什么

DataFrame是Spark SQL的一种编程抽象,它是一张分布式的表,是数据类型为Row的DataSet,可以简单认为:DataFrame是DataSet[Row]的别名。

你说我们得到了一张表可以做些什么呢?那些数据库的操作都可以,比如增删改查,联结操作等等,都是可以的。

推荐你去Spark官网查看官方文档,然后结合官方文档、博客还有相关书籍,这样学起来比较快。

如何构建第一个Spark项目代码

操作系统

Window7/Mac

IDE

IntelliJ IDEA Community Edition 14.1.6

下载地址

JDK 1.8.0_65

下载地址

Scala 2.11.7

下载地址

其它环境

Spark:1.4.1

下载地址

Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

New Project

使用Maven模型创建一个Scala项目

填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目

项目创建完成后,目录结构如下

4.为项目添加JDK以及Scala SDK

点击File-Project Structure,在SDKS和Global Libraries中为项目配置环境。

至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/…/下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

MyRouteBuild

MyRouteMain

这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMain为DirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka._

import org.apache.spark.SparkConf

object DirectKafkaWordCount {

def main(args: Array[String]) {

if (args.length 2) {

System.err.println("...")

System.exit(1)

}

//StreamingExamples.setStreamingLogLevels()

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")

val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics

val topicsSet = topics.split(",").toSet

val kafkaParams = Map[String, String]("metadata.broker.list" - brokers)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print

val lines = messages.map(_._2)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x = (x, 1L)).reduceByKey(_ + _)

wordCounts.print()

// Start the computation

ssc.start()

ssc.awaitTermination()

}

}

将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。

至此Spark处理代码已经编写完成。

修改pom.xml,为项目打包做准备

pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-core_2.10/artifactId

version1.4.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-streaming-kafka_2.10/artifactId

version1.4.1/version

/dependency

dependency

groupIdorg.apache.spark/groupId

artifactIdspark-streaming_2.10/artifactId

version1.4.1/version

/dependency

!-- scala --

dependency

groupIdorg.scala-lang/groupId

artifactIdscala-library/artifactId

version2.10.4/version

/dependency

除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xml的bulid标签中写入以下配置:

plugins

!-- Plugin to create a single jar that includes all dependencies --

plugin

artifactIdmaven-assembly-plugin/artifactId

version2.4/version

configuration

descriptorRefs

descriptorRefjar-with-dependencies/descriptorRef

/descriptorRefs

/configuration

executions

execution

idmake-assembly/id

phasepackage/phase

goals

goalsingle/goal

/goals

/execution

/executions

/plugin

plugin

groupIdorg.apache.maven.plugins/groupId

artifactIdmaven-compiler-plugin/artifactId

version2.0.2/version

configuration

source1.7/source

target1.7/target

/configuration

/plugin

plugin

groupIdnet.alchim31.maven/groupId

artifactIdscala-maven-plugin/artifactId

executions

execution

idscala-compile-first/id

phaseprocess-resources/phase

goals

goaladd-source/goal

goalcompile/goal

/goals

/execution

execution

idscala-test-compile/id

phaseprocess-test-resources/phase

goals

goaltestCompile/goal

/goals

/execution

/executions

/plugin

/plugins

pom.xml文件修改完成后,即可开始maven打包,操作如图:

点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package

Spark作业提交

在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。

将jar包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit –master yarn-client –jars ../lib/kafka_2.10-0.8.2.1.jar –class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用spark-submit把任务提交到Yarn集群,即可看到运行结果。

如何用spark stream 收集电脑运行日志

如何收集SparkSteaming运行日志实时进入kafka中

我是攻城师

用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。

这里的log分:

(1)Spark本身运行的log

(2)代码里面业务产生的log

spark on yarn模式,如果你的Hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?

能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。

答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。

现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率**提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:****42。

下面会介绍下如何使用:

streaming项目中的log4j使用的是apache log4j

sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。 看下我们log4j文件的内容:

最后看下提交脚本:

注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。

提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出: 执行命令:

kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log

收集到的log内容如下:

至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。

这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实 我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可 这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。

如何使用spark将程序提交任务到yarn-Spark-about云开发

使用脚本提交

1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。

2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.

3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR

1.yarn-standalone方式提交到yarn

在${SPARK_HOME}下面执行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

./bin/spark-class org.apache.spark.deploy.yarn.Client \

--jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

--class org.apache.spark.examples.SparkPi \

--args yarn-standalone \

--num-workers 3 \

--master-memory 2g \

--worker-memory 2g \

--worker-cores 1

复制代码

2. yarn-client 方式提交到yarn

在${SPARK_HOME}下面执行:

SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

./bin/run-example org.apache.spark.examples.SparkPi yarn-client

复制代码

二、使用程序提交

1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报

org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制

复制代码

错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。

2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。

3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR

比如我的设置为向提交任务主机~/.bashrc里面添加:

export SPARK_JAR=

export SPARK_YARN_APP_JAR=

复制代码

file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。

其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。

4.程序中加入hadoop、yarn、依赖。

注意,如果引入了hbase依赖,需要这样配置

dependency

groupIdorg.apache.hbase/groupId

artifactIdhbase-thrift/artifactId

version${hbase.version}/version

exclusions

exclusion

groupIdorg.apache.hadoop/groupId

artifactIdhadoop-mapreduce-client-jobclient/artifactId

/exclusion

exclusion

groupIdorg.apache.hadoop/groupId

artifactIdhadoop-client/artifactId

/exclusion

/exclusions

/dependency

复制代码

然后再加入

dependency

groupIdorg.ow2.asm/groupId

artifactIdasm-all/artifactId

version4.0/version

/dependency

复制代码

否则会报错:

IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class

复制代码

异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar

5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。

6.编写程序

代码示例:

package com.sdyc.ndspark.sys;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.ArrayList;

import java.util.List;

/**

* Created with IntelliJ IDEA.

* User: zarchary

* Date: 14-1-19

* Time: 下午6:23

* To change this template use File | Settings | File Templates.

*/

public class ListTest {

public static void main(String[] args) throws Exception {

SparkConf sparkConf = new SparkConf();

sparkConf.setAppName("listTest");

//使用yarn模式提交

sparkConf.setMaster("yarn-client");

JavaSparkContext sc = new JavaSparkContext(sparkConf);

ListString listA = new ArrayListString();

listA.add("a");

listA.add("a");

listA.add("b");

listA.add("b");

listA.add("b");

listA.add("c");

listA.add("d");

JavaRDDString letterA = sc.parallelize(listA);

JavaPairRDDString, Integer letterB = letterA.map(new PairFunctionString, String, Integer() {

@Override

public Tuple2String, Integer call(String s) throws Exception {

return new Tuple2String, Integer(s, 1);

}

});

letterB = letterB.reduceByKey(new Function2Integer, Integer, Integer() {

public Integer call(Integer i1, Integer i2) {

return i1 + i2;

}

});

//颠倒顺序

JavaPairRDDInteger, String letterC = letterB.map(new PairFunctionTuple2String, Integer, Integer, String() {

@Override

public Tuple2Integer, String call(Tuple2String, Integer stringIntegerTuple2) throws Exception {

return new Tuple2Integer, String(stringIntegerTuple2._2, stringIntegerTuple2._1);

}

});

JavaPairRDDInteger, ListString letterD = letterC.groupByKey();

// //false说明是降序

JavaPairRDDInteger, ListString letterE = letterD.sortByKey(false);

System.out.println("========" + letterE.collect());

System.exit(0);

}

}

复制代码

代码中master设置为yar-client表明了是使用提交到yarn.

关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。

以上弄完之后就可以运行程序了。

运行后会看到yarn的ui界面出现:

正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:

13247 org.apache.spark.deploy.yarn.WorkerLauncher

复制代码

这是spark的工作进程。

如果接收到异常为:

WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

复制代码

出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报

WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。

所以请检查主机名和IP是否配置正确。

我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。

spark处理数据如何用服务器内存

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

拓展资料:Spark是一种安全的、经正式定义的编程语言,被设计用来支持一些安全或商业集成为关键因素的应用软件的设计。其通过运行用户定义的main函数,在集群上执行各种并发操作和计算Spark提供的最主要的抽象,Spark的正式和明确的定义使得多种静态分析技术在Spark源代码的应用中成为可能。

阅读
分享