用ApacheArrow加速PySpark

栏目: 服务器 · 发布时间: 5年前

内容简介:Pandas、Numpy是做数据分析最常使用的Python包,如果数据存在Hadoop又想用Pandas做一些数据处理,通常会使用PySpark的可以看到,近500w数据的toPandas操作,开启arrow后,粗略耗时统计从39s降低为2s。如何开启arrow,就是spark.sql.execution.arrow.enabled=true这个配置了,spark2.3开始支持。

用ApacheArrow加速PySpark

Pandas、Numpy是做数据分析最常使用的 Python 包,如果数据存在Hadoop又想用Pandas做一些数据处理,通常会使用PySpark的 DataFrame.toPandas() 这个方法。让人不爽的是,这个方法执行很慢,数据量越大越慢。

做个测试

Using Python version 2.7.14 (default, Oct  5 2017 02:28:52)  
SparkSession available as 'spark'.  
>>> def test():
...     from pyspark.sql.functions import rand
...     from better_utils import TimeUtil
...     start = TimeUtil.now_unix()
...     df = spark.range(1 << 22).toDF('id').withColumn("x", rand())
...     df.toPandas()
...     cost = TimeUtil.now_unix() - start
...     print "耗时:{}s".format(cost)
... 
>>> test()
耗时:39s                                                                        
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> test()
/Users/yulian/anaconda3/envs/python2/lib/python2.7/site-packages/pyarrow/__init__.py:152: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
耗时:2s                                                                         
>>>

可以看到,近500w数据的toPandas操作,开启arrow后,粗略耗时统计从39s降低为2s。

如何开启arrow,就是spark.sql.execution.arrow.enabled=true这个配置了,spark2.3开始支持。

另外需要安装pip install pyarrow。

是什么

Arrow是一种跨语言的基于内存的列式数据结构。

在分布式系统内部,每个系统都有自己的内存格式,大量的 CPU 资源被消耗在序列化和反序列化过程中,并且由于每个项目都有自己的实现,没有一个明确的标准,造成各个系统都在重复着复制、转换工作,这种问题在微服务系统架构出现之后更加明显,Arrow 的出现就是为了解决这一问题。作为一个跨平台的数据层,我们可以使用 Arrow 加快大数据分析项目的运行速度。

用ApacheArrow加速PySpark

需要明确的是,Apache Arrow 不是一个引擎,也不是一个存储系统,它是用来处理分层的列式内存数据的一系列格式和算法。

为什么

PySpark中使用DataFrame.toPandas()将数据从Spark DataFrame转换到Pandas中是非常低效的。

Spark和Python基于Socket通信,使用serializers/deserializers交换数据。

Python的反序列化pyspark.serializers.PickleSerializer使用cPickle模块的标准pickle格式。

Spark先把所有的行汇聚到driver上,然后通过初始转换,以消除Scala和 Java 之间的任何不兼容性,使用Pyrolite库的org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler去把Java对象序列化成pickle格式。

然后序列化后的数据分批发送个Python的worker子进程,这个子进程会反序列化每一行,拼成一个大list;最后利用 pandas.DataFrame.from_records() 从这个list来创建一个Pandas DataFrame。

上面的过程有两个明显问题:

1)即使使用CPickle,Python的序列化也是一个很慢的过程。

2)利用 from_records 来创建一个 pandas.DataFrame 需要遍历Python list,将每个value转换成Pandas格式。

Arrow可以优化这几个步骤:

1)一旦数据变成了Arrow的内存格式,就不再有序列化的需要,因为Arrow数据可以直接发送到Python进程。

2)当在Python里接收到Arrow数据后,pyarrow可以利用zero-copy技术,一次性的从整片数据来创建 pandas.DataFrame,而不需要轮询去处理每一行记录。另外转换成Arrow数据的过程可以在JVM里并行完成,这样可以显著降低driver的压力。

Arrow有点可以总结为:

* 序列化友好

* 向量化

序列化友好指的是,Arrow提供了一个内存格式,该格式本身是跨应用的,无论你放到哪,都是这个格式,中间如果需要网络传输这个格式,那么也是序列化友好的,只要做下格式调整(不是序列化)就可以将数据发送到另外一个应用里。这样就大大的降低了序列化开销。

向量化指的是,首先Arrow是将数据按block进行传输的,其次是可以对立面的数据按列进行处理的。这样就极大的加快了处理速度。

感兴趣的话可以看下Python各种序列化方案的对比:

http://satoru.rocks/2018/08/fastest-way-to-serialize-array/

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

硅谷之谜

硅谷之谜

吴军 / 人民邮电出版社 / 2015-12-1 / 59.00

这是一本颠覆人们对信息时代的认识、对创新和创业的理解的好书。作者吴军通过介绍硅谷成功的秘诀,揭示了信息时代的特点和方法论。 近年来,吴军从技术和管理人员变成了投资人,他对IT领域,尤其是对科技创新因而有了更深入的了解。他根据这些年在硅谷所获得的第一手资料,结合自己的思考,回答了长期以来令大家深感困惑的一个不解之谜,那就是—为什么硅谷在全世界其他地区难以复制? 《硅谷之谜》从某种意义上讲......一起来看看 《硅谷之谜》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具