Spark SQL和DataFrame的学习总结
1、DataFrame 一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包
1、DataFrame
一个以命名列组织的分布式数据集。概念上相当于关系数据库中一张表或在R / Python中的data frame数据结构,但DataFrame有丰富的优化。在spark 1.3之前,核心的新类型为RDD-schemaRDD,现改为DataFrame。spark 通过DataFrame操作大量的数据源,包括外部文件(如 json、avro、parquet、sequencefile 等等)、hive、关系数据库、cassandra等。
DataFrame与RDD区别:
RDD以record为单位,spark优化时无法洞悉record内部的细节,无法深度优化,限制sparkSQL性能的提升;DataFrame包含了每个record的metadata元数据信息,DataFrame的优化可以对列内部优化。
(1)DataFrame的创建
Spark中所有相关功能的入口点是SQLContext类或者它的子类,创建一个SQLContext的所有需要仅仅是一个SparkContext。
val sc:SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
除了一个基本的 SQLContext,你也能够创建一个 HiveContext,它支持基本 SQLContext 所支持功能的一个超集。它的额外功能包括用更完整的 HiveQL 分析器写查询去访问 HiveUDFs 的能力、 从 Hive 表读取数据的能力。用 HiveContext 你不需要一个已经存在的 Hive 开启,SQLContext 可用的数据源对 HiveContext 也可用。
使用SQLContext,应用可以从已存在的RDD、hive表或者数据源DataSources中创建DataFrame
示例:从本地 json文件创建
val df = sqlContext.jsonFile(“file:///home/hdfs/people.json”)
df.show()
age name
null Michael
30 Andy
19 Justin
df.printSchema()
|– age: long (nullable = true)
|– name: string (nullable = true)
(2)DataFrame的操作
DataFrame支持RDD的系列操作,可以对表进行过滤和进行多表关联
df.select(“name”).show()
name
Michael
Andy
Justin
df.select(df(“name”),df(“age”)+1).show()
name (age + 1)
Michael null
Andy 31
Justin 20
df.filter(df(“age”)>21).select(“name”).show()
name
Andy
df.groupBy(“age”).count().show()
age count
null 1
19 1
30 1
表之间的连接,3个等号
df.join(df2,df(“name”) === df2(“name”),”left”).show()df.filter(“age > 30”)
.join(department, df(“deptId”) === department(“id”))
.groupBy(department(“name”), “gender”)
.agg(avg(df(“salary”)), max(df(“age”)))
2、SparkSQL中的数据源
Spark SQL支持通过SchemaRDD接口操作各种数据源。一个SchemaRDD能够作为一个一般的RDD被操作,也可以被注册为一个临时的表。注册一个SchemaRDD为一个表就可以允许你在其数据上运行SQL查询。
加载数据为SchemaRDD的多种数据源,包括RDDs、parquent文件(列式存储)、JSON数据集、Hive表,以下主要介绍将RDDs转换为schemaRDD的两种方法
(1)利用反射推断模式
使用反射来推断包含特定对象类型的RDD的模式(schema)。适用于写spark程序的同时,已经知道了模式,使用反射可以使得代码简洁。结合样本的名字,通过反射读取,作为列的名字。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表。表可以在后续的sql语句中使用。
<code class=" hljs avrasm">val sqlContext = new org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.SQLContext</span>(sc) import sqlContext<span class="hljs-preprocessor">.implicits</span>._ case class Person(name:String,age:Int) val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>)<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Person(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span><span class="hljs-preprocessor">.toInt</span>))<span class="hljs-preprocessor">.toDF</span>() people<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val teenagers = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name,age FROM people WHERE age>= 19 AND age <=30"</span>) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span> + t<span class="hljs-preprocessor">.getAs</span>[String](<span class="hljs-string">"name"</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println) teenagers<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.getValueMap</span>[Any](List(<span class="hljs-string">"name"</span>,<span class="hljs-string">"age"</span>)))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
(2)编程指定模式
通过一个编程接口构造模式来实现,然后可在存在的RDDs上使用它。适用于当前样本模式未知
一个SchemaRDD可以通过三步来创建。
从原来的RDD创建一个行的RDD
创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
在行RDD上通过applySchema方法应用模式
<code class=" hljs avrasm">val people = sc<span class="hljs-preprocessor">.textFile</span>(<span class="hljs-string">"file:///home/hdfs/people.txt"</span>) val schemaString = <span class="hljs-string">"name age"</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.Row</span><span class="hljs-comment">;</span> import org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.spark</span><span class="hljs-preprocessor">.sql</span><span class="hljs-preprocessor">.types</span>.{StructType,StructField,StringType}<span class="hljs-comment">;</span> val schema = StructType(schemaString<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">" "</span>)<span class="hljs-preprocessor">.map</span>(fieldName => StructField(fieldName,StringType,true))) val rowRDD = people<span class="hljs-preprocessor">.map</span>(_<span class="hljs-preprocessor">.split</span>(<span class="hljs-string">","</span>))<span class="hljs-preprocessor">.map</span>(p => Row(p(<span class="hljs-number">0</span>),p(<span class="hljs-number">1</span>)<span class="hljs-preprocessor">.trim</span>)) val peopleSchemaRDD = sqlContext<span class="hljs-preprocessor">.applySchema</span>(rowRDD,schema) peopleSchemaRDD<span class="hljs-preprocessor">.registerTempTable</span>(<span class="hljs-string">"people"</span>) val results = sqlContext<span class="hljs-preprocessor">.sql</span>(<span class="hljs-string">"SELECT name FROM people"</span>) //DataFrame <span class="hljs-keyword">and</span> support all the normal RDD operations results<span class="hljs-preprocessor">.map</span>(t => <span class="hljs-string">"Name:"</span>+t(<span class="hljs-number">0</span>))<span class="hljs-preprocessor">.collect</span>()<span class="hljs-preprocessor">.foreach</span>(println)</code>
结果输出
Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob
3、性能调优
主要通过在内存中缓存数据或者设置实验选项来提高性能,降低工作负载
(1)在内存中缓存数据
Spark SQL可以通过调用sqlContext.cacheTable(“tableName”)方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。
也可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。
(2)配置选项
可以通过spark.sql.shuffle.partitions、spark.sql.codegen等选项来调整查询执行的性能。
4、其他
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。在Spark目录运行下面的命令可以启动Spark SQL CLI。
./bin/spark-sql

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









HQL と SQL は Hibernate フレームワークで比較されます。HQL (1. オブジェクト指向構文、2. データベースに依存しないクエリ、3. タイプ セーフティ)、SQL はデータベースを直接操作します (1. データベースに依存しない標準、2. 複雑な実行可能ファイル)。クエリとデータ操作)。

「OracleSQLでの除算演算の使用方法」 OracleSQLでは、除算演算は一般的な数学演算の1つです。データのクエリと処理中に、除算演算はフィールド間の比率を計算したり、特定の値間の論理関係を導出したりするのに役立ちます。この記事では、OracleSQL での除算演算の使用法を紹介し、具体的なコード例を示します。 1. OracleSQL における除算演算の 2 つの方法 OracleSQL では、除算演算を 2 つの異なる方法で実行できます。

Oracle と DB2 は一般的に使用される 2 つのリレーショナル データベース管理システムであり、それぞれに独自の SQL 構文と特性があります。この記事では、Oracle と DB2 の SQL 構文を比較し、相違点を示し、具体的なコード例を示します。データベース接続 Oracle では、次のステートメントを使用してデータベースに接続します: CONNECTusername/password@database DB2 では、データベースに接続するステートメントは次のとおりです: CONNECTTOdataba

MyBatis 動的 SQL タグの解釈: Set タグの使用法の詳細な説明 MyBatis は、豊富な動的 SQL タグを提供し、データベース操作ステートメントを柔軟に構築できる優れた永続層フレームワークです。このうち、Set タグは、UPDATE ステートメントで SET 句を生成するために使用され、更新操作でよく使用されます。この記事では、MyBatis での Set タグの使用法を詳細に説明し、特定のコード例を通じてその機能を示します。 SetタグとはMyBatiで使用するSetタグです。

解決策: 1. ログインしているユーザーがデータベースにアクセスまたは操作するための十分な権限を持っているかどうかを確認し、ユーザーが正しい権限を持っているかどうかを確認します; 2. SQL Server サービスのアカウントに指定されたファイルまたはデータベースにアクセスする権限があるかどうかを確認します。 3. 指定されたデータベース ファイルが他のプロセスによって開かれているかロックされているかどうかを確認し、ファイルを閉じるか解放して、クエリを再実行します。管理者として試してください。Management Studio をなどとして実行します。

Linux での system() 関数の概要 Linux システムでは、system() 関数は非常に一般的に使用される関数であり、コマンド ライン コマンドの実行に使用できます。この記事では、system() 関数を詳細に紹介し、いくつかの具体的なコード例を示します。 1. system() 関数の基本的な使用法 system() 関数の宣言は次のとおりです: intsystem(constchar*command); コマンド パラメーターは文字です。

データベース技術コンテスト: Oracle と SQL の違いは何ですか?データベース分野では、Oracle と SQL Server の 2 つは非常に評判の高いリレーショナル データベース管理システムです。どちらもリレーショナル データベースのカテゴリに属しますが、両者の間には多くの違いがあります。この記事では、Oracle と SQL Server の違い、実際のアプリケーションにおけるそれらの機能と利点について詳しく説明します。まず、Oracle と SQL Server の間には構文に違いがあります。

C言語学習の魅力:プログラマーの可能性を引き出す テクノロジーの発展に伴い、コンピュータプログラミングは大きな注目を集めている分野です。数あるプログラミング言語の中でもC言語は常にプログラマーに愛されています。そのシンプルさ、効率性、幅広い用途により、C 言語の学習は、多くの人にとってプログラミングの分野に入る最初のステップとなっています。この記事では、C言語を学ぶ魅力と、C言語を学ぶことでプログラマーの可能性を引き出す方法について解説します。 C言語学習の魅力は、まずその簡単さにあります。他のプログラミング言語と比較すると、C言語は
