1、DataFrame 一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包
1、DataFrame
一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包括外部文件(如 json、avro、parquet、sequencefile 等等)、hive、关系数据库、cassandra等。
DataFrame与RDD区别:
RDD以record为单位,spark优化时无法洞悉record内部的细节,无法深度优化,限制sparkSQL性能的提升;DataFrame包含了每个record的metadata元数据信息,DataFrame的优化可以对列内部优化。
(1)DataFrame的创建
Spark中所有相关功能的入口点是SQLContext类或者它的子类,创建一个SQLContext的所有需要仅仅是一个SparkContext。
val sc:SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。
使用SQLContext,应用可以从已存在的RDD、hive表或者数据源DataSources中创建DataFrame
示例:从本地 json文件创建
val df = sqlContext.jsonFile(“file:///home/hdfs/people.json”)
df.show()
age name
null Michael
30 Andy
19 Justin
df.printSchema()
|– age: long (nullable = true)
|– name: string (nullable = true)
(2)DataFrame的操作
DataFrame支持RDD的系列操作,可以对表进行过滤和进行多表关联
df.select(“name”).show()
name
Michael
Andy
Justin
df.select(df(“name”),df(“age”)+1).show()
name (age + 1)
Michael null
Andy 31
Justin 20
df.filter(df(“age”)>21).select(“name”).show()
name
Andy
df.groupBy(“age”).count().show()
age count
null 1
19 1
30 1
表之间的连接,3个等号
df.join(df2,df(“name”) === df2(“name”),”left”).show()df.filter(“age > 30”)
.join(department, df(“deptId”) === department(“id”))
.groupBy(department(“name”), “gender”)
.agg(avg(df(“salary”)), max(df(“age”)))
2、SparkSQL中的数据源
Spark SQL支持通过SchemaRDD接口操作各种数据源。一个SchemaRDD能够作为一个一般的RDD被操作,也可以被注册为一个临时的表。注册一个SchemaRDD为一个表就可以允许你在其数据上运行SQL查询。
加载数据为SchemaRDD的多种数据源,包括RDDs、parquent文件(列式存储)、JSON数据集、Hive表,以下主要介绍将RDDs转换为schemaRDD的两种方法
(1)利用反射推断模式
使用反射来推断包含特定对象类型的RDD的模式(schema)。适用于写spark程序的同时,已经知道了模式,使用反射可以使得代码简洁。结合样本的名字,通过反射读取,作为列的名字。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表。表可以在后续的sql语句中使用。
<code class=" hljs avrasm">val sqlContext = new org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.SQLContext</span>(sc) import sqlContext<span class="hljs-preprocessor">.implicits</span>._ case class Person(name:String,age:Int) val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>)<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Person(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span><span class="hljs-preprocessor">.toInt</span>))<span class="hljs-preprocessor">.toDF</span>() people<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val teenagers = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name,age FROM people WHERE age>= 19 AND age <=30"</span>) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span> + t<span class="hljs-preprocessor">.getAs</span>[String](<span class="hljs-string">"name"</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.getValueMap</span>[Any](List(<span class="hljs-string">"name"</span>,<span class="hljs-string">"age"</span>)))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
(2)编程指定模式
通过一个编程接口构造模式来实现,然后可在存在的RDDs上使用它。适用于当前样本模式未知
一个SchemaRDD可以通过三步来创建。
从原来的RDD创建一个行的RDD
创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
在行RDD上通过applySchema方法应用模式
<code class=" hljs avrasm">val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>) val schemaString = <span class="hljs-string">"name age"</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.Row</span><span class="hljs-comment">;</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.types</span>.{StructType,StructField,StringType}<span class="hljs-comment">;</span> val schema = StructType(schemaString<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">" "</span>)<span class="hljs-preprocessor">.map</span>(fieldName => StructField(fieldName,StringType,true))) val rowRDD = people<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Row(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span>)) val peopleSchemaRDD = sqlContext<span class="hljs-preprocessor">.applySchema</span>(rowRDD,schema) peopleSchemaRDD<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val results = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name FROM people"</span>) //DataFrame <span class="hljs-keyword">and</span> support all the normal RDD operations results<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
结果输出
Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob
3、性能调优
主要通过在内存中缓存数据或者设置实验选项来提高性能,降低工作负载
(1)在内存中缓存数据
Spark SQL可以通过调用sqlContext.cacheTable(“tableName”)方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。
也可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。
(2)配置选项
可以通过spark.sql.shuffle.partitions、spark.sql.codegen等选项来调整查询执行的性能。
4、其他
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。在Spark目录运行下面的命令可以启动Spark SQL CLI。
./bin/spark-sql