Enzyme SQL引擎的实现与优化

栏目: 数据库 · 发布时间: 6年前

内容简介:Enzyme是挖财数据团队自研的SQL执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用Ja

Enzyme是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于Spark Catalyst实现,Enzyme SQL在查询层面 和Spark SQL完全兼容。至于Dataframe,在Enzyme中有对应的Protein。在API的层次上,Protein和Spark Dataframe几乎完全一致。

应用

Enzyme SQL目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用SQL作为一种加工变量的DSL,提供在离线和实时两个平台上的一致语义。

为什么要使用SQL呢?首先,自研DSL需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的DSL最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。

Enzyme SQL引擎极致的性能表现和非常低的CPU占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。

实践

Enzyme设计之初就是以兼容Spark SQL为目标的,故而在使用上,和Spark SQL的API大体是一致的。EnzymeSession即SparkSession,Protein即Dataframe。

我们从构建一个Protein数据集开始:

// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)

// construct a protein from rows and schemas
val schema = StructType(Seq(
  StructField("x", LongType),
  StructField("y", StringType),
  StructField("z", DoubleType),
  StructField("in", IntegerType)
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
  Row(2L, "23L", 23.4, 4245),
  Row(2L, "65L", 5244.2, 234),
  Row(null, "7L", 245.234, 5245),
  Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)
复制代码

这样的一个数据集可以直接展示:

> df.show()

+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

如果要使用SQL,首先我们要把这个数据集和一个表名关联起来:

> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

上面的代码中 session.sql() 的结果还是一个Protein。除了使用SQL,我们还可以使用Protein里面丰富的API:

> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+

> df.sort("x").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+
复制代码

更多用法的细节可以查看Spark SQL的文档,也可以查看Enzyme的文档。

实现

Enzyme基于Spark Catalyst实现,而Catalyst对标的开源项目是Apache Calcite。Apache Phoenix和Apache Hive等众多项目都在使用Calcite。因为我们的目标是兼容Spark SQL,自然而然选择了Catalyst,作为SQL的解析器、逻辑计划的执行器和优化器。

Enzyme SQL引擎的实现与优化

Spark Catalyst概览

  • SQL Text
    • (parse): Unresolved Logical Plan
      • (analyze): Resolved Logical Plan
        • (optimize): Optimized Logical Plan(s) ----- RBO
          • (planning): Physical Planning ------ CBO

上面的层次结构简明地概括了一个SQL从最原始的SQL文本,到最后执行的各个阶段。其中加粗的部分是Enzyme中所实现的,未加粗的部分是Catalyst所提供的功能。

解析,就是用Antlr4将SQL文本变成一棵AST树,这个AST树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道 * 所表示的字段究竟是哪些。

分析,就是结合Catalog中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如 * )和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出AnalysisException。

优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过Catalyst提供的接口,将自己研发的优化规则加入其中。这里的优化就是RBO,基于规则的优化。

最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的CBO,基于代价的优化。

一个SQL的一生

SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
复制代码

我们用上面这个SQL来详细了解一下上述各个阶段。

分析阶段

Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
   :- 'UnresolvedRelation `employee`
   +- 'UnresolvedRelation `department`


Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]
复制代码

我们看到 * 已经被展开成了四个明确的字段,而且每个字段都有明确的ID标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对Spark SQL做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。

优化

Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]

Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
:  +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
   +- LocalRelation [DepartmentID#0L, DepartmentName#1]
复制代码

因为这是一个inner join,所以这里的一个优化点其实是在做join之前,把join key为null的行过滤掉。

物理计划的生成

我们模仿Spark SQL中SparkPlan的实现,提供了简化的EnzymePlan:

abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
  def iterator: Iterator[InternalRow]
  override def output: Seq[Attribute]
  ...
}

trait LeafExecNode extends EnzymePlan {
  override final def children: Seq[EnzymePlan] = Nil
}

trait UnaryExecNode extends EnzymePlan {
  def child: EnzymePlan

  override final def children: Seq[EnzymePlan] = child :: Nil
}

trait BinaryExecNode extends EnzymePlan {
  def left: EnzymePlan

  def right: EnzymePlan

  override final def children: Seq[EnzymePlan] = Seq(left, right)
}
复制代码

在这个代码片段中,EnzymePlan是核心,其中output表示一个物理计划的节点上结果集的元数据信息,而iterator则是调用这个物理计划节点的入口。我们看到有三类物理计划:

  • LeafExecNode : LocalTableScanExec, LazyLocalTableScanExec
  • UnaryExecNode : ProjectExec, LimitExec, FilterExec
  • BinaryExecNode : HashJoinExec, NestedLoopExec

Enzyme中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是Union或者Join,而只有一个输入源的就是Projection,Filter,Sort等算子。

上一节中优化之后的逻辑计划可以生成这样的物理计划:

HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
         , BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
:  +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
                         employee, catalog@60dcf9ec
 +- FilterExec isnotnull(DepartmentID#4L)
   +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
                         department, catalog@60dcf9ec
复制代码

计算通过在根节点调用iterator方法,层层回溯:

HashJoinExec.iterator
 + FilterExec.iterator
   + LazyLocalTableScan(employee).iterator
 + FilterExec.iterator
   + LazyLocalTableScan(department).iterator
复制代码

性能调优

首先,我们需要定位性能瓶颈。JVM生态中有很多做Profiling的工具。Enzyme在优化过程中,使用的是JDK中自带的jmc命令和FlightRecord。通过jmc的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。

  • 其一,直接替换掉慢的部分
  • 其二,对无法优化的部分做必要的缓存
  • 其三,逻辑计划优化

优化点一:动态代码生成调优

Spark的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料2):

  • 大量虚函数调用,生成的实际代码不再需要执行表达式系统中统一定义的虚函数
  • 判断数据类型和操作算子等内容的大型分支选择语句
  • 常数传播限制,生成的代码能够确定性地折叠常量

对于Enzyme的使用场景,动态代码生成并不一定有性能优化的效果,我们使用JMH做基准测试,将一部分使得性能变差的代码生成关闭掉。

数以千计的SQL会生成大量Java类,在引擎中编译并缓存,会带来一些内存上的占用和CPU的消耗,也是我们做取舍的其中一个原因。

优化点二:缓存

我们做的最主要的缓存就是从Unresolve Logical Plan到Physical Plan的生成。为什么不是直接从SQL到Physical Plan呢?因为SQL解析的开销实际上很小,而且略有差异的SQL所生成的Unresolved Logical Plan可能是一模一样的。

在物理计划的缓存中,还有两点需要注意:

  • 其一,物理计划必须和数据隔离
  • 其二,物理计划的计算不能有副作用

只有这样,我们的缓存才是有效的、正确无误的。另外,在表的schema发生改变的时候,我们还需要让所缓存的相关物理计划失效。

优化点三:新增逻辑计划优化规则

Catalyst中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks在Spark Summit上做过一个题为A Deep Dive into Spark SQL's Catalyst Optimizer的讲座,其中有细节的介绍。

具体的接口如下:

spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
复制代码

我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。

Enzyme的未来

  1. 开源
  2. 做更多针对小数据集的优化,进一步改善性能
  3. 基于Enzyme,做一些上层生态的扩展

对于第三点,我们想做的实际上是让Enzyme和其他生态更好地结合。比如如何将Enzyme运用到Spark Streaming或者Flink Streaming中,如何在Spring Boot中更加方便地使用Enzyme,如何在机器学习中使用Enzyme。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Security Testing Cookbook

Web Security Testing Cookbook

Paco Hope、Ben Walther / O'Reilly Media / 2008-10-24 / USD 39.99

Among the tests you perform on web applications, security testing is perhaps the most important, yet it's often the most neglected. The recipes in the Web Security Testing Cookbook demonstrate how dev......一起来看看 《Web Security Testing Cookbook》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具