大数据分析工程师入门(九):Spark SQL

栏目: 数据库 · 发布时间: 4年前

内容简介:本文为《大数据分析师入门课程》系列的第9篇,在本系列的第8篇-Spark基础中,已经对Spark做了一个入门介绍,在此基础上本篇拎出Spark SQL,主要站在使用者的角度来进行讲解,需要注意的是本文中的例子的代码均使用Scala语言。主要包括以下内容:

大数据分析工程师入门(九):Spark SQL

本文为《大数据分析师入门课程》系列的第9篇,在本系列的第8篇-Spark基础中,已经对Spark做了一个入门介绍,在此基础上本篇拎出Spark SQL,主要站在使用者的角度来进行讲解,需要注意的是本文中的例子的代码均使用Scala语言。

主要包括以下内容:

  • 你该了解的Spark SQL

  • 简单入门操作

  • 不得不说的数据源

你该了解的 Spark SQL

1.什么是Spark SQL?

Spark SQL是Spark专门用来处理结构化数据的模块,是Spark的核心组件,在1.0时发布。

SparkSQL替代的是HIVE的查询引擎,HIVE的默认引擎查询效率低是由于其基于MapReduce实现 SQL 查询,而MapReduce的shuffle是基于磁盘的。

2.Spark SQL特性

其实最初Spark团队推出的是Shark-基于Hive对内存管理、物理计划、执行做了优化,底层使用Spark基于内存的计算引擎,对比Hive性能提升一个数量级。

即便如此高的性能提升,但是由于Shark底层依赖Hive的语法解析器、查询优化器等组件制约其性能的进一步提升。最终Spark团队放弃了Shark,推出了Spark SQL项目,其具备以下特性:

  • 标准的数据连接,支持多种数据源

  • 多种性能优化技术

  • 组件的可扩展性

  • 支持多语言开发:Scala、 JavaPython 、R

  • 兼容Hive

3.Spark SQL可以做什么?

  • 大数据处理

使用SQL进行大数据处理,使传统的RDBMS人员也可以进行大数据处理,不需要掌握像mapreduce的编程方法。

  • 使用高级API进行开发

SparkSQL支持SQL API,DataFrame和Dataset API多种API,使用这些高级API进行编程和采用Sparkcore的RDD API 进行编程有很大的不同。

使用RDD进行编程时,开发人员在采用不同的编程语言和不同的方式开发应用程序时,其应用程序的性能千差万别,但如果使用DataFrame和Dataset进行开发时,资深开发人员和初级开发人员开发的程序性能差异很小,这是因为SparkSQL 内部使用Catalyst optimizer 对执行计划做了很好的优化。

简 单 入 门 操 作

1.构建入口

Spark SQL中所有功能的入口点是SparkSession类-Spark 2.0引入的新概念,它为用户提供统一的切入点。

早期Spark的切入点是SparkContext,通过它来创建和操作数据集,对于不同的API需要不同的context。

比如:使用sql-需要sqlContext,使用hive-需要hiveContext,使用streaming-需要StreamingContext。SparkSession封装了SparkContext和SQLContext。

要创建一个 SparkSession使用SparkSession.builder():


 

import org.apache.spark.sql.SparkSession

val spark = SparkSession

.builder()

.appName("Spark SQL basic example")

.config("spark.some.config.option", "some-value")

.getOrCreate()

2.创建DataFrame

在一个SparkSession中,应用程序可以从结构化的数据文件、Hive的table、外部数据库和RDD中创建一个DataFrame。

举个例子, 下面就是基于一个JSON文件创建一个DataFrame:


 

val df =spark.read.json("examples/src/main/resources/people.json")

// 显示出DataFrame的内容

df.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19| Justin|

// +----+-------+

3.执行SQL查询


 

// 将DataFrame注册成一个临时视图

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")

sqlDF.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

SparkSession的SQL函数可以让应用程序以编程的方式运行SQL查询,并将结果作为一个 DataFrame返回。

例子中createOrReplaceTempView创建的临时视图是session级别的,也就是会随着session的消失而消失。如果你想让一个临时视图在所有session中相互传递并且可用,直到Spark 应用退出,你可以建立一个全局的临时视图,全局的临时视图存在于系统数据库global_temp中,我们必须加上库名去引用它。


 

// 将一个DataFrame注册成一个全局临时视图

df.createGlobalTempView("people")

// 注意这里的global_temp

spark.sql("SELECT * FROM global_temp.people").show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

// 新的session同样可以访问

spark.newSession().sql("SELECT * FROM global_temp.people").show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19|Justin|

// +----+-------+

4.DataFrame操作示例


 

import spark.implicits._ //导入隐式转换的包

//打印schema

df.printSchema()

// root

// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

//选择一列进行打印

df.select("name").show()

// +-------+

// | name|

// +-------+

// |Michael|

// | Andy|

// | Justin|

// +-------+

//年龄加1

df.select($"name", $"age" +1).show()

// +-------+---------+

// | name|(age + 1)|

// +-------+---------+

// |Michael| null|

// | Andy| 31|

// | Justin| 20|

// +-------+---------+

//选取年龄大于21的

df.filter($"age" > 21).show()

// +---+----+

// |age|name|

// +---+----+

// | 30|Andy|

// +---+----+

//聚合操作

df.groupBy("age").count().show()

// +----+-----+

// | age|count|

// +----+-----+

// | 19| 1|

// |null| 1|

// | 30| 1|

// +----+-----+

5.创建DataSet

Dataset和RDD比较类似,与RDD不同的是实现序列化和反序列化的方式,RDD是使用Java serialization或者Kryo,而Dataset是使用Encoder。

Encoder的动态特性使得Spark可以在执行filtering、sorting和hashing等许多操作时无需把字节反序列化为对象。


 

// 一个简单的Seq转成DataSet,会有默认的schema

val primitiveDS = Seq(1, 2, 3).toDS().show

// +-----+

// |value|

// +-----+

// | 1|

// | 2|

// | 3|

// +-----+

case class Person(name: String, age: Long)

// 通过反射转换为DataSet

val caseClassDS = Seq(Person("Andy",32)).toDS()

caseClassDS.show()

// +----+---+

// |name|age|

// +----+---+

// |Andy| 32|

// +----+---+

// DataFrame指定一个类则为DataSet

val path = "examples/src/main/resources/people.json"

val peopleDS = spark.read.json(path).as[Person]

peopleDS.show()

// +----+-------+

// | age| name|

// +----+-------+

// |null|Michael|

// | 30| Andy|

// | 19| Justin|

// +----+-------+

通过上述的代码可以看出创建DataSet的代码很简单,一个toDs就可以自动推断出schema的类型,读取json这种结构化的数据得到的是一个DataFrame,再指定它的类则为DataSet。

6.RDD的互操作性

RDD的互操作性指的是RDD和DataFrame的相互转换,DataFrame转RDD很简单,复杂的是RDD转DataFrame。

目前Spark SQL有两种方法:

  • 反射推断

Spark SQL 的 Scala 接口支持自动转换一个包含 Case Class的 RDD 为DataFrame。Case Class 定义了表的Schema。Case class 的参数名使用反射读取并且成为了列名。Case class 也可以是嵌套的或者包含像 Seq 或者 Array 这样的复杂类型,这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。


 

// 开启隐式转换

import spark.implicits._

// 读入文本文件并最终转化成DataFrame

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => Person(attributes(0), attributes(1).trim.toInt))

.toDF()

// 将DataFrame注册成表

peopleDF.createOrReplaceTempView("people")

// 执行一条sql查询

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// 通过map操作后得到的是RDD

teenagersDF.map(teenager => "Name: " +teenager(0)).show()

// +------------+

// | value|

// +------------+

// |Name: Justin|

// +------------+

另一种更加简单的操作是将RDD中每一行类型变为tuple类型,然后使用toDF依次赋予字段名,需要注意的是使用tuple最高可以支持22个字段。


 

// 开启隐式转换

import spark.implicits._

// 读入文本文件并最终转化成DataFrame

val peopleDF = spark.sparkContext

.textFile("examples/src/main/resources/people.txt")

.map(_.split(","))

.map(attributes => (attributes(0), attributes(1).trim.toInt))

.toDF("name","age")

//peopleDF: org.apache.spark.sql.DataFrame = [name:string, age: int]

  • 构造Schema

在无法提前定义schema的情况下,RDD转DataFrame或者DataSet需要构造Schema。

构建一个Schema并将它应用到一个已存在的RDD编程接口需要以下四个步骤:

a.从原始的RDD创建一个tuple或者列表类型的RDD

b.创建一个StructType来匹配RDD中的结构

c.将生成的RDD转换成Row类型的RDD

d.通过createDataFrame方法将Schema应用到RDD


 

//需要导入类型相关的包

import org.apache.spark.sql.types._

//读取hdfs上的文本文件,保存到rdd中

val peopleRDD =spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// 这里的schema是一个字符串,可以来自于其他未知内容的文件,需要注意的是-这里明确写出来只是为了演示,并不代表提前知道schema信息。

val schemaString = "name age"

// 将有schema信息的字符串转变为StructField类型

val fields = schemaString.split(" ")

.map(fieldName => StructField(fieldName, StringType, nullable =true))

//通过StructType方法读入schema

val schema = StructType(fields)

// 将RDD转换成Row类型的RDD

val rowRDD = peopleRDD

.map(_.split(","))

.map(attributes => Row(attributes(0), attributes(1).trim))

// 应用schema信息到Row类型的RDD

val peopleDF = spark.createDataFrame(rowRDD,schema)

不得不说的数据源

在工作中使用Spark SQL进行处理数据的第一步就是读取数据,Spark SQL通过统一的接口去读取和写入数据。主要是read和write操作,不同的数据源相应的Option(附加设置)会有所不同,下面通过例子来具体说明。

1.数据读取

  • parquet

1)读取Parquet文件

parquet文件自带schema,读取后是DataFrame格式。


 

val usersDF =spark.read.load("examples/src/main/resources/users.parquet")

//usersDF: org.apache.spark.sql.DataFrame = [name:string, favorite_color: string ... 1 more field]

2)解析分区信息

parquet文件中如果带有分区信息,那么SparkSQL会自动解析分区信息。比如,这样一份人口数据按照gender和country进行分区存储,目录结构如下:


 

test

└── spark-sql

└── test

├──gender=male

│ │

│ ├── country=US

│ │ └── data.parquet

│ ├── country=CN

│ │ └── data.parquet

│ └── ...

└──gender=female

├── country=US

│ └── data.parquet

├── country=CN

│ └── data.parquet

└── ...

通过spark.read.load读取该目录下的文件SparkSQL将自动解析分区信息,返回的DataFrame的Schema如下:


 

root

|-- name: string (nullable = true)

|-- age: long (nullable = true)

|-- gender: string (nullable = true)

|-- country: string (nullable = true)

目前自动解析分区支持数值类型和字符串类型。

自动解析分区类型的参数为:spark.sql.sources.partitionColumnTypeInference.enabled,默认值为true。可以关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不会再进行类型解析。

3)Schema合并

如果读取的多个parquet文件中的Schema信息不一致,Spark SQL可以设置参数进行合并,但是Schema合并是一个高消耗的操作,在大多数情况下并不需要,所以Spark SQL从1.5.0开始默认关闭了该功能。

可以通过下面两种方式开启该功能:

a.读取文件的时候,开启合并功能,只对本次读取文件进行合并Schema操作

b.设置全局SQL选项spark.sql.parquet.mergeSchema为true,每次读取文件都会进行合并Schema操作

具体请看下面的例子:


 

// sqlContext是之前例子中生成的

// 导入隐式转换

import sqlContext.implicits._

// 创建一个简单的DataFrame并保存

val df1 = sc.makeRDD(1 to 5).map(i => (i, i *2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// 创建另一个DataFrame,注意字段名

val df2 = sc.makeRDD(6 to 10).map(i => (i, i *3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// 读取这两个parquet文件,增加开启合并Schema的设置

val df3 =sqlContext.read.option("mergeSchema","true").parquet("data/test_table")

df3.printSchema()

// 不同名称的字段都保留下来了

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

关于schema合并,有一点需要特别关注,那就是当不同parquet文件的schema有冲突时,合并会失败,如同名的字段,其类型不一致的情况。这时如果你读取的是hive数据源,可能会出现读取失败或者读取字段值全部为NULL的情况。如果大家遇到类型场景,可以考虑是否是这个因素导致。

  • json

json文件和parquet文件一样也是带有schema信息,不过需要指明是json文件,才能准确的读取。


 

val peopleDF =spark.read.format("json").load("examples/src/main/resources/people.json")

//peopleDF: org.apache.spark.sql.DataFrame = [age:bigint, name: string]

  • MySQL

读取 MySQL 中的数据是通过jdbc的方式,需要知道要访问的MySQL数据库、表等信息,具体请看下面的代码:


 

//MySQL数据的访问ip、端口号和数据库名

val url ="jdbc:mysql://192.168.100.101:3306/testdb"

//要访问的表名

val table = "test"

//建立一个配置变量

val properties = new Properties()

//将用户名存入配置变量

properties.setProperty("user","root")

//将密码存入配置变量

properties.setProperty("password","root")

//需要传入Mysql的URL、表名、配置变量

val df = sqlContext.read.jdbc(url,table,properties)

这里要注意的一个点是,读取MySQL需要运行作业时,classpath下有MySQL的驱动jar,或者通过--jars添加驱动jar。

  • hive

读取hive数据的前提是要进行相关的配置,需要将hive-site.xml、core-site.xml、hdfs-site.xml以及hive的lib依赖放入spark的classpath下,或者在提交作业时通过--files和--jars来指定这些配置文件和jar包。之后,就可以很方便的使用hive的数据表了,示例代码如下:


 

import java.io.File

import org.apache.spark.sql.Row

import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// 数仓地址指向默认设置

val warehouseLocation = newFile("spark-warehouse").getAbsolutePath

val spark = SparkSession

.builder()

.appName("Spark Hive Example")

.config("spark.sql.warehouse.dir", warehouseLocation)

.enableHiveSupport() //增加支持hive特性

.getOrCreate()

import spark.implicits._

import spark.sql

//使用sql创建一个表,并将hdfs中的文件导入到表中

sql("CREATE TABLE IF NOT EXISTS src (key INT,value STRING) USING hive")

sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 使用sql直接指向sql查询

sql("SELECT * FROM src").show()

// +---+-------+

// |key| value|

// +---+-------+

// |238|val_238|

// | 86| val_86|

// |311|val_311|

// ...

2.数据保存

  • write

保存用write方法,先看一个简单的例子,将一个DataFrame保存到parquet文件中。


 

//选取DataFrame中的两列保存到parquet文件中

usersDF.select("name","favorite_color").write.save("namesAndFavColors.parquet")

  • format

format可以指定保存文件的格式,支持json、csv、orc等


 

//选取DataFrame中的两列保存到json文件中

usersDF.select("name", "favorite_color").write.format("json").save("namesAndFavColors.json")

  • mode

在保存数据的时候,要不要考虑数据存不存在?是覆盖还是追加呢?通过mode可以进行设置。


 

//选取DataFrame中的两列保并追加到parquet文件中

usersDF.select("name","favorite_color").write.mode(SaveMode.append)

.save("namesAndFavColors.parquet")

除了append还有下列选项:

选项

含义

ErrorIfExists

如果数据已经存在,则会抛出异常

Append

如果数据已经存在,则会追加

Overwrite

每次都会覆盖

ignore

如果数据已经存在,则不做任何操作

总结

本文通过什么是Spark SQL,有哪些特性,可以做什么让读者对Spark SQL有个整体的了解,然后着重讲解了如何进行入门操作和多种数据源操作。

掌握了以上技能,大数据分析工程师面对Spark SQL相关的工作一定可以游刃有余。

参考文献:

[1] Apache Spark官网文档-http://spark.apache.org/docs/latest/sql-getting-started.html

[2] Spark SQL之前世今生,作者:ZFH__ZJ- https://www.cnblogs.com/linn/p/5325147.html

[3] Spark学习之路 (十八)SparkSQL简单使用,作者:扎心了,老铁 - https://www.cnblogs.com/qingyunzong/p/8987579.html

大数据分析工程师入门(九):Spark SQL


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

大数据时代

大数据时代

[英] 维克托•迈尔•舍恩伯格(Viktor Mayer-Schönberger) / 周涛 / 浙江人民出版社 / 2012-12 / 49.90元

《大数据时代》是国外大数据研究的先河之作,本书作者维克托•迈尔•舍恩伯格被誉为“大数据商业应用第一人”,拥有在哈佛大学、牛津大学、耶鲁大学和新加坡国立大学等多个互联网研究重镇任教的经历,早在2010年就在《经济学人》上发布了长达14页对大数据应用的前瞻性研究。 维克托•迈尔•舍恩伯格在书中前瞻性地指出,大数据带来的信息风暴正在变革我们的生活、工作和思维,大数据开启了一次重大的时代转型,并用三......一起来看看 《大数据时代》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具