经典分布式论文阅读:RDD

栏目: 服务器 · 发布时间: 6年前

内容简介:本文是RDD论文的阅读笔记。RDD是一个分布式内存抽象,用来在大集群上进行内存计算,具备容错能力。RDD主要针对迭代算法和交互式数据挖掘设计,考虑到大部分的应用程序在数据项上进行同一个操作,只允许粗粒度的变换可以简化故障恢复过程。RDD是一个只读的、分区的记录集合。RDD创建的方式只能是这样一来,RDD不需要随时备份,因为总是能够从最早的RDD变换而来。另外,RDD支持

本文是RDD论文的阅读笔记。RDD是一个分布式内存抽象,用来在大集群上进行内存计算,具备容错能力。RDD主要针对迭代算法和交互式数据挖掘设计,考虑到大部分的应用程序在数据项上进行同一个操作,只允许粗粒度的变换可以简化故障恢复过程。

弹性分布式数据集(RDD)

RDD抽象

RDD是一个只读的、分区的记录集合。RDD创建的方式只能是

  1. 从稳定存储中加载
  2. 由其他RDD 变换 而来(例如 map , filterjoin

这样一来,RDD不需要随时备份,因为总是能够从最早的RDD变换而来。另外,RDD支持 持久化分区 。持久化持久保存RDD便于再次使用,分区操作控制分区策略进行优化。

Spark编程接口

开发者可以从稳定存储加载数据,通过变换创建RDD。可以通过 动作 使用RDD获得返回值或者导出在存储系统。典型的 动作

count
collect
save

另外,开发者可以使用 persist 持久化后续会多次用到的RDD。

示例:控制台日志挖掘

假设运维人员希望从保存在HDFS中的超大日志中分析错误,那么他可以首先筛选出错误日志:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
复制代码

可以使用 count 获取错误日志的数量

errors.count()
复制代码

进一步,能够在错误日志中查找更加具体的信息。

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
      .map(_.split(’\t’)(3))
      .collect()
复制代码

程序的世系图如下:

经典分布式论文阅读:RDD

由于 errors 会被多次使用,因此对它进行持久化可以优化查询速度。

RDD模型的优势

经典分布式论文阅读:RDD

和现有比较先进的分布式共享内存(DSM)相比,RDD主要有以下差别:

  • RDD只能通过粗粒度的变换操作创建,极大地简化了容错设计
  • 借助不可修改的性质,可以通过运行副本任务来缓解慢节点问题,而DSM需要处理两个互备任务操作同一段数据的问题
  • 通过数据位置调度任务提升性能
  • 当内存不足的时候,因为RDD的变换都是流式地,因此即使使用硬盘存储也不会太影响性能

不适合RDD的应用

RDD比较适合批量任务,就是在RDD所以的元素上执行一种操作。而对于那种需要做异步细粒度更新状态的任务,RDD就不太适合。

Spark编程接口

在Spark中,用户需要编写一个 驱动程序 (driver),连接到 工作节点 (worker),驱动程序需要将变换操作以闭包的形式传递给工作节点。RDD数据是 静态类型 的,不过Scala支持类型推断,因此很多时候不需要指定类型。

经典分布式论文阅读:RDD

Spark中的RDD操作

Spark中主要的 变换动作 如下表所示:

经典分布式论文阅读:RDD

通过变换创新的RDD属于惰性操作,只有在执行动作的时候会进行实际的运算。其中部分变换,例如 join 只能和键值对组成的RDD上操作。另外,用户也可以要求持久化一些RDD,获取RDD的分区顺序。

示例应用

逻辑斯蒂回归

val points = spark.textFile(...)
                  .map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
    val gradient = points.map{ p =>
        p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
    }.reduce((a,b) => a+b)
    w -= gradient
}
复制代码

页排名

PageRank稍微复杂一点,算法根据链接关系不断更新网页的 排名 。在每次迭代中,每次网页发送 的贡献值给它链接的网页, 为网页自身的排名, 为链接的数量。然而计算排名为 ,其中 为所有贡献值的和, 为网页总数。

// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
    // Build an RDD of (targetURL, float) pairs
    // with the contributions sent by each page
    val contribs = links.join(ranks).flatMap {
        (url, (links, rank)) =>
            links.map(dest => (dest, rank/links.size))
    }
    // Sum contributions by URL and get new ranks
    ranks = contribs.reduceByKey((x,y) => x+y)
                    .mapValues(sum => a/N + (1-a)*sum)
}
复制代码

对应的数据世系图如下:

经典分布式论文阅读:RDD

随着迭代次数增加,依赖关系变深,可以给 ranks 设置检查点加速故障恢复。另外,可以控制RDD的分区策略来减少节点之间通信成本,加速计算。例如,可以将 linksranks 都按照 url 的哈希值分区,那么 join 操作就不需要节点之间通信就能完成。

links = spark.textFile(...).map(...)
             .partitionBy(myPartFunc).persist()
复制代码

表示RDD

RDD采用图进行保存,RDD通过接口提供五种信息的访问:分区、依赖、函数、分区规则元数据以及数据存放元数据。RDD提供的接口如下所示:

经典分布式论文阅读:RDD

RDD中的依赖关系有两种:

  • 窄依赖 :父RDD的分区至多被子RDD的一个分区使用
  • 宽依赖 :父RDD的分区被子RDD的多个分区使用

窄依赖的特征相当重要:

  1. 在窄依赖中,每个分区的变换都可以并行独立执行
  2. 故障恢复也更加高效
经典分布式论文阅读:RDD
  • HDFS文件partitions() 返回每个分区在文件的位置, preferredLocations(p) 会返回分区所在节点, iterator(p,parentIters) 读取分区
  • map :将函数作用在父RDD分区的每条记录上
  • union :它的分区为父RDD分区的并集
  • sample :保存一个随机数生成器,从父RDD记录中采样
  • join :如果两个父RDD由同一个 partitioner 分区,那么变换是窄依赖,否则为宽依赖或者混合依赖,生成的子RDD会带有 partitioner

实现

任务调度

在执行运算的时候,调度器会分析世系图,生成一个由不同 阶段 组成的DAG。每个阶段由窄依赖构成,便于进行并行计算。DAG的边界是宽依赖或者已经计算好的RDD分区。调度器会调度计算未计算的分区,调度器会根据数据存在位置调度任务。在宽依赖中,节点可以保存中间结果来加快故障恢复。

当任务发生故障,那么调度器把任务分配给其他节点,如果父RDD分区丢失,那么丢失的分区也要重新计算。目前调度器是没有容错能力的,不过备份数据世系图就足够了。

经典分布式论文阅读:RDD

解释器集成

解释器为每一行编译一个类,然后使用一个函数调用它。Spark解释器在Scala解释器基础上修改了:

  1. 类传输 :为了能让工作节点访问每一行的节码,解释器采用HTTP共享字节码
  2. 修改代码生成 :为了序列化前面行的数据,每一行会引用前一行对象
经典分布式论文阅读:RDD

内存管理

Spark提供了三种保存RDD的方式:

  • 内存中的未序列化 Java 对象:JVM可以直接访问,高性能
  • 内存中的序列化Java对象:内存空间有限的时候,内存利用率高
  • 硬盘中:保存大到内存中无法保存的RDD

系统采用LRU策略管理内存,每次内存不够的时候淘汰最不近使用的RDD分区(和新分区属于同一个RDD的分区除外)。另外,Spark也支持用户手动设置RDD的 持久化优先级

检查点支持

虽然数据世系可以用来回复RDD,但是如果依赖关系过深,那么恢复速度会很慢。因此,有必要通过设置 检查点 来加速故障恢复,系统将这个决定权交给了用户,用户可以调用 persist 设置 REPLICATE 持久化RDD建立检查点。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Data Structures and Algorithms in Python

Data Structures and Algorithms in Python

Michael T. Goodrich、Roberto Tamassia、Michael H. Goldwasser / John Wiley & Sons / 2013-7-5 / GBP 121.23

Based on the authors' market leading data structures books in Java and C++, this book offers a comprehensive, definitive introduction to data structures in Python by authoritative authors. Data Struct......一起来看看 《Data Structures and Algorithms in Python》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换