一点资讯 SparkSQL 查询引擎实践

栏目: 编程工具 · 发布时间: 5年前

内容简介:01—

“本文分析SparkSQL ThriftServer工作原理,修改Spark SQL源代码并实现了 SQL 查询进度的计算,最后展示了一点资讯基于Presto+SparkSQL+Hive的Web查询引擎

01

问题背景:SparkSQL ThriftServer 无法获取查询进度

目前公司的分布式Adhoc查询有以下几种:Presto,Hive,SparkSQL。公司外部开源系统里比较知名的有Redash,Zeppelin等查询工具。

以上查询 工具 均支持:Hive、SparkSQL、 Mysql 等查询数据源,SQL格式化,结果进行 排序 等。Redash还支持语法动态提示等功能,非常的方便。

然而很遗憾的是: 上述开源工具,无论是对接SparkSQL还是对接HiveServer2都有一个致命的问题就是—— 无法知晓查询的进度 ,甚至是像Presto这种本身带有进度的查询引擎,在接入Redash等系统后,也无法知晓当前查询的时间、进度、剩余时间等。

查询进度重要吗?

从人机交互的角度看,快查询进度条意义不大,但是Hive等MR查询 长时间的盲目等待是难以接受的(Hive命令行提供大致的MR进度展示)。 如MysqlWorkBench和Navicat Mysql等工具

  • Mysql查询大都是毫秒级别返回,不显示进度条。

  • 导入导出数据库时以上工具都有进度条,即长时间作业需要进度条。

我们的目标是 :深入分析SparkSQL ThriftServer的工作过程,并通过修改源代码等方式来获取执行的状态,让Presto查询、Hive查询、SparkSQL查询均能感知到进度。本文重点介绍了SparkSQL查询进度的实现。

02

SparkSQL ThriftServer工作原理剖析

一点资讯 SparkSQL 查询引擎实践

图1: SparkSQL ThriftServer工作的5个步骤

从上图可知,SparkSQL ThriftServer的启动过程分 5个步骤。 在步骤3 ExecuteStatement时,SparkSQL 为每一个查询创建Spark的Job,并随机生成UUID。

一点大数据团队修改SparkSQL源代码实现定制化的JobID生成,并跟踪每个SparkJob的运行过程实现SQL执行进度的计算。

1:SparkSQL初始化

一点资讯 SparkSQL 查询引擎实践

图2: SparkSQL ThriftServer 初始化

如上图所知,SparkSQL ThriftServer初始化包含以下几个步骤:

  1. Java Main函数启动,初始化HiveServer2,创建 ThriftBinaryCLIService

  2. 创建Hive模块内的 ThriftBinaryCLIService 并传入 SparkSQL模块内的SparkSqlCliService

  3. 创建SparkSQLSessionManager 通过反射方式将CliService中的SessionManager设置为自身

    1. 使用反射是 因为Hive代码中的CLiService的SessonManager为 Private方法

  4. Thrift BinaryService启动,并根据配置的ThriftSever端口、地址等信息创建TThreadPoolServer。

2:建立链接OpenSession:

客户端(Beeline CLI) 向服务器ThriftBinaryServer发送OpenSession的Thrift请求:

  1. SparkSQLCLIService 调用初始化过程中创建的SessionManager 创建Session

  2. 此时根据配置的不同为Thrift的会话绑定一个SparkSession

  3. 每个Session设置自己的SessionManager和OperationManager

    1. SessonManager用于管理整个Hive的查询周期,比如建立链接、关闭链接等。

    2. OperationManager 完成具体任务:用于执行具体的任务逻辑比如查看表信息、查看元数据

  4. SparkSQL重写OperationManager的 newExecuteStatementOperation()方法,  转换为Spark作业

3:发起SQL查询ExecuteStatement(重点)

一点资讯 SparkSQL 查询引擎实践

图3: Beeline 客户端发起查询SQL请求到 SparkSQL Server

  1. 客户端向服务器ThriftBinaryServer发送ExecuteStatement的Thrift请求,服务器接到请求后获取当且Thrift会话中的 HiveSession ,并调用 ExecuteStatement 方法。

  2. 继续调用 executeStatementInternal 函数。此方法调用 步骤2:建立链 接OpenSession : 中初始化好的 OperationManager执行 newExecuteStatementOperation ()

  3. HiveSession继续调用重写的 runInternal ()方法,将Hive请求进一步调用 execute() 来完成Spark的计算逻辑。

  4. Spark收到查询请求后,在 SparkExecuteStatementOperation 中为每一个查询语句随机生成UUID作为JobId:

    1. 此处是关键点即Spark通过UUID来识别每一个的Job,并在Spark UI显示

    2. 而UUID和SQL语句间没有直接的关联

  5. Spark执行SparkContext.Sql() 函数直接计算获取结果的DataFrame,结果计算完成将statement标记为完成状态。返回给客户端 OperationHandle 句柄作为查询结果的依据。

4: 客户端获取结果FetchResults

客户端检查Opeartion的状态,当发现是FINISHED状态时候,请求结果的元数据Meta信息和结果内容信息,分别由客户端的  FetchResults 请求和  GetResultSetMetadata 请求

  1. Spark 将DataFrame的每一行转换为Thrift结果的Row

  2. Spark 将DataFrame的 DataFrame的行列信息转换为 Hive的元数据信息。

5:关闭并释放链接

03

设计原理与代码修改

步骤1: 一点查询客户端根据SQL生成UUID并保存对应关系:

  • 我们根据Hive JDBC接口,封装了自己的查询客户端,并生成唯一的查询ID

  • QueryID 样例: 

    • 20181014_101745_57dbf79bc1e1f27f82911b00b91ddcde

  • 客户端通过 jdbc:hive2://IP:Port/dw? shark.sparksql.queryid=$QueryID  这样的连接来访问后端的ThriftServer。

步骤2: 服务器接受SQL和QueryID,将Spark的JobID设置为QueryID:

一点资讯 SparkSQL 查询引擎实践

图4: 如何实现HiveJDBC客户端传递UUID到服务器端

  1. Spark ThriftServer 建立连接请求的时候,根据Url中的参数信息,HiveSession在建立链接的时候解析 JDBC:hive2 url参数 并且将参数 “shark.sparksql.queryid“ , 以Key-Value 的形式存放于 HiveConf

  2. 在OperationManager中,当创建ExecuteStatementOperation时候,读取所有HiveConf中的配置信息,写入到SparkContxt的配置中。

  3. 执行 ExecuteStatementOperation 时,取上下文  “shark.sparksql.quer yid" 对应的值,并将其设置为JobID, 此ID在SparkUI中可以查询到

    1. statementId=sqlContext.getConf("shark.sparksql.queryid", statementId)

修改后的SparkSQL执行页面中JobID已经生效

步骤3: 客户端读取 Spark RESTful API 获取查询进度:

我们采用Spark的Restful API来获取每一个查询ID的进展情况

Spark RESTful API的访问地址为 

http://HOST:4041/api/v1/applications 

  • Step1:查询当前Spark的ApplicationID:http://HOST:4041/api/v1/applications 

  • Step2:查询当前Spark的所有Job列表:http://HOST:4041/api/v1/applications/application_id/jobs application_id为 Step_1中 取到的application_id

  • Step3: 根据QueryID,可以获取到当前QueryID下所有任务数(numTasks),已经完成的任务数(numCompletedTasks),跳过的Task数(numSkippedTasks)

    • 计算公式为:完成任务数+跳过任务数)/(总任务数)

一点资讯 SparkSQL 查询引擎实践

图6: Spark 的Restful API获取任务状态

步骤4:效果展示

通过以上对SparkSQL的改进,我们的Shark 一点大数据查询系统可以支持SparkSQL的进度查询。

一点资讯 SparkSQL 查询引擎实践

图7:  蓝色为SparkSQL 查询进度条 当前进度9%

另外我们通过SQL语法分析工具、LeaderLatch(ZK Leader选择)等技术实现了SQL格式化、高可用的Presto等,使得我们的Shark查询引擎达到了生产系统的要求。

一点资讯 SparkSQL 查询引擎实践

图8:  一点大数据查询系统与Redash、Zeppelin对比分析

04

未来规划

1: SparkSQL ThriftServer的分布式化:

  1. 智能的客户端根据查询类型、数据分区大小提交至不同的SparkSQL ThriftServer。

  2. 对数据取样、小查询等建立单独的SparkSQL查询后端服务,提高查询的响应速度对大数据量、高内存占用查询分配大内存的Executor保障查询的稳定性。

  3. 统一的Web客户端维护查询ID和后端SparkThriftServer的关系,支持失败重试、同时提交等功能。

2 : 用户无感知的 智能查询引擎选择

  1. 综合利用Presto、SparkSQL、Hive等查询系统。短查询优先Presto,长查询优先Hive

  2. 统一查询语法,消除Presto语法和Hive语法直接的不同。

  3. Presto和Hive的UDF不同,需要建立统一的UDF管理系统实现语法和代码的统一管理。


以上所述就是小编给大家介绍的《一点资讯 SparkSQL 查询引擎实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编程卓越之道

编程卓越之道

海德 / 张菲 / 电子工业出版社 / 2007-4 / 69.00元

《编程卓越之道第二卷:运用底层语言思想编写高级语言代码》是《编程卓越之道》系列书的第二卷,将探讨怎样用高级语言(而非汇编语言)编程得到高效率机器代码。在书中,您可以学到如何分析编译器的输出,以便检验代码的所作所为,从而得到高质量的机器码;了解编译器为常见控制结构生成的典型机器指令,以便在编写高级语言程序时选用恰当的语句;掌握编译器将各种常量和变量类型转换成机器数据的方法,以便于使用这些数据写出又快......一起来看看 《编程卓越之道》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试