Spark SQL和DataFrame的学习总结
1、DataFrame 一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包
1、DataFrame
一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包括外部文件(如 json、avro、parquet、sequencefile 等等)、hive、关系数据库、cassandra等。
DataFrame与RDD区别:
RDD以record为单位,spark优化时无法洞悉record内部的细节,无法深度优化,限制sparkSQL性能的提升;DataFrame包含了每个record的metadata元数据信息,DataFrame的优化可以对列内部优化。
(1)DataFrame的创建
Spark中所有相关功能的入口点是SQLContext类或者它的子类,创建一个SQLContext的所有需要仅仅是一个SparkContext。
val sc:SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。
使用SQLContext,应用可以从已存在的RDD、hive表或者数据源DataSources中创建DataFrame
示例:从本地 json文件创建
val df = sqlContext.jsonFile(“file:///home/hdfs/people.json”)
df.show()
age name
null Michael
30 Andy
19 Justin
df.printSchema()
|– age: long (nullable = true)
|– name: string (nullable = true)
(2)DataFrame的操作
DataFrame支持RDD的系列操作,可以对表进行过滤和进行多表关联
df.select(“name”).show()
name
Michael
Andy
Justin
df.select(df(“name”),df(“age”)+1).show()
name (age + 1)
Michael null
Andy 31
Justin 20
df.filter(df(“age”)>21).select(“name”).show()
name
Andy
df.groupBy(“age”).count().show()
age count
null 1
19 1
30 1
表之间的连接,3个等号
df.join(df2,df(“name”) === df2(“name”),”left”).show()df.filter(“age > 30”)
.join(department, df(“deptId”) === department(“id”))
.groupBy(department(“name”), “gender”)
.agg(avg(df(“salary”)), max(df(“age”)))
2、SparkSQL中的数据源
Spark SQL支持通过SchemaRDD接口操作各种数据源。一个SchemaRDD能够作为一个一般的RDD被操作,也可以被注册为一个临时的表。注册一个SchemaRDD为一个表就可以允许你在其数据上运行SQL查询。
加载数据为SchemaRDD的多种数据源,包括RDDs、parquent文件(列式存储)、JSON数据集、Hive表,以下主要介绍将RDDs转换为schemaRDD的两种方法
(1)利用反射推断模式
使用反射来推断包含特定对象类型的RDD的模式(schema)。适用于写spark程序的同时,已经知道了模式,使用反射可以使得代码简洁。结合样本的名字,通过反射读取,作为列的名字。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表。表可以在后续的sql语句中使用。
<code class=" hljs avrasm">val sqlContext = new org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.SQLContext</span>(sc) import sqlContext<span class="hljs-preprocessor">.implicits</span>._ case class Person(name:String,age:Int) val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>)<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Person(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span><span class="hljs-preprocessor">.toInt</span>))<span class="hljs-preprocessor">.toDF</span>() people<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val teenagers = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name,age FROM people WHERE age>= 19 AND age <=30"</span>) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span> + t<span class="hljs-preprocessor">.getAs</span>[String](<span class="hljs-string">"name"</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.getValueMap</span>[Any](List(<span class="hljs-string">"name"</span>,<span class="hljs-string">"age"</span>)))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
(2)编程指定模式
通过一个编程接口构造模式来实现,然后可在存在的RDDs上使用它。适用于当前样本模式未知
一个SchemaRDD可以通过三步来创建。
从原来的RDD创建一个行的RDD
创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
在行RDD上通过applySchema方法应用模式
<code class=" hljs avrasm">val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>) val schemaString = <span class="hljs-string">"name age"</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.Row</span><span class="hljs-comment">;</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.types</span>.{StructType,StructField,StringType}<span class="hljs-comment">;</span> val schema = StructType(schemaString<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">" "</span>)<span class="hljs-preprocessor">.map</span>(fieldName => StructField(fieldName,StringType,true))) val rowRDD = people<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Row(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span>)) val peopleSchemaRDD = sqlContext<span class="hljs-preprocessor">.applySchema</span>(rowRDD,schema) peopleSchemaRDD<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val results = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name FROM people"</span>) //DataFrame <span class="hljs-keyword">and</span> support all the normal RDD operations results<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
结果输出
Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob
3、性能调优
主要通过在内存中缓存数据或者设置实验选项来提高性能,降低工作负载
(1)在内存中缓存数据
Spark SQL可以通过调用sqlContext.cacheTable(“tableName”)方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。
也可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。
(2)配置选项
可以通过spark.sql.shuffle.partitions、spark.sql.codegen等选项来调整查询执行的性能。
4、其他
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。在Spark目录运行下面的命令可以启动Spark SQL CLI。
./bin/spark-sql

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

HQL和SQL在Hibernate框架中进行比较:HQL(1.面向对象语法,2.数据库无关的查询,3.类型安全),而SQL直接操作数据库(1.与数据库无关的标准,2.可执行复杂查询和数据操作)。

《OracleSQL中除法运算的用法》在OracleSQL中,除法运算是常见的数学运算之一。在数据查询和处理过程中,除法运算可以帮助我们计算字段之间的比例或者得出特定数值的逻辑关系。本文将介绍OracleSQL中除法运算的用法,并提供具体的代码示例。一、OracleSQL中除法运算的两种方式在OracleSQL中,除法运算可以使用两种不同的方式进行

Oracle和DB2是两个常用的关系型数据库管理系统,它们都有自己独特的SQL语法和特点。本文将针对Oracle和DB2的SQL语法进行比较与区别,并提供具体的代码示例。数据库连接在Oracle中,使用以下语句连接数据库:CONNECTusername/password@database而在DB2中,连接数据库的语句如下:CONNECTTOdataba

MyBatis动态SQL标签解读:Set标签用法详解MyBatis是一个优秀的持久层框架,它提供了丰富的动态SQL标签,可以灵活地构建数据库操作语句。其中,Set标签是用于生成UPDATE语句中SET子句的标签,在更新操作中非常常用。本文将详细解读MyBatis中Set标签的用法,以及通过具体的代码示例来演示其功能。什么是Set标签Set标签用于MyBati

解决办法:1、检查登录用户是否具有足够的权限来访问或操作该数据库,确保该用户具有正确的权限;2、检查SQL Server服务的帐户是否具有访问指定文件或文件夹的权限,确保该帐户具有足够的权限来读取和写入该文件或文件夹;3、检查指定的数据库文件是否已被其他进程打开或锁定,尝试关闭或释放该文件,并重新运行查询;4、尝试以管理员身份运行Management Studio等等。

数据库技术大比拼:Oracle和SQL的区别有哪些?在数据库领域中,Oracle和SQLServer是两种备受推崇的关系型数据库管理系统。尽管它们都属于关系型数据库的范畴,但两者之间存在着诸多不同之处。在本文中,我们将深入探讨Oracle和SQLServer之间的区别,以及它们在实际应用中的特点和优势。首先,Oracle和SQLServer在语法方面存

学习C语言的魅力:解锁程序员的潜力随着科技的不断发展,计算机编程已经成为了一个备受关注的领域。在众多编程语言中,C语言一直以来都备受程序员的喜爱。它的简单、高效以及广泛应用的特点,使得学习C语言成为了许多人进入编程领域的第一步。本文将讨论学习C语言的魅力,以及如何通过学习C语言来解锁程序员的潜力。首先,学习C语言的魅力在于其简洁性。相比其他编程语言而言,C语

在word中编辑文字内容时,有时会需要输入公式符号。有的小伙们不知道在word根号输入的方法,小面就让小编跟小伙伴们一起分享下word根号输入的方法教程。希望对小伙伴们有所帮助。首先,打开电脑上的Word软件,然后打开要编辑的文件,并将光标移动到需要插入根号的位置,参考下方的图片示例。2.选择【插入】,再选择符号里的【公式】。如下方的图片红色圈中部分内容所示:3.接着选择下方的【插入新公式】。如下方的图片红色圈中部分内容所示:4.选择【根式】,再选择合适的根号。如下方的图片红色圈中部分内容所示:
