7个实例全面掌握Hadoop MapReduce

栏目: 服务器 · 发布时间: 6年前

内容简介:7个实例全面掌握Hadoop MapReduce

一、MapReduce基本原理

MapReduce是一种编程模型,用于大规模数据集的分布式运算。

1 、MapReduce通俗解释

图书馆要清点图书数量,有10个书架,管理员为了加快统计速度,找来了10个同学,每个同学负责统计一个书架的图书数量。

张同学统计 书架1

王同学统计 书架2

刘同学统计 书架3

……

过了一会儿,10个同学陆续到管理员这汇报自己的统计数字,管理员把各个数字加起来,就得到了图书总数。

这个过程就可以理解为MapReduce的工作过程。

2 、MapReduce中有两个核心操作

(1)map

管理员分配哪个同学统计哪个书架,每个同学都进行相同的“统计”操作,这个过程就是map。

(2)reduce

每个同学的结果进行汇总,这个过程是reduce。

3 、MapReduce工作过程拆解

下面通过一个景点案例(单词统计)看MapReduce是如何工作的。

有一个文本文件,被分成了4份,分别放到了4台服务器中存储

Text1:the weather is good

Text2:today is good

Text3:good weather is good

Text4:today has good weather

现在要统计出每个单词的出现次数。

处理过程

(1)拆分单词

  • map节点1

输入:“the weather is good”

输出:(the,1),(weather,1),(is,1),(good,1)

7个实例全面掌握Hadoop MapReduce

  • map节点2

输入:“today is good”

输出:(today,1),(is,1),(good,1)

7个实例全面掌握Hadoop MapReduce

  • map节点3

输入:“good weather is good”

输出:(good,1),(weather,1),(is,1),(good,1)

7个实例全面掌握Hadoop MapReduce

  • map节点4

输入:“today has good weather”

输出:(today,1),(has,1),(good,1),(weather,1)

7个实例全面掌握Hadoop MapReduce

(2)排序

  • map节点1

7个实例全面掌握Hadoop MapReduce

  • map节点2

7个实例全面掌握Hadoop MapReduce

  • map节点3

7个实例全面掌握Hadoop MapReduce

  • map节点4

7个实例全面掌握Hadoop MapReduce

(3)合并

  • map节点1

7个实例全面掌握Hadoop MapReduce

  • map节点2

7个实例全面掌握Hadoop MapReduce

  • map节点3

7个实例全面掌握Hadoop MapReduce

  • map节点4

7个实例全面掌握Hadoop MapReduce

(4)汇总统计

每个map节点都完成以后,就要进入reduce阶段了。

例如使用了3个reduce节点,需要对上面4个map节点的结果进行重新组合,比如按照26个字母分成3段,分配给3个reduce节点。

Reduce节点进行统计,计算出最终结果。

7个实例全面掌握Hadoop MapReduce

这就是最基本的MapReduce处理流程。

4 、MapReduce编程思路

了解了MapReduce的工作过程,我们思考一下用代码实现时需要做哪些工作?

  1. 在4个服务器中启动4个map任务

  2. 每个map任务读取目标文件,每读一行就拆分一下单词,并记下来次单词出现了一次

  3. 目标文件的每一行都处理完成后,需要把单词进行排序

  4. 在3个服务器上启动reduce任务

  5. 每个reduce获取一部分map的处理结果

  6. reduce任务进行汇总统计,输出最终的结果数据

但不用担心,MapReduce是一个非常优秀的编程模型,已经把绝大多数的工作做完了,我们只需要关心2个部分:

  1. map处理逻辑——对传进来的一行数据如何处理?输出什么信息?

  2. reduce处理逻辑——对传进来的map处理结果如何处理?输出什么信息?

编写好这两个核心业务逻辑之后,只需要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。

至于其它的复杂细节,例如如何启动map任务和reduce任务、如何读取文件、如对map结果 排序 、如何把map结果数据分配给reduce、reduce如何把最终结果保存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自定义扩展配置,例如如何读文件、如何组织map或者reduce的输出结果等等,后面的示例中会有介绍。

二、MapReduce入门示例:WordCount单词统计

WordCount是非常好的入门示例,相当于helloword,下面就开发一个WordCount的MapReduce程序,体验实际开发方式。

1、安装Hadoop实践环境

您可以选择自己搭建环境,也可以使用打包好的Hadoop环境(版本2.7.3)。

这个Hadoop环境实际上是一个虚机镜像,所以需要安装virtualbox虚拟机、vagrant镜像管理工具,和我的Hadoop镜像,然后用这个镜像启动虚机就可以了,下面是具体操作步骤:

(1)安装virtualbox

下载地址:https://www.virtualbox.org/wiki/Downloads

(2)安装vagrant

因为官网下载较慢,我上传到了云盘

Windows版

链接: https://pan.baidu.com/s/1pKKQGHl

密码: eykr

Mac版

链接: https://pan.baidu.com/s/1slts9yt

密码: aig4

安装完成后,在命令行终端下就可以使用vagrant命令。

(3)下载Hadoop镜像

链接: https://pan.baidu.com/s/1bpaisnd

密码: pn6c

(4)启动

加载Hadoop镜像

vagrant  box add {自定义镜像名称} {镜像所在路径}

例如您想命名为Hadoop,镜像下载后的路径为d:\hadoop.box,加载命令就是这样:

vagrant  box add  hadoop d:\hadoop .box

创建工作目录,例如d:\hdfstest。

进入此目录,初始化

cd d:\hdfstest

vagrant init hadoop

启动虚机

vagrant up

启动完成后,就可以使用SSH客户端登录虚机了

IP   127.0.0.1

端口 2222

用户名 root

密码 vagrant

在Hadoop服务器中启动HDFS和Yarn,之后就可以运行MapReduce程序了

start-dfs.sh

start-yarn.sh

2、创建项目

注:流程是在本机开发,然后打包,上传到Hadoop服务器上运行。

新建项目目录wordcount,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在的目录结构

7个实例全面掌握Hadoop MapReduce

3 、代码

mapper程序:src/main/java/WordcountMapper.java

内容:

7个实例全面掌握Hadoop MapReduce

这里定义了一个mapper类,其中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。

map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对儿。

Mapper<LongWritable, Text, Text, IntWritable>

其中的4个类型分别是:输入key类型、输入value类型、输出key类型、输出value类型。

MapReduce框架读到一行数据侯以key value形式传进来,key默认情况下是mr矿机所读到一行文本的起始偏移量(Long类型),value默认情况下是mr框架所读到的一行的数据内容(String类型)。

输出也是key value形式的,是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。

此例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。

这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,int对应的是IntWritable。

reduce 程序: src/main/java/WordCountReducer.java

7个实例全面掌握Hadoop MapReduce

这里定义了一个Reducer类和一个reduce方法。

当传给reduce方法时,就变为:

Reducer<Text, IntWritable, Text, IntWritable>

4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。

需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到map任务处理结果是这样的:

(good,1)(good,1)(good,1)(good,1)

当传给reduce方法时,就变为:

key:good

value:(1,1,1,1)

所以,reduce方法接收到的是同一个key的一组value。

主程序:src/main/java/WordCountMapReduce.java

7个实例全面掌握Hadoop MapReduce

这个main方法就是用来组装一个job并提交执行

4 、编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构:

7个实例全面掌握Hadoop MapReduce

5、运行

先把target中的jar上传到Hadoop服务器,然后在Hadoop服务器的HDFS中准备测试文件(把Hadoop所在目录下的txt文件都上传到HDFS)

cd $HADOOP_HOME

hdfs dfs -mkdir -p /wordcount/input

hdfs dfs -put *.txt /wordcount/input

执行wordcount jar

hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR

educe /wordcount/input /wordcount/output

执行完成后验证

hdfs dfs -cat /wordcount/output/*

可以看到单词数量统计结果。

三、MapReduce执行过程分析

下面看一下从job提交到执行完成这个过程是怎样。

(1)客户端提交任务

Client提交任务时会先到HDFS中查看目标文件的大小,了解要获取的数据的规模,然后形成任务分配的规划,例如:

a.txt 0-128M交给一个task,128-256M 交给一个task,b.txt 0-128M交给一个task,128-256M交给一个task ...,形成规划文件job.split。

然后把规划文件job.split、jar、配置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配合适的服务器资源)

7个实例全面掌握Hadoop MapReduce

(2)启动appmaster

注: appmaster是本次job的主管,负责maptask和reducetask的启动、监控、协调管理工作。

yarn找一个合适的服务器来启动appmaster,并把job.split、jar、xml交给它。

7个实例全面掌握Hadoop MapReduce

(3)启动maptask

Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。

分配maptask时,会尽量让maptask在目标数据所在的datanode上执行。

7个实例全面掌握Hadoop MapReduce

(4)执行maptask

Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,map调用context.write把处理结果写出去,保存到本机的一个结果文件,这个文件中的内容是分区且有序的。

分区的作用就是定义哪些key在一组,一个分区对应一个reducer。

7个实例全面掌握Hadoop MapReduce

(5)启动reducetask

Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。

7个实例全面掌握Hadoop MapReduce

(6)执行reducetask

reducetask去读取maptask的结果文件中自己对应的那个分区数据,例如reducetask_01去读第一个分区中的数据。

reducetask把读到的数据按key组织好,传给reduce方法进行处理,处理结果写到指定的输出路径。

7个实例全面掌握Hadoop MapReduce

四、实例1:自定义对象序列化

1 、需求与实现思路

(1)需求

需要统计手机用户流量日志,日志内容实例:

7个实例全面掌握Hadoop MapReduce

要把同一个用户的上行流量、下行流量进行累加,并计算出综合。

例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:

13897230503,500,1600,2100

(2)实现思路

  • map

接收日志的一行数据,key为行的偏移量,value为此行数据。

输出时,应以手机号为key,value应为一个整体,包括:上行流量、下行流量、总流量。

手机号是字符串类型Text,而这个整体不能用基本数据类型表示,需要我们自定义一个bean对象,并且要实现可序列化。

key: 13897230503

value: < upFlow:100, dFlow:300, sumFlow:400 >

  • reduce

接收一个手机号标识的key,及这个手机号对应的bean对象集合。

例如:

key:

13897230503

value:

< upFlow:400, dFlow:1300, sumFlow:1700 >,

< upFlow:100, dFlow:300, sumFlow:400 >

迭代bean对象集合,累加各项,形成一个新的bean对象,例如:

< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >

最后输出:

key: 13897230503

value: < upFlow:500, dFlow:1600, sumFlow:2100 >

2 、代码实践

(1)创建项目

新建项目目录serializebean,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

自定义bean:src/main/java/FlowBean

7个实例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/FlowCount

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构:

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器,然后下载测试数据文件:

链接: https://pan.baidu.com/s/1skTABlr

密码:tjwy

上传到HDFS

hdfs dfs -mkdir -p /flowcount/input

hdfs dfs -put flowdata.log /flowcount/input

运行

hadoop jar mapreduce-serializebean-0.0.1-SNAPSHOT.jar FlowCount

/flowcount/input /flowcount/output2

检查

hdfs dfs -cat /flowcount/output/*

五、实例2:自定义分区

1 、需求与实现思路

(1)需求

还是以上个例子的手机用户流量日志为例:

7个实例全面掌握Hadoop MapReduce

在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。

例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。

(2)实现思路

map和reduce的处理思路与上例相同,这里需要多做2步:

  • 自定义一个分区器Partitioner

根据手机号判断属于哪个分区。 有几个分区就有几个reducetask,每个reducetask输出一个文件,那么,不同分区中的数据就写入了不同的结果文件中。

7个实例全面掌握Hadoop MapReduce

  • 在main程序中指定使用我们自定义的Partitioner即可

2 、代码实践

(1)创建项目

新建项目目录custom_partion,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

自定义bean:src/main/java/FlowBean.java

7个实例全面掌握Hadoop MapReduce

自定义分区器:src/main/java/ProvincePartitioner.java

7个实例全面掌握Hadoop MapReduce

这段代码是本示例的重点,其中定义了一个hashmap,假设其是一个数据库,定义了手机号和分区的关系。

getPartition取得手机号的前缀,到数据库中获取区号,如果没在数据库中,就指定其为“其它分区”(用4代表)

MapReduce程序:src/main/java/FlowCount.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

main程序中指定了使用自定义的分区器

job.setPartitionerClass(ProvincePartitioner.class);

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器

运行

hadoop jar mapreduce-custompartion-0.0.1-SNAPSHOT.jar FlowCount

/flowcount/input /flowcount/output-part

检查

hdfs dfs -ls /flowcount/output-part

六、实例3:计算出每组订单中金额最大的记录

1、需求与实现思路

(1)需求

有如下订单数据:

7个实例全面掌握Hadoop MapReduce

需要求出每一个订单中成交金额最大的一笔交易。

(2)实现思路

先介绍一个概念GroupingComparator组比较器,通过WordCount来理解它的作用。

WordCount中map处理完成后的结果数据是这样的:

<good,1>

<good,1>

<good,1>

<is,1>

<is,1>

Reducer会把这些数据都读进来,然后进行分组,把key相同的放在一组,形成这样的形式:

<good, [1,1,1]>

<is, [1,1]>

然后对每一组数据调用一次reduce( key, Iterable, ...)方法。

其中分组的操作就需要用到GroupingComparator,对key进行比较,相同的放在一组。

注: 上例中的Partitioner是属于mapDuang的,GroupingComparator是属于reduce端的。

下面看整体实现思路。

1)定义一个订单bean

属性包括:订单号、金额

{ itemid, amount }

要实现可序列化,与比较方法compareTo,比较规则:订单号不同的,按照订单好比较,相同的,按照金额比较。

2)定义一个Partitioner

根据订单号的hashcode分区,可以保证订单号相同的在同一个分区,以便reduce中接收到同一个订单的全部记录。

同分区的数据是序的,这就用到了bean中的比较方法,可以让订单号相同的记录按照金额从大到小排序。

在map方法中输出数据时,key就是bean,value为null。

map的结果数据形式例如:

7个实例全面掌握Hadoop MapReduce

3)定义一个GroupingComparator

因为map的结果数据中key是bean,不是普通数据类型,所以需要使用自定义的比较器来分组,就使用bean中的订单号来比较。

例如读取到分区1的数据:

<{ Order_0000001   222.8 }, null>,

<{ Order_0000001   25.8 }, null>,

<{ Order_0000003   222.8 }, null>

进行比较,前两条数据的订单号相同,放入一组,默认是以第一条记录的key作为这组记录的key。

分组后的形式如下:

<{ Order_0000001 222.8 }, [null, null]>,

<{ Order_0000003 222.8 }, [null]>

在reduce方法中收到的每组记录的key就是我们最终想要的结果,所以直接输出到文件就可以了。

7个实例全面掌握Hadoop MapReduce

2 、代码实践

(1)创建项目

新建项目目录groupcomparator,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

**自定义bean:** src/main/java/OrderBean.java

7个实例全面掌握Hadoop MapReduce 7个实例全面掌握Hadoop MapReduce

自定义分区器:src/main/java/ItemIdPartitioner.java

7个实例全面掌握Hadoop MapReduce

自定义比较器:src/main/java/MyGroupingComparator.java

7个实例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/GroupSort.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器

下载测试数据文件

链接:https://pan.baidu.com/s/1pKKlvh5

密码: 43xa

上传到HDFS

hdfs dfs -put orders.txt /

运行

hadoop jar mapreduce-groupcomparator-0.0.1-SNAPSHOT.jar GroupSo

rt /orders.txt /outputOrders

检查

hdfs dfs -ls /outputOrders

hdfs dfs -cat /outputOrders/*

七、实例4:合并多个小文件

1 、需求与实现思路

(1)需求

要计算的目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还打,这就产生了效率损耗。

需要先把一些小文件合并成一个大文件。

(2)实现思路

文件的读取由map负责,在前面的示意图中可以看到一个inputformat用来读取文件,然后以key value形式传递给map方法。

我们要自定义文件的读取过程,就需要了解其细节流程:

7个实例全面掌握Hadoop MapReduce

所以我们需要自定义一个inputformat和RecordReader。

Inputformat使用我们自己的RecordReader,RecordReader负责实现一次读取一个完整文件封装为key value。

map接收到文件内容,然后以文件名为key,以文件内容为value,向外输出的格式要注意,要使用SequenceFileOutPutFormat(用来输出对象)。

因为reduce收到的key value都是对象,不是普通的文本,reduce默认的输出格式是TextOutputFormat,使用它的话,最终输出的内容就是对象ID,所以要使用SequenceFileOutPutFormat进行输出。

2 、代码实践

(1)创建项目inputformat,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

自定义inputform:src/main/java/MyInputFormat.java

7个实例全面掌握Hadoop MapReduce

createRecordReader方法中创建一个自定义的reader

自定义reader:src/main/java/MyRecordReader.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

其中有3个核心方法:nextKeyValue、getCurrentKey、getCurrentValue。

nextKeyValue负责生成要传递给map方法的key和value。getCurrentKey、getCurrentValue是实际获取key和value的。所以RecordReader的核心机制就是:通过nextKeyValue生成key value,然后通过getCurrentKey和getCurrentValue来返回上面构造好的key value。这里的nextKeyValue负责把整个文件内容作为value。

MapReduce 程序: src/main/java/ManyToOne.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

main程序中指定使用我们自定义的MyInputFormat,输出使用SequenceFileOutputFormat。

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器。

准备测试文件,把Hadoop目录中的配置文件上传到HDFS

hdfs dfs -mkdir /files

hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /files

运行

hadoop jar mapreduce-inputformat-0.0.1-SNAPSHOT.jar ManyToOne /

files /onefile

检查

hdfs dfs -ls /onefile

八、实例5:分组输出到多个文件

1 、需求与实现思路

(1)需求

7个实例全面掌握Hadoop MapReduce

需要把相同订单id的记录放在一个文件中,并以订单id命名。

(2)实现思路

这个需求可以直接使用MultipleOutputs这个类来实现。

默认情况下,每个reducer写入一个文件,文件名由分区号命名,例如'part-r- 00000',而 MultipleOutputs可以用key作为文件名,例如‘Order_0000001-r-00000’。

所以,思路就是map中处理每条记录,以‘订单id’为key,reduce中使用MultipleOutputs进行输出,会自动以key为文件名,文件内容就是相同key的所有记录。

例如‘Order_0000001-r-00000’的内容就是:

Order_0000001,Pdt_05,25.8

Order_0000001,Pdt_01,222.8

2 、代码实践

(1)创建项目

新建项目目录multioutput,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

MapReduce程序:src/main/java/MultipleOutputTest.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器

然后运行

hadoop jar mapreduce-multipleOutput-0.0.1-SNAPSHOT.jar Multiple

OutputTest /orders.txt /output-multi

检查

hdfs dfs -ls /output-multi

九、MapReduce核心流程梳理

我们已经了解了MapReduce的大概流程:

(1)maptask从目标文件中读取数据

(2)mapper的map方法处理每一条数据,输出到文件中

(3)reducer读取map的结果文件,进行分组,把每一组交给reduce方法进行处理,最后输出到指定路径。

7个实例全面掌握Hadoop MapReduce

这是最基本的流程,有助于快速理解MapReduce的工作方式。

通过上面的几个示例,我们要经接触了一些更深入的细节,例如mapper的inputform中还有RecordReader、reducer中还有GroupingComparator。

下面就看一下更加深入的处理流程。

1、Maptask 中的处理流程

(1)读文件流程

7个实例全面掌握Hadoop MapReduce

目标文件会被按照规划文件进行切分,inputformat调用RecordReader读取文件切片,RecordReader会生成key value对儿,传递给Mapper的mao方法。

(2)写入结果文件的流程

从Mapper的map方法调用context.write之后,到形成结果数据文件这个过程是比较复杂的。

7个实例全面掌握Hadoop MapReduce

context.write不是直接写入文件,而是把数据交给OutputCollector,OutputCollector把数据写入‘环形缓冲区’。‘环形缓冲区’中的数据会进行排序。

因为缓冲区的大小是有限制的,所以每当快满时(达到80%)就要把其中的数据写出去,这个过程叫做数据溢出。

溢出到一个文件中,溢出过程会对这批数据进行分组、比较操作,然后吸入文件,所以溢出文件中的数据是分好区的,并且是有序的。每次溢出都会产生一个溢出数据文件,所以会有多个。

当map处理完全数据后,就会对各个溢出数据文件进行合并,每个文件中相同区的数据放在一起,并再次排序,最后得到一个整体的结果文件,其中是分区且有序的。

这样就完成了map过程,读数据过程和写结果文件的过程联合起来如下图:

7个实例全面掌握Hadoop MapReduce

2、Reducetask 的处理流程

7个实例全面掌握Hadoop MapReduce

reducetask去读每个maptask产生的结果文件中自己所负责的分区数据,读到自己本地。对多个数据文件进行合并排序,然后通过GroupingComparator进行分组,把相同key的数据放到一组。对每组数据调一次reduce方法,处理完成后写入目标路径文件。

3、整体流程

把map和reduce的过程联合起来:

7个实例全面掌握Hadoop MapReduce

十、实例6:join操作

1、需求与实现思路

(1)需求

有2个数据文件:订单数据、商品信息。

订单数据表order

7个实例全面掌握Hadoop MapReduce

商品信息表product

7个实例全面掌握Hadoop MapReduce

需要用MapReduce程序来实现下面这个 SQL 查询运算:

select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.c

ategory_id, p.price

from t_order o join t_product p on o.pid = p.id

(2)实现思路

SQL的执行结果是这样的:

7个实例全面掌握Hadoop MapReduce

实际上就是给每条订单记录补充上商品表中的信息。

实现思路:

1)定义bean

把SQL执行结果中的各列封装成一个bean对象,实现序列化。

bean中还要有一个另外的属性flag,用来标识此对象的数据是订单还是商品。

2)map处理

map会处理两个文件中的数据,根据文件名可以知道当前这条数据是订单还是商品。

对每条数据创建一个bean对象,设置对应的属性,并标识flag(0代表order,1代表product)

以join的关联项“productid”为key,bean为value进行输出。

3)reduce处理

reduce方法接收到pid相同的一组bean对象。

遍历bean对象集合,如果bean是订单数据,就放入一个新的订单集合中,如果是商品数据,就保存到一个商品bean中。然后遍历那个新的订单集合,使用商品bean的数据对每个订单bean进行信息补全。

这样就得到了完整的订单及其商品信息。

2 、代码实践

(1)创建项目

新建项目目录jointest,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

**封装bean:** src/main/java/InfoBean.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/JoinMR.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器

下载产品和订单的测试数据文件

链接: https://pan.baidu.com/s/1pLRnm47

密码: cg7x

链接: https://pan.baidu.com/s/1pLrvsfT

密码: j2zb

上传到HDFS

hdfs dfs -mkdir -p /jointest/input

hdfs dfs -put order.txt /jointest/input

hdfs dfs -put product.txt /jointest/input

运行

hadoop jar joinmr.jar com.dys.mapreducetest.join.JoinMR /jointe

st/input /jointest/output

检查

hdfs dfs -cat /jointest/output/*

十一、实例7:计算出用户间的共同好友

1、需求与实现思路

(1)需求

下面是用户的好友关系列表,每一行代表一个用户和他的好友列表。

7个实例全面掌握Hadoop MapReduce

需要求出哪些人两两之间有共同好友,及他俩的共同好友都有谁。

例如从前2天记录中可以看出,C、E是A、B的共同好友,最终的形式如下:

7个实例全面掌握Hadoop MapReduce

(2)实现思路

之前的示例中都是一个MapReduce计算出来的,这里我们使用2个MapReduce来实现。

1)第1个MapReduce

  • map

找出每个用户都是谁的好友,例如:

读一行A:B,C,D,F,E,O(A的好友有这些,反过来拆开,这些人中的每一个都是A的好友)

输出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>

再读一行B:A,C,E,K

输出<A,B> <C,B> <E,B> <K,B>

……

  • reduce

key相同的会分到一组,例如:

<C,A><C,B><C,E><C,F><C,G>......

Key:C

value: [ A, B, E, F, G ]

意义是:C是这些用户的好友。

遍历value就可以得到:

A B 有共同好友C

A E 有共同好友C

...

B E有共同好友 C

B F有共同好友 C

输出:

<A-B,C>

<A-E,C>

<A-F,C>

<A-G,C>

<B-E,C>

<B-F,C>

.....

2)第2个MapReduce

对上一步的输出结果进行计算。

  • map

读出上一步的结果数据,组织成key value直接输出

例如:

读入一行<A-B,C>

直接输出<A-B,C>

  • reduce

读入数据,key相同的在一组

<A-B,C><A-B,F><A-B,G>......

输出:

A-B C,F,G,.....

这样就得出了两个用户间的共同好友列表

2 、代码实践

(1)创建项目

新建项目目录jointest,其中新建文件pom.xml,内容:

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

然后创建源码目录src/main/java

现在项目目录的文件结构

7个实例全面掌握Hadoop MapReduce

(2)代码

第一步的MapReduce程序:src/main/java/StepFirst.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

第二步的MapReduce程序:src/main/java/StepSecond.java

7个实例全面掌握Hadoop MapReduce

7个实例全面掌握Hadoop MapReduce

(3)编译打包

在pom.xml所在目录下执行打包命令:

mvn package

执行完成后,会自动生成target目录,其中有打包好的jar文件。

现在项目文件结构

7个实例全面掌握Hadoop MapReduce

(4)运行

先把target中的jar上传到Hadoop服务器

下载测试数据文件

链接: https://pan.baidu.com/s/1o8fmfbG

密码: kbut

上传到HDFS

hdfs dfs -mkdir -p /friends/input

hdfs dfs -put friendsdata.txt /friends/input

运行第一步

hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepFirst /frie

nds/input/friendsdata.txt /friends/output01

运行第二步

hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepSecond /fri

ends/output01/part-r-00000 /friends/output02

查看结果

hdfs dfs -ls /friends/output02hdfs dfs -cat /friends/output02/*

十二、小结

MapReduce的基础内容介绍完了,希望可以帮助您快速熟悉MapReduce的工作原理和开发方法。如有批评与建议(例如内容有误、不足的地方、改进建议等),欢迎留言讨论。

提示: 如需下载本文,点击文末【阅读原文】或登录云盘 http://pan.baidu.com/s/1bpxSCZt进行下载。


以上所述就是小编给大家介绍的《7个实例全面掌握Hadoop MapReduce》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Music Recommendation and Discovery

Music Recommendation and Discovery

Òscar Celma / Springer / 2010-9-7 / USD 49.95

With so much more music available these days, traditional ways of finding music have diminished. Today radio shows are often programmed by large corporations that create playlists drawn from a limited......一起来看看 《Music Recommendation and Discovery》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具