本文目录一览:
- 1、2019-03-05 SparkSQL集群性能调优 CheatSheet
- 2、企业常用sparkcore还是sparksql
- 3、Spark SQL 到底怎么搭建起来
- 4、Spark SQL(十):Hive On Spark
- 5、如何使用 Spark SQL
2019-03-05 SparkSQL集群性能调优 CheatSheet
0.买高性能机器,增加节点
1.设置磁盘文件预读值大小为16384,使用linux命令sparksql服务化:
echo 16384 /sys/block/{磁盘名}/queue/read_ahead_kb
2. Spark 任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能达到JavaSerializer的十倍。
3.在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。
4.一般并行度设置为集群CPU总和的2-3倍
5.大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少sparksql服务化了shuffle操作。
6. 合理设计DAG,减少shuffle //TODO
7.使用 mapPartitions 可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。
8.当之前的操作有很多filter时,使用 coalesce 减少空运行的任务数量
9.当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用 repartition 。
10.配置多个磁盘给 localDir ,shuffle时写入数据速度增快
11. 别collect大数据量,数据会回到driver端,容易OOM。非要collect,请配置 spark.sql.bigdata.thriftServer.useHdfsCollect 为true,会存在hdfs再读
12.尽量用reduceByKey,会在Map端做本地聚合
13. broadcase set/map而不是Iterator, set/map 查询效率O(1) ,iteratorO(n)
14. 数据发生倾斜,repartition大法 ,查出key,salt it
15.使用Hash Shuffle时,通过设置 spark.shuffle.consolidateFiles 为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能
16.Spark SQL 小表join,把小表broadcast出去。配置 spark.sql.autoBroadcastJoinThreshold 和 spark.sql.bigdata.useExecutorBroadcast 。小表在join 右端。
17.SparkSQL数据倾斜,配置 spark.sql.planner.skewJoin 和 spark.sql.planner.skewJoin.threshold
18. SparkSQL 小文件,配置 spark.sql.small.file.combine 和 spark.sql.small.file.split.size
企业常用sparkcore还是sparksql
企业最常用的是sparkcore和sparksql协同处理的方式。
企业是基于微批处理的流式计算引擎,通常是利用sparkcore与sparksql一起来处理数据。在企业实时处理架构中,通常将sparkstreaming和kafka集成作为整个大数据处理架构的核心环节之一。
SparkSQL构建在SparkCore之上,专门用来处理结构化数据(不仅仅是SQL)。即SparkSQL是SparkCore封装而来的。SparkSQL在SparkCore的基础上针对结构化数据处理进行很多优化和改进,简单来讲,SparkSQL支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取数据。当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,SparkSQL可以智能地只扫描这些用到的字段,而不是像SparkContexthadoopFile中那样简单粗暴地扫描全部数据。
Spark SQL 到底怎么搭建起来
一般spark sql用于访问hive集群的表数据吧?
我们的spark是访问hive集群的,步骤还是很简单的,大致如下:
1)安装spark时需要将hive-site.xml,yarn-site.xml,hdfs-site.xml都拷贝到spark/conf中(yarn-site.xml是因为我们是spark on yarn)
2)编程时用HiveContext,调用sql(...)就好了,如:
val hc = new HiveContext(sc)
hc.sql( "select ..." ) 这里的sql语句自己发挥吧~
不过spark sql稳定性不高,写复杂语句时partition和优化策略不太合理,小数据量玩一下就好(如spark streaming中使用也还可以),大数据量暂时不建议用~
Spark SQL(十):Hive On Spark
Hive是目前大数据领域sparksql服务化,事实上sparksql服务化的SQL标准。其底层默认是基于MapReduce实现的,但是由于MapReduce速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。
而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapReduce作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。
Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapReduce的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。
2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。
Hive QL语句 =
语法分析 = AST =
生成逻辑执行计划 = Operator Tree =
优化逻辑执行计划 = Optimized Operator Tree =
生成物理执行计划 = Task Tree =
优化物理执行计划 = Optimized Task Tree =
执行优化后的Optimized Task Tree
如何使用 Spark SQL
一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项
--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e quoted-query-string 直接执行查询SQL
-f filename 以文件方式批量执行SQL
二、Spark sql对hive支持的功能
1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, , , , =, =
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, , OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF
5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") 21).show()
分组统计:df.groupBy("age").count().show()
1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p = Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age = 13 AND age = 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能调优
缓存数据表:sqlContext.cacheTable("tableName")
取消缓存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险