Enzyme SQL引擎的实现与优化

栏目: 数据库 · 发布时间: 5年前

内容简介:Enzyme是挖财数据团队自研的SQL执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用Ja

Enzyme是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。

应用

Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用SQL作为一种加工变量的DSL,提供在离线和实时两个平台上的一致语义。

为什么要使用SQL呢?首先,自研DSL需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的DSL最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。

Enzyme SQL引擎极致的性能表现和非常低的CPU占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。

实践

Enzyme设计之初就是以兼容Spark SQL为目标的,故而在使用上,和Spark SQL的API大体是一致的。EnzymeSession即SparkSession,Protein即Dataframe。

我们从构建一个Protein数据集开始:

// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)

// construct a protein from rows and schemas
val schema = StructType(Seq(
  StructField("x", LongType),
  StructField("y", StringType),
  StructField("z", DoubleType),
  StructField("in", IntegerType)
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
  Row(2L, "23L", 23.4, 4245),
  Row(2L, "65L", 5244.2, 234),
  Row(null, "7L", 245.234, 5245),
  Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)
复制代码

这样的一个数据集可以直接展示:

> df.show()

+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

如果要使用SQL,首先我们要把这个数据集和一个表名关联起来:

> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

上面的代码中 session.sql() 的结果还是一个Protein。除了使用SQL,我们还可以使用Protein里面丰富的API:

> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+

> df.sort("x").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

更多用法的细节可以查看Spark SQL的文档,也可以查看Enzyme的文档。

实现

Enzyme基于Spark Catalyst实现,而Catalyst对标的开源项目是Apache Calcite。Apache Phoenix和Apache Hive等众多项目都在使用Calcite。因为我们的目标是兼容Spark SQL,自然而然选择了Catalyst,作为SQL的解析器、逻辑计划的执行器和优化器。

Enzyme SQL引擎的实现与优化

Spark Catalyst概览

  • SQL Text
    • (parse): Unresolved Logical Plan
      • (analyze): Resolved Logical Plan
        • (optimize): Optimized Logical Plan(s) ----- RBO
          • (planning): Physical Planning ------ CBO

上面的层次结构简明地概括了一个SQL从最原始的SQL文本,到最后执行的各个阶段。其中加粗的部分是Enzyme中所实现的,未加粗的部分是Catalyst所提供的功能。

解析,就是用Antlr4将SQL文本变成一棵AST树,这个AST树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道 * 所表示的字段究竟是哪些。

分析,就是结合Catalog中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如 * )和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出AnalysisException。

优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过Catalyst提供的接口,将自己研发的优化规则加入其中。这里的优化就是RBO,基于规则的优化。

最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的CBO,基于代价的优化。

一个SQL的一生

SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
复制代码

我们用上面这个SQL来详细了解一下上述各个阶段。

分析阶段

Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
   :- 'UnresolvedRelation `employee`
   +- 'UnresolvedRelation `department`


Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]
复制代码

我们看到 * 已经被展开成了四个明确的字段,而且每个字段都有明确的ID标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对Spark SQL做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。

优化

Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]

Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
:  +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
   +- LocalRelation [DepartmentID#0L, DepartmentName#1]
复制代码

因为这是一个inner join,所以这里的一个优化点其实是在做join之前,把join key为null的行过滤掉。

物理计划的生成

我们模仿Spark SQL中SparkPlan的实现,提供了简化的EnzymePlan:

abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
  def iterator: Iterator[InternalRow]
  override def output: Seq[Attribute]
  ...
}

trait LeafExecNode extends EnzymePlan {
  override final def children: Seq[EnzymePlan] = Nil
}

trait UnaryExecNode extends EnzymePlan {
  def child: EnzymePlan

  override final def children: Seq[EnzymePlan] = child :: Nil
}

trait BinaryExecNode extends EnzymePlan {
  def left: EnzymePlan

  def right: EnzymePlan

  override final def children: Seq[EnzymePlan] = Seq(left, right)
}
复制代码

在这个代码片段中,EnzymePlan是核心,其中output表示一个物理计划的节点上结果集的元数据信息,而iterator则是调用这个物理计划节点的入口。我们看到有三类物理计划:

  • LeafExecNode : LocalTableScanExec, LazyLocalTableScanExec
  • UnaryExecNode : ProjectExec, LimitExec, FilterExec
  • BinaryExecNode : HashJoinExec, NestedLoopExec

Enzyme中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是Union或者Join,而只有一个输入源的就是Projection,Filter,Sort等算子。

上一节中优化之后的逻辑计划可以生成这样的物理计划:

HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
         , BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
:  +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
                         employee, catalog@60dcf9ec
 +- FilterExec isnotnull(DepartmentID#4L)
   +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
                         department, catalog@60dcf9ec
复制代码

计算通过在根节点调用iterator方法,层层回溯:

HashJoinExec.iterator
 + FilterExec.iterator
   + LazyLocalTableScan(employee).iterator
 + FilterExec.iterator
   + LazyLocalTableScan(department).iterator
复制代码

性能调优

首先,我们需要定位性能瓶颈。JVM生态中有很多做Profiling的工具。Enzyme在优化过程中,使用的是JDK中自带的jmc命令和FlightRecord。通过jmc的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。

  • 其一,直接替换掉慢的部分
  • 其二,对无法优化的部分做必要的缓存
  • 其三,逻辑计划优化

优化点一:动态代码生成调优

Spark的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料2):

  • 大量虚函数调用,生成的实际代码不再需要执行表达式系统中统一定义的虚函数
  • 判断数据类型和操作算子等内容的大型分支选择语句
  • 常数传播限制,生成的代码能够确定性地折叠常量

对于Enzyme的使用场景,动态代码生成并不一定有性能优化的效果,我们使用JMH做基准测试,将一部分使得性能变差的代码生成关闭掉。

数以千计的SQL会生成大量Java类,在引擎中编译并缓存,会带来一些内存上的占用和CPU的消耗,也是我们做取舍的其中一个原因。

优化点二:缓存

我们做的最主要的缓存就是从Unresolve Logical Plan到Physical Plan的生成。为什么不是直接从SQL到Physical Plan呢?因为SQL解析的开销实际上很小,而且略有差异的SQL所生成的Unresolved Logical Plan可能是一模一样的。

在物理计划的缓存中,还有两点需要注意:

  • 其一,物理计划必须和数据隔离
  • 其二,物理计划的计算不能有副作用

只有这样,我们的缓存才是有效的、正确无误的。另外,在表的schema发生改变的时候,我们还需要让所缓存的相关物理计划失效。

优化点三:新增逻辑计划优化规则

Catalyst中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks在Spark Summit上做过一个题为A Deep Dive into Spark SQL's Catalyst Optimizer的讲座,其中有细节的介绍。

具体的接口如下:

spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
复制代码

我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。

Enzyme的未来

  1. 开源
  2. 做更多针对小数据集的优化,进一步改善性能
  3. 基于Enzyme,做一些上层生态的扩展

对于第三点,我们想做的实际上是让Enzyme和其他生态更好地结合。比如如何将Enzyme运用到Spark Streaming或者Flink Streaming中,如何在Spring Boot中更加方便地使用Enzyme,如何在机器学习中使用Enzyme。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

python 3标准库

python 3标准库

道格·赫尔曼 / 机械工业出版社 / 2018-10 / 199

在本书中,你会看到用来处理文本、数据类型、算法、数学计算、文件系统、网络通信、Internet、XML、Email、加密、并发性、运行时和语言服务等各个方面的实用代码和解决方案。在内容安排上,每一节都会全面介绍一个模块,并提供一些很有价值的补充资源链接,这使得本书成为一本理想的Python标准库参考手册。一起来看看 《python 3标准库》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

html转js在线工具
html转js在线工具

html转js在线工具