本文作者:admin

sparksql服务化(spark sparksql)

admin 2022年12月19日 12:15:53 1

本文目录一览:

2019-03-05 SparkSQL集群性能调优 CheatSheet

0.买高性能机器,增加节点

1.设置磁盘文件预读值大小为16384,使用linux命令sparksql服务化

echo 16384 /sys/block/{磁盘名}/queue/read_ahead_kb

2. Spark 任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能达到JavaSerializer的十倍。

3.在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置项中添加参数:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果频繁出现Full GC,需要优化GC。把RDD做Cache操作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。

4.一般并行度设置为集群CPU总和的2-3倍

5.大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少sparksql服务化了shuffle操作。

6. 合理设计DAG,减少shuffle  //TODO

7.使用 mapPartitions 可以更灵活地操作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

8.当之前的操作有很多filter时,使用 coalesce 减少空运行的任务数量

9.当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用 repartition 。

10.配置多个磁盘给 localDir ,shuffle时写入数据速度增快

11. 别collect大数据量,数据会回到driver端,容易OOM。非要collect,请配置 spark.sql.bigdata.thriftServer.useHdfsCollect 为true,会存在hdfs再读

12.尽量用reduceByKey,会在Map端做本地聚合

13. broadcase set/map而不是Iterator, set/map 查询效率O(1) ,iteratorO(n)

14. 数据发生倾斜,repartition大法 ,查出key,salt it

15.使用Hash Shuffle时,通过设置 spark.shuffle.consolidateFiles 为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO操作以提升性能

16.Spark SQL 小表join,把小表broadcast出去。配置 spark.sql.autoBroadcastJoinThreshold 和 spark.sql.bigdata.useExecutorBroadcast 。小表在join 右端。

17.SparkSQL数据倾斜,配置 spark.sql.planner.skewJoin 和 spark.sql.planner.skewJoin.threshold

18. SparkSQL 小文件,配置 spark.sql.small.file.combine 和  spark.sql.small.file.split.size

企业常用sparkcore还是sparksql

企业最常用的是sparkcore和sparksql协同处理的方式。

企业是基于微批处理的流式计算引擎,通常是利用sparkcore与sparksql一起来处理数据。在企业实时处理架构中,通常将sparkstreaming和kafka集成作为整个大数据处理架构的核心环节之一。

SparkSQL构建在SparkCore之上,专门用来处理结构化数据(不仅仅是SQL)。即SparkSQL是SparkCore封装而来的。SparkSQL在SparkCore的基础上针对结构化数据处理进行很多优化和改进,简单来讲,SparkSQL支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取数据。当你使用SQL查询这些数据源中的数据并且只用到了一部分字段时,SparkSQL可以智能地只扫描这些用到的字段,而不是像SparkContexthadoopFile中那样简单粗暴地扫描全部数据。

Spark SQL 到底怎么搭建起来

一般spark sql用于访问hive集群的表数据吧?

我们的spark是访问hive集群的,步骤还是很简单的,大致如下:

1)安装spark时需要将hive-site.xml,yarn-site.xml,hdfs-site.xml都拷贝到spark/conf中(yarn-site.xml是因为我们是spark on yarn)

2)编程时用HiveContext,调用sql(...)就好了,如:

val hc = new HiveContext(sc)

hc.sql( "select ..." ) 这里的sql语句自己发挥吧~

不过spark sql稳定性不高,写复杂语句时partition和优化策略不太合理,小数据量玩一下就好(如spark streaming中使用也还可以),大数据量暂时不建议用~

Spark SQL(十):Hive On Spark

Hive是目前大数据领域sparksql服务化,事实上sparksql服务化的SQL标准。其底层默认是基于MapReduce实现的,但是由于MapReduce速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。

而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapReduce作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。

Hive On Spark做了一些优化:

1、Map Join

Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapReduce的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。

2、Cache Table

对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。

Hive QL语句 =

语法分析 = AST =

生成逻辑执行计划 = Operator Tree =

优化逻辑执行计划 = Optimized Operator Tree =

生成物理执行计划 = Task Tree =

优化物理执行计划 = Optimized Task Tree =

执行优化后的Optimized Task Tree

如何使用 Spark SQL

一、启动方法

/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项

--master MASTER_URL 指定master url

--executor-memory MEM 每个executor的内存,默认为1G

--total-executor-cores NUM 所有executor的总核数

-e quoted-query-string 直接执行查询SQL

-f filename 以文件方式批量执行SQL

二、Spark sql对hive支持的功能

1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY

2、hive操作运算:

1) 关系运算:= ==, , , , =, =

2) 算术运算:+, -, *, /, %

3) 逻辑运算:AND, , OR, ||

4) 复杂的数据结构

5) 数学函数:(sign, ln, cos, etc)

6) 字符串函数:

3、 UDF

4、 UDAF

5、 用户定义的序列化格式

6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN

7、 unions操作:

8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2

9、Sampling

10、 Explain

11、 分区表

12、 视图

13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客户端编程方式进行查询数据

1、启动spark-shell

./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

2、编写程序

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("../examples/src/main/resources/people.json")

查看所有数据:df.show()

查看表结构:df.printSchema()

只看name列:df.select("name").show()

对数据运算:df.select(df("name"), df("age") + 1).show()

过滤数据:df.filter(df("age") 21).show()

分组统计:df.groupBy("age").count().show()

1、查询txt数据

import sqlContext.implicits._

case class Person(name: String, age: Int)

val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p = Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age = 13 AND age = 19")

2、parquet文件

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")

4、保存查询结果数据

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)

四、Spark sql性能调优

缓存数据表:sqlContext.cacheTable("tableName")

取消缓存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。

spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险

阅读
分享