从 F1 Query 论文看 SQL 查询的执行模式

栏目: 数据库 · 发布时间: 5年前

内容简介:F1 是起源于 Google AdWords 的分布式 SQL 查询引擎,跟底下的 Spanner 分布式存储搭配,开启了分布式关系数据库——所谓 NewSQL 的时代。我们今天说的是 F1 团队在 VLDB2018 上发的文章F1 Query 是一个分布式的 SQL 执行引擎,现在大数据领域流行的 Presto、Spark SQL、Hive 等等,都可以算在这个范畴里。类似地,F1 Query 也支持对各种不同数据源的查询,既可以是传统的关系表、也可以是 Parquet 这样的半结构化数据。

从 F1 Query 论文看  <a href='https://www.codercto.com/topics/18630.html'>SQL</a>  查询的执行模式

F1 是起源于 Google AdWords 的分布式 SQL 查询引擎,跟底下的 Spanner 分布式存储搭配,开启了分布式关系数据库——所谓 NewSQL 的时代。我们今天说的是 F1 团队在 VLDB2018 上发的文章 F1 Query: Declarative Querying at Scale ,它和之前我们说的 F1 几乎是两个东西。

F1 Query 是一个分布式的 SQL 执行引擎,现在大数据领域流行的 Presto、Spark SQL、Hive 等等,都可以算在这个范畴里。类似地,F1 Query 也支持对各种不同数据源的查询,既可以是传统的关系表、也可以是 Parquet 这样的半结构化数据。

这样一来,不同数据格式的壁垒也被打破了:你可以在一个系统里完成对不同数据源的 Join,无论数据以什么形式存放在哪里。商业上管这个叫 Federated Query 或者 DataLake,几家云计算巨头都有类似的产品。

那 F1 Query 的贡献在哪里呢?

F1 Query 定义了三种不同类型的查询执行模式,根据查询的数据量大小或执行时间,将用户查询划分成:

  1. 单机执行(Centralized Execution)
  2. 分布式执行(Distributed Execution)
  3. 批处理执行(Batch Execution)

前两个是交互式的,即客户端会等待结果返回。最后一个批处理更像是 ETL:客户端输入任务之后就不再管了,查询结果会被写到指定的地方。

单机执行

单机执行对应我们熟悉的 OLTP 查询,例如单表点查、带索引的 Join 等。这类查询本身已经足够简单,只需几毫秒就能做完,处理它们的最好方式就是在收到请求的机器上立即执行。

在 F1 Query 系统中有 F1 Server 和 F1 Worker 等角色。F1 Server 负责接收客户端请求,如果它判断这个查询应当使用单机而不是分布式方式执行,它就亲力亲为、直接执行并返回结果。

这样的行为和绝大多数单机 OLTP 数据库是一致的,例如 MySQL 采用的是 Thread Pool + Dispatcher 的处理模型,Thread Pool 的规模是一定的,Dispatcher 根据高低优先级分派执行任务。最终一个请求只会被一个线程处理,换句话说,对某个查询来说其执行过程是单线程的。

从 F1 Query 论文看 SQL 查询的执行模式

MySQL 的线程池处理模型,一般存在多个 Thread Group,图中描绘了一个 Thread Group

F1 Query 单机查询的执行器同样也是教科书式的 Valcano 模型,但也无可厚非——对 OLTP 来说这已经足够好。如下图所示,从顶层算子开始递归地调用 GetNext() ,每次取出一行数据,直到没有更多数据为止。各个算子只需要实现 GetNext() 接口即可,简单清晰。

从 F1 Query 论文看 SQL 查询的执行模式

分布式执行

F1 Query 对更复杂的查询,例如没有索引的 Join 或聚合等,则采取分布式查询的方式。大部分 OLAP 查询、尤其是 Ad-hoc 的查询都落在这一分类中。这种情况下,分布式导致的网络、调度等 Overhead 已经远小于查询本身的成本;而且随着数据量的增加,单节点内存显然不够用了。

从 F1 Query 论文看 SQL 查询的执行模式

F1 Query 的系统架构,主要包含 F1 Master、F1 Server、F1 Worker 三个角色,其他 Catalog、UDF Server、Batch Metadata 用于存储查询相关的 Metadata 等

这时,上图中的 F1 Worker 就派上用场了。 F1 Server 此时仅仅作为协调者存在,将任务分配给多个 Worker ,直到 Worker 的任务全都完成,再把结果汇总发给客户端。

这个模式眼熟吗?你可能会想到 Greenplum 这类的数据仓库,已经很接近了。最相似的我认为是 Presto。Presto 是 Facebook 开发的一套分布式 SQL 引擎,如果单单只看 F1 Query 的分布式查询,和 Presto 大同小异。

与单机执行不同的是, 分布式查询中的算子可以有多个实例(Instance)并行执行,每个实例负责其中一部分数据 。在 F1 Query 里这样一个数据分片被称为 Fragment,在 Spark SQL 里叫 Partition,在 Presto 里叫 Split。

从 F1 Query 论文看 SQL 查询的执行模式

下面的例子是一个 Join-Aggregation-Sort 的查询,它分成了 4 个阶段:

  1. Scan(Clicks) 被分配给 1000 个 F1 Worker 上并行拉取数据,并根据每一行数据的 Hash(AdID) 发送给对应的 HashJoin 分片,即一般说的 shuffle 过程;
  2. Scan(Ads) 被分配给 200 个 F1 Worker 上并行拉去数据,并且也以同样的方式做 shuffle;
  3. HashJoinPartialAggregation :根据 Join Key 分成了 1000 个并行任务,各自做 Join 计算,并做一次聚合;
  4. 最后,F1 Server 把各个分片的聚合结果再汇总起来,返回给客户端。

从 F1 Query 论文看 SQL 查询的执行模式

Presto 具有的缺陷,F1 Query 分布式查询同样也有,比如:

  • 纯内存的计算方式,无法利用磁盘的存储空间,某些查询可能面临内存不足;
  • 没有 Fault-tolerance,对于一个涉及上千台 Worker 的查询,任何一台的重启都会导致查询失败。

批处理执行

F1 Query 还有个独特的批处理执行,这个模式定位于更大的数据量、更久的查询时间;另一方面,它的结果不再是返回给客户端,而是将查询结果写到指定的地方,例如 Colossus(第二代 GFS)上。

上一节我们提道,Presto 的模式没有 Fault-tolerance,这对于长时间运行的批处理任务是致命的,查询失败的概率会大大增加。批处理查询首先要解决的就是 Fault-tolerance 问题: 必须能以某种方式从 Worker 节点的失败中恢复

解决这个问题有两条路可走:一是 MapReduce 的模式,将计算分成若干个阶段(Stage),而中间结果持久化到 HDFS 这样的分布式文件系统上;二是 Spark RDD 模式,通过记录祖先(Lineage)信息,万一发生节点失败,就通过简单的重算来恢复丢失的数据分片,这样数据就可以放在内存里不用落盘。

Spark 的做法显然是更先进的,原因有很多,这里只说最重要的 2 条。欲知详情可以看我之前的博客文章 《一文读懂 Apache Spark》

  1. Spark 的计算基本再内存中,只有内存不够时才会溢出到磁盘,而 MR 的中间结果必须写入外部文件系统;
  2. Spark 可以把执行计划 DAG 中相互不依赖的 Stage 并行执行,而 MR 只能线性的一个 Stage 执行。

但是出乎意料的是,F1 Query 采用的是前者,也就是 MR 模式。这其中的原因我们不得而知,我猜想和 Google 自家的 FlumeJava 不够给力有关系。

如下图。左边的 Physical Plan 和上一节的分布式查询是一样的,不同之处是 在批处理模式下,它被转换成一系列的 MR 任务 ,之后交给调度器(Scheduler)去处理即可。

从 F1 Query 论文看 SQL 查询的执行模式

相比分布式查询的执行方式,MapReduce 模式下各个步骤都会持久化到外部文件系统。不仅如此, Pipeline 的执行也没法进行 。以上一节提到的 HashJoin 为例,左边 Clicks 的 Scan 和 HashJoin 原本是可以 Pipeline 执行的,但是在批处理模式下,必须等到 Scan(Clicks) 这个阶段完成才能进行下一步的 HashJoin 阶段。

单机并行执行

除了上面聊的 F1 Query 所支持的 3 种查询模式之外,事实上还有一种处理模型位于单线程执行和分布式执行之间:单机的并行查询。初看这似乎与分布式查询很相似,但又有些不同:

  • 不用考虑单个 Worker 的失败恢复,因为它们都在同一个进程里;
  • 各个 Worker 线程的内存是共享的,它们之间交换数据无需考虑网络通讯代价。

这种模式在传统的关系型数据库上也很常见,尤其是 Postgres、SQL Server 这类以 OLAP 查询见长的选手。以 Postgres 为例,在开启并行查询的情况下,查询优化器会根据代价选择是否生成并行执行计划;如果生成了并行执行计划,执行器会调度多个 Worker 一起完成工作。

下图是一个 Postgres 上并行 Hash Join 的例子,从执行计划上看和上一节几乎一样,唯一区别是这里的 shuffle 过程变得容易多了,不再是一件代价很高的事情。

从 F1 Query 论文看 SQL 查询的执行模式

相比分布式查询, 单机并行的最大优势在于响应速度更快 ,因为省去了大量的网络 IO 时间,而且调度一个 Worker 线程要比调度一个 Worker 机器快得多。

但别忘了,单机运算能力的 scale up 成本非常高,并且是存在上限的。对于 Google 之类的互联网公司,绝大部分查询都超出了单机的存储或计算能力,我猜测这也是 F1 Query 并未考虑单机并行的理由。

对 F1 Query 的评价

从论文透露的情况来看,F1 Query 还不算个完善、成熟的系统,其定位更像是一个解决业务需求的胶水系统,而非 Spanner 这样的“硬核”技术。它追求的是够用就好,很多地方其实还有很大的改进空间,举几个例子:

  • 对交互式查询,选择分布式还是单机计算目前还是基于启发式规则。
  • 三种模式的执行计划是用一样的优化器生成的。但是客观的说,这其中的差别可是不小的。
  • 优化器是基于规则的。之所以不做 CBO,论文给出的解释是数据源太多,不容易计算。
  • 批处理模式下用 Spark 取代 MR 的模式是更好的选择。

F1 Query 希望用一套系统解决所有 OLTP、OLAP、ETL 需求、用一套系统访问数据中心里各种格式的数据,这两点才是 F1 Query 的核心竞争力。

SQL 执行模式总结

从数据库的视角看,理想的数据库应当隐藏掉查询执行的种种细节,只要用户输入一个声明(例如 SQL),就能以最优的方式进行执行并给出答案。F1 Query 做了个勇敢的尝试, 它将多种执行模型揉合在一个系统中,共享同一套优化器和算子 ,以较低的开发成本获得其中最优的执行性能(在理想情况下)。

下面的表格总结了 4 种执行模式的优势和不足。

单线程 并行执行 分布式并行执行 批处理
代表系统 MySQL / Oracle Postgres / MSSQL Presto / Greenplum Spark / MapReduce
硬件架构 单核 SMP / NUMA MPP MPP
伸缩性 Scale Up 弹性 Scale Out 弹性 Scale Out
Fault-Toralence 重试整个查询 Worker 级 fail-over
典型数据量 若干个 Tuple 单机内存可容纳 大数据 大数据
典型响应时间 毫秒 数百毫秒 秒级 秒级到数小时

总而言之,所谓 No Free Launch —— 没有最优的方案, 数据量是决定能选用哪个执行模式的前提 。实践中,先确保数据量能够承载的下,再谈优化也就明白多了。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

JavaScript

JavaScript

Douglas Crockford / Yahoo Press / 2008-5 / GBP 23.99

Most programming languages contain good and bad parts, but JavaScript has more than its share of the bad, having been developed and released in a hurry before it could be refined. This authoritative b......一起来看看 《JavaScript》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换