Spark SQL 的 Parquet 那些事儿

栏目: 数据库 · 发布时间: 4年前

内容简介:Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。1读写Parquet文件

Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。

1

读写Parquet文件

<span style="">// Encoders for most common types are automatically provided by importing spark.implicits._</span>

<span><span>import</span> spark.implicits._</span>

<span><br /></span>

<span><span>val</span> peopleDF = spark.read.json(<span>&quot;examples/src/main/resources/people.json&quot;</span>)</span>

<span style="">// DataFrames can be saved as Parquet files, maintaining the schema information</span>

<span>peopleDF.write.parquet(<span>&quot;people.parquet&quot;</span>)</span>

<span><br /></span>

<span><br /></span>

<span style="">// Read in the parquet file created above</span>

<span style="">// Parquet files are self-describing so the schema is preserved</span>

<span style="">// The result of loading a Parquet file is also a DataFrame</span>

<span><span>val</span> parquetFileDF = spark.read.parquet(<span>&quot;people.parquet&quot;</span>)</span>

<span><br /></span>

<span><br /></span>

<span style="">// Parquet files can also be used to create a temporary view and then used in SQL statements</span>

<span>parquetFileDF.createOrReplaceTempView(<span>&quot;parquetFile&quot;</span>)</span>

<span><span>val</span> namesDF = spark.sql(<span>&quot;SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19&quot;</span>)</span>

<span>namesDF.map(attributes =&gt; <span>&quot;Name: &quot;</span> + attributes(<span>0</span>)).show()</span>

<span style="">// +------------+</span>

<span style="">// | value|</span>

<span style="">// +------------+</span>

<span style="">// |Name: Justin|</span>

<span style="">// +------------+</span>

2

分区发现

分区表时很多系统支持的,比如hive,对于一个分区表,往往是采用表中的某一或多个列去作为分区的依据,分区是以文件目录的形式体现。所有内置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自动的发现和推测分区信息。例如,我们想取两个分区列,gender和country,先按照性别分区,再按照国家分区:

<span>path</span>

<span>└── to</span>

<span> └── table</span>

<span> ├── gender=male</span>

<span> │ ├── ...</span>

<span> │ │</span>

<span> │ ├── country=US</span>

<span> │ │ └── <span>data</span>.parquet</span>

<span> │ ├── country=CN</span>

<span> │ │ └── <span>data</span>.parquet</span>

<span> │ └── ...</span>

<span> └── gender=female</span>

<span> ├── ...</span>

<span> │</span>

<span> ├── country=US</span>

<span> │ └── <span>data</span>.parquet</span>

<span> ├── country=CN</span>

<span> │ └── <span>data</span>.parquet</span>

<span> └── ...</span>

SparkSession.read.parquet 或者 SparkSession.read.load读取的目录为path/to/table的时候,会自动从路径下抽取分区信息。返回DataFrame的表结构为:

<span>root</span>

<span>|-- name: string (<span>nullable</span> = <span>true</span>)</span>

<span>|-- age: <span>long</span> (<span>nullable</span> = <span>true</span>)</span>

<span>|-- gender: string (<span>nullable</span> = <span>true</span>)</span>

<span>|-- country: string (<span>nullable</span> = <span>true</span>)</span>

细细分析一下你也会发现分区列的数据类型也是自动推断的。当前支持的数据类型有,数字类型,date,timestamp和string类型。有时候用户可能不希望自动推断分区列的类型,这时候只需要将spark.sql.sources.partitionColumnTypeInference.enabled配置为false即可。如果分区列的类型推断这个参数设置为了false,那么分区列的类型会被认为是string。

从spark 1.6开始,分区发现默认情况只会发现给定路径下的分区。比如,上面的分区表,假如你讲路径path/to/table/gender=male传递给SparkSession.read.parquet 或者 SparkSession.read.load 那么gender不会被认为是分区列。如果想检测到该分区,传给spark的路径应该是其父路径也即是path/to/table/,这样gender就会被认为是分区列。

3

schema合并

跟protocol buffer,avro,thrift一样,parquet也支持schema演变升级。用户可以在刚开始的时候创建简单的schema,然后根据需要随时扩展新的列。

spark sql 用Parquet 数据源支持自动检测新增列并且会合并schema。

由于合并schema是一个相当耗费性能的操作,而且很多情况下都是不必要的,所以从spark 1.5开始就默认关闭掉该功能。有两种配置开启方式:

1.通过数据源option设置mergeSchema为true。

2.在全局 sql 配置中设置spark.sql.parquet.mergeSchema 为true.

<span>// This is used to implicitly convert an RDD to a DataFrame.</span>

<span>import spark.implicits._</span>

<span><br /></span>

<span><br /></span>

<span>// <span>Create</span> a simple DataFrame, <span>store</span> <span>into</span> a <span>partition</span> <span>directory</span></span>

<span>val squaresDF = spark.sparkContext.makeRDD(<span>1</span> <span>to</span> <span>5</span>).map(i =&gt; (i, i * i)).toDF(<span>&quot;value&quot;</span>, <span>&quot;square&quot;</span>)</span>

<span>squaresDF.write.parquet(<span>&quot;data/test_table/key=1&quot;</span>)</span>

<span><br /></span>

<span><br /></span>

<span>// <span>Create</span> another DataFrame <span>in</span> a <span>new</span> <span>partition</span> <span>directory</span>,</span>

<span>// adding a <span>new</span> <span>column</span> <span>and</span> dropping an existing <span>column</span></span>

<span>val cubesDF = spark.sparkContext.makeRDD(<span>6</span> <span>to</span> <span>10</span>).map(i =&gt; (i, i * i * i)).toDF(<span>&quot;value&quot;</span>, <span>&quot;cube&quot;</span>)</span>

<span>cubesDF.write.parquet(<span>&quot;data/test_table/key=2&quot;</span>)</span>

<span><br /></span>

<span><br /></span>

<span>// <span>Read</span> the partitioned <span>table</span></span>

<span>val mergedDF = spark.read.option(<span>&quot;mergeSchema&quot;</span>, <span>&quot;true&quot;</span>).parquet(<span>&quot;data/test_table&quot;</span>)</span>

<span>mergedDF.printSchema()</span>

<span><br /></span>

<span><br /></span>

<span>// The <span>final</span> <span>schema</span> consists <span>of</span> <span>all</span> <span>3</span> <span>columns</span> <span>in</span> the Parquet files together</span>

<span>// <span>with</span> the partitioning <span>column</span> appeared <span>in</span> the <span>partition</span> <span>directory</span> paths</span>

<span>// root</span>

<span>// |<span>-- value: int (nullable = true)</span></span>

<span>// |<span>-- square: int (nullable = true)</span></span>

<span>// |<span>-- cube: int (nullable = true)</span></span>

<span>// |<span>-- key: int (nullable = true)</span></span>

4

hive metastore Parquet表转换

当读写hive metastore parquet格式表的时候,Spark SQL为了较好的性能会使用自己默认的parquet格式而不是采用hive SerDe。该行为是通过参数spark.sql.hive.convertMetastoreParquet空值,默认是true。

5

Hive和parquet兼容性

从表schema处理角度讲hive和parquet有两个主要的区别

  1. hive是大小写敏感的,但是parquet不是。

  2. hive会讲所有列视为nullable,但是nullability在parquet里有独特的意义。

由于上面的原因,在将hive metastore parquet转化为spark parquet表的时候,需要处理兼容一下hive的schema和parquet的schema。兼容处理的原则是:

  1. 有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持parquet侧的数据类型,这样就可以处理到nullability类型了。

  2. 兼容处理的schema应直接包含在hive元数据里的schema信息:

    1. 任何仅仅出现在parquet schema的字段将会被删除

    2. 任何仅仅出现在hive 元数据里的字段将会被视为nullable。

6

元数据刷新

Spark SQL为了更好的性能会缓存parquet的元数据。当spark 读取hive表的时候,schema一旦从hive转化为spark sql的,就会被spark sql缓存,如果此时表的schema被hive或者其他外部 工具 更新,必须要手动的去刷新元数据,才能保证元数据的一致性。

spark.catalog.refreshTable("my_table")

7

配置

parquet的相关的参数可以通过setconf或者set key=value的形式配置。

  • spark.sql.parquet.binaryAsString 默认值是false。一些parquet生产系统,尤其是impala,hive和老版本的spark sql,不区分binary和string类型。该参数告诉spark 讲binary数据当作字符串处理。

  • spark.sql.parquet.int96AsTimestamp 默认是true。有些parquet生产系统,尤其是parquet和hive,将timestamp翻译成INT96.该参数会提示Spark SQL讲INT96翻译成timestamp。

  • spark.sql.parquet.compression.codec 默认是snappy。当写parquet文件的时候设置压缩格式。如果在option或者properties里配置了compression或者parquet.compression优先级依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置类型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安装ZstandardCodec,brotli需要安装BrotliCodec。

  • spark.sql.parquet.filterPushdown 默认是true。设置为true代表开启parquet下推执行优化。

  • spark.sql.hive.convertMetastoreParquet 默认是true。假如设置为false,spark sql会读取hive parquet表的时候使用Hive SerDe,替代内置的。

  • spark.sql.parquet.mergeSchema 默认是false。当设置为true的时候,parquet数据源会合并读取所有的parquet文件的schema,否则会从summary文件或者假如没有summary文件的话随机的选一些数据文件来合并schema。

  • spark.sql.parquet.writeLegacyFormat 默认是false。如果设置为true 数据会以spark 1.4和更早的版本的格式写入。比如,decimal类型的值会被以apache parquet的fixed-length byte array格式写出,该格式是其他系统例如hive,impala等使用的。如果是false,会使用parquet的新版格式。例如,decimals会以int-based格式写出。如果spark sql要以parquet输出并且结果会被不支持新格式的其他系统使用的话,需要设置为true。

推荐阅读

spark面试该准备点啥

Spark SQL从入门到精通

Spark Streaming 场景应用

Spark SQL 的 Parquet 那些事儿


以上所述就是小编给大家介绍的《Spark SQL 的 Parquet 那些事儿》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算机算法导引

计算机算法导引

卢开澄 / 第2版 (2006年1月1日) / 2006-1 / 38.0

本书为《计算机算法导引——设计与分析》的第2版。书中内容分3部分:第1部分是基本算法,按方法论区分,包含优先策略与分治策略、动态规划、概率算法、并行算法、搜索法、数据结构等;第2部分是若干专题,包括排序算法、计算几何及计算数论、线性规划;第3部分是复杂性理论与智能型算法,其中,智能型算法主要介绍了遗传算法和模拟退火算法。本书可作为计算机系本科学生及研究生教材,数学系师生和科研T作者也可将其作为参考......一起来看看 《计算机算法导引》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码