Hadoop教程:MapReduce

栏目: 编程工具 · 发布时间: 5年前

内容简介:MapReduce是一个框架,我们可以使用它编写应用程序,以一种可靠的方式,并行地在大型商品硬件集群上处理大量数据。MapReduce是一种基于java的分布式计算处理技术和程序模型。MapReduce算法包含两个重要的任务,即Map和Reduce。Map接受一组数据并将其转换为另一组数据,其中单个元素被分解为元组(键/值对)。其次是reduce task,它将来自映射的输出作为输入,并将这些数据元组组合成较小的元组集合。顾名思义,reduce任务总是在映射作业之后执行。MapReduce的主要优点是,它

MapReduce是一个框架,我们可以使用它编写应用程序,以一种可靠的方式,并行地在大型商品硬件集群上处理大量数据。

MapReduce是什么

MapReduce是一种基于 java 的分布式计算处理技术和程序模型。MapReduce算法包含两个重要的任务,即Map和Reduce。Map接受一组数据并将其转换为另一组数据,其中单个元素被分解为元组(键/值对)。其次是reduce task,它将来自映射的输出作为输入,并将这些数据元组组合成较小的元组集合。顾名思义,reduce任务总是在映射作业之后执行。

MapReduce的主要优点是,它很容易在多个计算节点上扩展数据处理。在MapReduce模型下,数据处理原语称为映射器和约简器。将数据处理应用程序分解为映射器和还原器有时是很重要的。但是,一旦我们在MapReduce表单中编写了一个应用程序,将应用程序扩展到集群中的成百上千甚至上万台机器上,这仅仅是一个配置更改。正是这种简单的可伸缩性吸引了许多 程序员 使用MapReduce模型。

算法

  • 通常MapReduce范例是基于将计算机发送到数据所在的位置!
  • MapReduce程序分三个阶段执行,即map阶段、shuffle阶段和reduce阶段。
    • Map stage : 映射或映射程序的工作是处理输入数据。通常输入数据以文件或目录的形式存储在Hadoop文件系统(HDFS)中。输入文件逐行传递给mapper函数。映射器处理数据并创建几个小数据块。
    • Reduce stage : 这一阶段是 Shuffle 阶段和 Reduce 阶段的结合。 Reduce 的工作是处理来自mapper的数据。处理之后,它会生成一组新的输出,这些输出将存储在HDFS中。
  • 在MapReduce作业期间,Hadoop将映射和Reduce任务发送到集群中的适当服务器。
  • 该框架管理数据传递的所有细节,例如发出任务、验证任务完成以及在节点之间围绕集群复制数据。
  • 大多数计算发生在节点上,节点上的数据位于本地磁盘上,从而减少了网络流量。
  • 在完成给定的任务后,集群收集并减少数据,形成适当的结果,并将其发送回Hadoop服务器。
Hadoop教程:MapReduce

输入和输出(Java方面)

MapReduce框架对对进行操作,即框架将作业的输入视为一组对,并生成一组对作为作业的输出,可以想象为不同类型。

键和值类应该由框架序列化,因此需要实现可写接口。此外,关键类必须实现可写可比较的接口,以便按框架进行排序。MapReduce作业的输入输出类型:(Input) <k1, v1> -> map -> <k2, v2>-> reduce -> <k3, v3>(Output)

Input Output
Map <k1, v1> list (<k2, v2>)
Reduce <k2, list(v2)> list (<k3, v3>)

术语

  • PayLoad - 应用程序实现了映射和Reduce函数,构成了作业的核心。
  • Mapper - Mapper 将输入键/值对映射到一组中间键/值对。
  • NamedNode - 管理Hadoop分布式文件系统(HDFS)的节点。
  • DataNode - 存放数据的节点。
  • MasterNode - 作业跟踪程序运行的节点,它接受来自客户端的作业请求。
  • SlaveNode - 节点,Map和Reduce程序在此运行。
  • JobTracker - 计划作业并跟踪分配给Task tracker的作业。
  • Task Tracker - 跟踪任务并向JobTracker报告状态。
  • Job - 程序是Mapper 和Reducer 在数据集上的执行。
  • Task - 在数据片上执行Mapper 或Reducer 。
  • Task Attempt - 试图在SlaveNode上执行任务的特定实例。

示例场景

下面是关于一个组织的电力消耗的数据。它包含了每个月的用电量和不同年份的年平均用电量。

Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果以上述数据作为输入,我们必须编写应用程序来处理它,并产生诸如查找最大使用年、最小使用年等结果。这是一个针对记录数量有限的程序员的演练。它们将简单地编写逻辑来生成所需的输出,并将数据传递给所写的应用程序。

但是,想想一个州自形成以来所有大型工业的电力消耗数据。

当我们编写应用程序来处理这种大容量数据时,

  • 它们将花费大量的时间来执行。
  • 当我们将数据从源移动到网络服务器等时,将会有大量的网络流量。

为了解决这些问题,我们有 MapReduce 框架。

输入数据

以上数据保存为 sample.txt 作为输入。输入文件如下所示

程序实例

下面是使用MapReduce框架对示例数据的程序

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits 
{ 
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   { 
      
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      Reporter reporter) throws IOException 
      { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens())
            {
               lasttoken=s.nextToken();
            } 
            
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   } 
   
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements 
   Reducer< Text, IntWritable, Text, IntWritable > 
   {  
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
         OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
         { 
            int maxavg=30; 
            int val=Integer.MIN_VALUE; 
            
            while (values.hasNext()) 
            { 
               if((val=values.next().get())>maxavg) 
               { 
                  output.collect(key, new IntWritable(val)); 
               } 
            } 
 
         } 
   }  
   
   
   //Main function 
   public static void main(String args[])throws Exception 
   { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
}

将上述程序保存为 Process .java 。下面将解释程序的编译和执行。

编译和执行

让我们假设我们在Hadoop用户的主目录中(例如/home/hadoop)。

按照下面给出的步骤编译和执行上述程序

Step 1

下面的命令是创建一个目录来存储编译后的java类。

$ mkdir units

Step 2

下载 Hadoop-core-1.2.1.jar ,它用于编译和执行MapReduce程序。请访问以下链接 http://mvnrepository.com/… 下载jar。让我们假设下载的文件夹是 /home/hadoop/

Step 3

以下命令用于编译 ProcessUnits.java 程序,并为该程序创建一个jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ .

Step 4

下面的命令用于在HDFS中创建一个输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 7

下面的命令用于通过从输入目录中获取输入文件来运行Eleunit_max应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

稍等片刻,直到执行该文件。执行后,如下图所示,输出将包含输入分割数、映射任务数、reducer任务数等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
File System Counters 
 
FILE: Number of bytes read=61 
FILE: Number of bytes written=279400 
FILE: Number of read operations=0 
FILE: Number of large read operations=0   
FILE: Number of write operations=0 
HDFS: Number of bytes read=546 
HDFS: Number of bytes written=40 
HDFS: Number of read operations=9 
HDFS: Number of large read operations=0 
HDFS: Number of write operations=2 Job Counters 


   Launched map tasks=2  
   Launched reduce tasks=1 
   Data-local map tasks=2  
   Total time spent by all maps in occupied slots (ms)=146137 
   Total time spent by all reduces in occupied slots (ms)=441   
   Total time spent by all map tasks (ms)=14613 
   Total time spent by all reduce tasks (ms)=44120 
   Total vcore-seconds taken by all map tasks=146137 
   
   Total vcore-seconds taken by all reduce tasks=44120 
   Total megabyte-seconds taken by all map tasks=149644288 
   Total megabyte-seconds taken by all reduce tasks=45178880 
   
Map-Reduce Framework 
 
Map input records=5  
   Map output records=5   
   Map output bytes=45  
   Map output materialized bytes=67  
   Input split bytes=208 
   Combine input records=5  
   Combine output records=5 
   Reduce input groups=5  
   Reduce shuffle bytes=6  
   Reduce input records=5  
   Reduce output records=5  
   Spilled Records=10  
   Shuffled Maps =2  
   Failed Shuffles=0  
   Merged Map outputs=2  
   GC time elapsed (ms)=948  
   CPU time spent (ms)=5160  
   Physical memory (bytes) snapshot=47749120  
   Virtual memory (bytes) snapshot=2899349504  
   Total committed heap usage (bytes)=277684224
     
File Output Format Counters 
 
   Bytes Written=40

Step 8

下面的命令用于验证输出文件夹中生成的文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9

下面的命令用于查看 Part-00000 文件中的输出。这个文件是由HDFS生成的。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

下面是MapReduce程序生成的输出

Step 10

下面的命令用于将输出文件夹从HDFS复制到本地文件系统进行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

重要的命令

所有Hadoop命令都由 $HADOOP_HOME/bin/hadoop 命令调用。在没有任何参数的情况下运行Hadoop脚本将打印所有命令的描述。

Usage: hadoop [–config confdir] COMMAND

下表列出了可用的选项及其描述。

Options Description
namenode -format Formats the DFS filesystem.
secondarynamenode Runs the DFS secondary namenode.
namenode Runs the DFS namenode.
datanode Runs a DFS datanode.
dfsadmin Runs a DFS admin client.
mradmin Runs a Map-Reduce admin client.
fsck Runs a DFS filesystem checking utility.
fs Runs a generic filesystem user client.
balancer Runs a cluster balancing utility.
oiv Applies the offline fsimage viewer to an fsimage.
fetchdt Fetches a delegation token from the NameNode.
jobtracker Runs the MapReduce job Tracker node.
pipes Runs a Pipes job.
tasktracker Runs a MapReduce task Tracker node.
historyserver Runs job history servers as a standalone daemon.
job Manipulates the MapReduce jobs.
queue Gets information regarding JobQueues.
version Prints the version.
jar Runs a jar file.
distcp Copies file or directories recursively.
distcp2 DistCp version 2.
archive -archiveName NAME -p Creates a hadoop archive.
*
classpath Prints the class path needed to get the Hadoop jar and the required libraries.
daemonlog Get/Set the log level for each daemon

如何与MapReduce作业交互

Usage: hadoop job [GENERIC_OPTIONS]

以下是Hadoop作业中可用的通用选项。

GENERIC_OPTIONS Description
-submit Submits the job.
-status Prints the map and reduce completion percentage and all job counters.
-counter Prints the counter value.
-kill Kills the job.
-events <fromevent-#> <#-of-events> Prints the events’ details received by jobtracker for the given range.
-history [all] - history < jobOutputDir> Prints job details, failed and killed tip details. More details about the job such as successful tasks and task attempts made for each task can be viewed by specifying the [all] option.
-list[all] Displays all jobs. -list displays only jobs which are yet to complete.
-kill-task Kills the task. Killed tasks are NOT counted against failed attempts.
-fail-task Fails the task. Failed tasks are counted against failed attempts.
-set-priority Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

查看工作状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

查看作业输出目录的历史

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

kill作业

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004

原文链接: https://www.tutorialspoint.com/…


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Mission Python

Mission Python

Sean McManus / No Starch Press / 2018-9-18 / GBP 24.99

Launch into coding with Mission Python, a space-themed guide to building a complete computer game in Python. You'll learn programming fundamentals like loops, strings, and lists as you build Escape!, ......一起来看看 《Mission Python》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试