Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog

栏目: IT技术 · 发布时间: 5年前

[实习] 阿里巴巴计算平台事业部 2021 应届生实习生招聘 (大数据计算、云计算平台、产品等等)

面向人群:海内外院校2021届毕业生,毕业时间2020年11月-2021年10月

阿里巴巴计算平台事业部是阿里巴巴支撑所有计算服务的大中台,为整个集团提供了包括批计算、流计算、机器学习等各种计算服务。面对着双十一的各项挑战,复杂的业务场景,飞速的业务增长,高并发的大促洪峰,让我们一直贴近业务,解决难题,不断进行技术创新。

我们的产品主要是,基于Hadoop、Kubernetes和Flink等开源生态,结合阿里巴巴电商业务场景,研发阿里巴巴新一代大数据计算平台,包括计算引擎、 SQL 引擎、分布式存储、资源调度等核心技术,统一支持批计算、流计算、机器学习等计算需求,支持阿里集团所有实时和离线核心业务(搜索,推荐,双11大屏,机器学习后端计算),同时在阿里云上提供大数据计算服务。

如果你对开源大数据感兴趣,这里有最好的土壤。这里有 Apache Flink 的核心开发和创始团队,多位Flink PMC & Committer, Calcite PMC & Commiter, Hadoop PMC & Committer、Hive PMC & Committer 以及Hbase PMC & Committer,加入我们可以跟大神面对面交流哦。

招聘岗位:分布式计算、云计算平台、大数据处理、SQL 引擎、存储、云产品运营

工作地点:杭州、上海、北京

简历投递: jiangjie.qj@alibaba-inc.com

Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog

SparkSQL DatasourceV2作为Spark2.3引入的特性,在Spark 3.0 preview(2019/12/23)版本中又有了新的改进以更好的支持各类数据源。本文将从catalog角度,介绍新的数据源如何和Spark DatasourceV2进行集成。

问题

SparkSQL是Spark的一个子模块,主要功能是用于处理结构化数据,目前在大数据OLAP领域已经有了广泛的应用。Iceberg作为一个通用的表格式,也已经在数据湖的解决方案中逐渐展现了它的优势。

那该如何将这2者相结合,使得应用SparkSQL + Iceberg可以和SparkSQL + Hive一样方便,如,基于SQL直接访问数据或进行DDL操作:

select c1 from iceberg_db.t;

drop table iceberg_db.t;

SparkSQL 基本原理

先来看下SparkSQL处理SQL的基本流程: Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog

如上图所示,在提交SQL后,spark内部会经历语法解析生成逻辑计划,解析逻辑计划,优化逻辑计划,生成执行计划,执行。在解析逻辑计划的过程中,引入了catalog,它的作用是来判断SQL中引用的数据库,表,列,函数等是否存在。

在Spark + Hive的解决方案中,基于 ExternalCatalog 接口,实现了 HiveExternalCatalog ,该类中的Hive客户端和Hive的metastore进行交互,从而能解析SQL中的库表列是否存在,并能基于Hive客户端进行Hive表的DDL操作,比如create table, drop table等。

Multiple Catalog解析

那Spark + Iceberg是否只需要实现 ExternalCatalog 接口,就能基于SQL直接访问数据或进行DDL操作吗?答案是肯定的,但是,由于解析SQL过程中只能支持一种catalog,如果要实现Hive table joion Iceberg table该怎么办,如:

select *

from iceberg_db.t1

join hive_db.t2

on t1.k1 = t2.k1;

为了更为通用的解决这类问题,在Spark 3.0 preview版本中引入了multiple catalog功能,该功能对于catalog做了如下变化:

  • CatalogPlugin
    CatalogManager
    CatalogPlugin
    
spark.sql.catalog.<catalog-name>=<YourCatalogClass>

这里 <catalog-name> 设置为需要的CatalogName, <YourCatalogClass> 设置为具体的实现类,如,

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog

接口定义如下:

public interface CatalogPlugin {


void initialize(String name, CaseInsensitiveStringMap options);


String name();

}

  • 增加了 TableCatalog 的接口,该接口继承自 CatalogPlugin ,定义了相关的方法用来解析SQL中的元数据,如,tableExists,还定义了一系列方法进行DDL操作,如,createTable,alterTable,dropTable,接口定义如下,

public interface TableCatalog extends CatalogPlugin {


Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;


Table loadTable(Identifier ident) throws NoSuchTableException;


default void invalidateTable(Identifier ident) {

}


default boolean tableExists(Identifier ident) {

try {

return loadTable(ident) != null;

} catch (NoSuchTableException e) {

return false;

}

}


Table createTable(

Identifier ident,

StructType schema,

Transform[] partitions,

Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;


Table alterTable(

Identifier ident,

TableChange... changes) throws NoSuchTableException;


boolean dropTable(Identifier ident);


void renameTable(Identifier oldIdent, Identifier newIdent)

throws NoSuchTableException, TableAlreadyExistsException;

}

  • CatalogManager
    CatalogPlugin
    CatalogManager
    
Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog

在解析过程中,根据catalogName从 CatalogManager 获取具体的 CatalogPlugin 实现, V2SessionCatalog 是为了兼容之前的catalog的实现机制,而 CustomerCatalog 是自定义的CatalogPlugin实现。同时,CatalogManager还会管理当前的Catalog/Namespace,相关方法如下:

def currentNamespace: Array[String]

def setCurrentNamespace(namespace: Array[String]): Unit

def currentCatalog: CatalogPlugin

def setCurrentCatalog(catalogName: String): Unit

  • 命名结构变更为 [<catalogName>.][<namespaceName>.]*<tableName> ,对于表名,原本只支持2层的命名结构, databaseName.tableName ,但是在业界流行的数据库中(如 MySQL,PostgreSQL ),已经支持3层的命名结构, database.schema.table 。而在multiple catalog实现过程中,引入了Namespace概念,使得SparkSQL能支持多层命名结构,如, catalog.ns1.ns2.table
  • 由于引入了catalog和namespace概念,SparkSQL还增加相关命令支持catalog/namespace的管理,如,

CREATE/DROP/SHOW NAMESPACES

USE <catalogName>.<namespaceName>

除了multiple catalog以外,SparkSQL DatasourceV2还重构生成了 SupportsRead / SupportsWrite 等接口,用来支持数据源的各类操作,由于篇幅有限,就不在本文中具体展开。

基于 Spark 3.0 preview使用Iceberg + SparkSQL

在Spark DatasourceV2增加了multiple catalog等功能后,回到我们想要查询的SQL,实现步骤如下:

  1. 在Iceberg侧对 CatalogPlugin/TableCatalog/SupportsRead 等接口进行实现,实现类名如: org.apache.iceberg.spark.SparkCatalog
  2. 在spark的配置文件中设置:

spark.sql.catalog.iceberg_catalog = org.apache.iceberg.spark.SparkCatalog
  1. 基于配置的catalogName,调整SQL如下,就可以进行基于SQL的跨数据源查询了。

select *

from iceberg_catalog.ns1.t1

join hive_db.t2 on t1.k1 = t2.k1;

  1. 除了跨数据源数据分析以外,现在还可以对Iceberg的表进行DDL操作了,如,

create table iceberg_catalog.t1 ......

drop table iceberg_catalog.t1

总结

Spark 3.0 preview在DatasourceV2的功能方面较Spark2.4做了比较大的改动,Multiple Catalog作为比较重要的新增功能,使得新的数据源能很便捷的和SparkSQL进行整合,提供元数据相关服务。除了Multiple Catalog以外,还增加了诸如 SupportsReadSupportsWriteSupportsPushDownFilters 等一系列接口增强对数据源的整合。Iceberg作为新兴的表格式,能很好的利用DatasourceV2的新功能,结合SparkSQL构建数据湖解决方案。目前Iceberg开源代码还未针对新的DatasourceV2特性进行更新,在我们内部项目中已经对这块整合进行了相关实践,并计划贡献给社区,使得基于Iceberg的数据湖解决方案能更完善。

数据湖,大数据的下一个变革!

为什么我选择Apache Iceberg

Apache Iceberg快速入门

Apache Iceberg的Schema Evolution详解

Apache Iceberg 对推荐应用架构的优化及读写流程解析

Spark原理|SparkSQL DatasourceV2 之 Multiple Catalog


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Two Scoops of Django

Two Scoops of Django

Daniel Greenfeld、Audrey M. Roy / CreateSpace Independent Publishing Platform / 2013-4-16 / USD 29.95

Two Scoops of Django: Best Practices For Django 1.5 is chock-full of material that will help you with your Django projects. We'll introduce you to various tips, tricks, patterns, code snippets, and......一起来看看 《Two Scoops of Django》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具