`

Hadoop Streaming 编程

 
阅读更多

1、概述

Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:

采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer)

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

-input myInputDirs \

-output myOutputDir \

-mapper cat \

-reducer wc

本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题。文章最后给出了程序下载地址 。(本文内容基于Hadoop-0.20.2版本 )

(注:如果你采用的语言为C或者C++,也可以使用Hadoop Pipes,具体可参考这篇文章:Hadoop Pipes编程。 )

关于Hadoop Streaming高级编程方法,可参考这篇文章:Hadoop Streaming高级编程

2、Hadoop Streaming原理

mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。

如果一个文件(可执行或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把该文件作为一个单独进程启动,mapper任 务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key ,之后的(不包括tab)作为value 如果没有tab,整行作为key值,value值为null。

对于reducer,类似。

以上是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

3、Hadoop Streaming用法

Usage: $HADOOP_HOME/bin/hadoop jar \

$HADOOP_HOME/hadoop-streaming.jar [options]

options:

(1)-input:输入文件路径

(2)-output:输出文件路径

(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

(6)-partitioner:用户自定义的partitioner程序

(7)-combiner:用户自定义的combiner程序(必须用java实现)

(8)-D:作业的一些属性(以前用的是-jonconf),具体有:

             1)mapred.map.tasks:map task数目
             2)mapred.reduce.tasks:reduce task数目
             3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
             4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
             5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
             6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
另外,Hadoop本身还自带一些好用的Mapper和Reducer:
(1)    Hadoop聚集功能

Aggregate提供一个特殊的reducer类和一个特殊的combiner类,并且有一系列的“聚合器”(例如 “sum”,“max”,“min”等)用于聚合一组value的序列。用户可以使用Aggregate定义一个mapper插件类,这个类用于为 mapper输入的每个key/value对产生“可聚合项”。Combiner/reducer利用适当的聚合器聚合这些可聚合项。要使用 Aggregate,只需指定“-reducer aggregate”。

(2)字段的选取(类似于Unix中的‘cut’)

Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduc帮助用户高效处理 文本数据,就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab),可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。

4、Mapper和Reducer实现

本节试图用尽可能多的语言编写Mapper和Reducer,包括Java,C,C++,Shell脚本,python等。

由于Hadoop会自动解析数据文件到Mapper或者Reducer的标准输入中,以供它们读取使用,所有应先了解各个语言获取标准输入的方法。

(1)    Java语言:

见Hadoop自带例子

(2)    C++语言

1
2
3
4
string key;
while (cin>>key){
cin>>value;
  ….

(3)  C语言

1
2
3
4
5
char buffer[BUF_SIZE];
while ( fgets (buffer, BUF_SIZE - 1, stdin)){
int len = strlen (buffer);
}

(4)  Shell脚本

用管道

(5)  Python脚本

1
2
3
import  sys
for  line  in  sys.stdin:
.......

为了说明各种语言编写Hadoop Streaming程序的方法,下面以WordCount为例,WordCount作业的主要功能是对用户输入的数据中所有字符串进行计数。

(1)C语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//mapper
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
 
#define BUF_SIZE        2048
#define DELIM   "\n"
 
int main( int argc, char *argv[]){
      char buffer[BUF_SIZE];
      while ( fgets (buffer, BUF_SIZE - 1, stdin)){
             int len = strlen (buffer);
             if (buffer[len-1] == '\n' )
              buffer[len-1] = 0;
 
             char *querys  = index(buffer, ' ' );
             char *query = NULL;
             if (querys == NULL) continue ;
             querys += 1; /*  not to include '\t' */
 
             query = strtok (buffer, " " );
             while (query){
                    printf ( "%s\t1\n" , query);
                    query = strtok (NULL, " " );
             }
      }
      return 0;
}
//---------------------------------------------------------------------------------------
//reducer
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
 
#define BUFFER_SIZE     1024
#define DELIM   "\t"
 
int main( int argc, char *argv[]){
  char strLastKey[BUFFER_SIZE];
  char strLine[BUFFER_SIZE];
  int count = 0;
 
  *strLastKey = '\0' ;
  *strLine = '\0' ;
 
  while ( fgets (strLine, BUFFER_SIZE - 1, stdin) ){
  char *strCurrKey = NULL;
  char *strCurrNum = NULL;
 
  strCurrKey  = strtok (strLine, DELIM);
  strCurrNum = strtok (NULL, DELIM); /* necessary to check error but.... */
 
  if ( strLastKey[0] == '\0' ){
  strcpy (strLastKey, strCurrKey);
  }
 
  if ( strcmp (strCurrKey, strLastKey)){
  printf ( "%s\t%d\n" , strLastKey, count);
  count = atoi (strCurrNum);
  } else {
  count += atoi (strCurrNum);
  }
  strcpy (strLastKey, strCurrKey);
 
  }
  printf ( "%s\t%d\n" , strLastKey, count); /* flush the count */
  return 0;
}

(2)C++语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//mapper
#include <stdio.h>
#include <string>
#include <iostream>
using namespace std;
 
int main(){
         string key;
         string value = "1" ;
         while (cin>>key){
                 cout<<key<< "\t" <<value<<endl;
         }
         return 0;
}
//------------------------------------------------------------------------------------------------------------
//reducer
#include <string>
#include <map>
#include <iostream>
#include <iterator>
using namespace std;
int main(){
         string key;
         string value;
         map<string, int > word2count;
         map<string, int >::iterator it;
         while (cin>>key){
                 cin>>value;
                 it = word2count.find(key);
                 if (it != word2count.end()){
                         (it->second)++;
                 }
                 else {
                         word2count.insert(make_pair(key, 1));
                 }
         }
 
         for (it = word2count.begin(); it != word2count.end(); ++it){
                 cout<<it->first<< "\t" <<it->second<<endl;
         }
         return 0;
}

(3)shell脚本语言实现

1
2
3
4
5
$HADOOP_HOME /bin/hadoop   jar $HADOOP_HOME /hadoop-streaming .jar \
     -input myInputDirs \
     -output myOutputDir \
     -mapper cat \
    -reducer  wc

(4)Python脚本语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
 
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN (standard input)
for line in sys.stdin:
     # remove leading and trailing whitespace
     line = line.strip()
     # split the line into words while removing any empty strings
     words = filter ( lambda word: word, line.split())
     # increase counters
     for word in words:
         # write the results to STDOUT (standard output);
         # what we output here will be the input for the
         # Reduce step, i.e. the input for reducer.py
         #
         # tab-delimited; the trivial word count is 1
         print '%s\t%s' % (word, 1 )
#---------------------------------------------------------------------------------------------------------
#!/usr/bin/env python
 
from operator import itemgetter
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
     # remove leading and trailing whitespace
     line = line.strip()
 
     # parse the input we got from mapper.py
     word, count = line.split()
     # convert count (currently a string) to int
     try :
         count = int (count)
         word2count[word] = word2count.get(word, 0 ) + count
     except ValueError:
         # count was not a number, so silently
         # ignore/discard this line
         pass
 
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted (word2count.items(), key = itemgetter( 0 ))
 
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
     print '%s\t%s' % (word, count)

5、常见问题

(1)作业总是运行失败:

需要把mapper文件和reducer文件放到各个tasktracker上,保证各个节点均有一份。也可在提交作业时,采用-file选项指定这些文件。

(2)用脚本编写时,第一行需注明脚本解释器,默认是shell

6、参考资料

【1】C++&Python实现Hadoop Streaming的paritioner和模块化

【2】如何在Hadoop中使用Streaming编写MapReduce

【3】Hadoop如何与C++结合

【4】Hadoop Streaming和pipes理解

7、程序打包下载

文章中用到的程序源代码可在此处下载

转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce/hadoop-streaming-programming/

分享到:
评论

相关推荐

    ( Hadoop Streaming编程实战(C++、PHP、Python).pdf )

    ( Hadoop Streaming编程实战(C++、PHP、Python).pdf ) ( Hadoop Streaming编程实战(C++、PHP、Python).pdf )

    HadoopStreaming编程.doc

    HadoopStreaming编程.doc

    Hadoop streaming详细介绍

    Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java。这里要介绍的就是Hadoop streaming API。Hadoop streaming 使用Unix的standard streams作为我们...

    用python + hadoop streaming 分布式编程(一) — 原理介绍,样例程序与本地调试

    Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS 、 MapReduce)。 Doug Cutting和Mike Cafarella在开发搜索引擎...

    大数据编程期末大作业-Hadoop

    大数据编程期末大作业 一、Hadoop基础操作 二、RDD编程 三、SparkSQL编程 四、SparkStreaming编程

    用python+hadoopstreaming编写分布式程序

    Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统GoogleFileSystem,并发布了相关论文(可在GoogleResearch的网站上获得:GFS、MapReduce)。DougCutting和MikeCafarella在开发搜索引擎Nutch时对这两...

    实验七:Spark初级编程实践

    1、实验环境: 设备名称 LAPTOP-9KJS8HO6 处理器 Intel(R) Core(TM) i5-10300H CPU @ 2.50GHz 2.50 GHz 机带 RAM 16.0 GB (15.8 GB 可用) ...(2) 在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调机制3.3 Java API解析3.3.1 ...

    Hadoop实战中文版

    4.4 适应Hadoop API 的改变 4.5 Hadoop 的Streaming 4.5.1 通过Unix命令使用Streaming 4.5.2 通过脚本使用Streaming 4.5.3 用Streaming处理键/值对 4.5.4 通过Aggregate包使用Streaming 4.6 使用combiner 提升...

    大数据平台与编程实践实验报告

    大数据平台与编程实践实验报告,共八个实验报告 1.Linux系统的基本使用 2.hdfs shell基本命令操作 3.MapReduce的基本使用 4.Hbase的基本使用 5.Hive的基本使用 6.Spark的基本使用 7.Sparksql的基本使用 8.Spark...

    Hadoop实战中文版.PDF

    目录编辑第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5...

    Hadoop实战(陆嘉恒)译

    实战第4 章 编写MapReduce基础程序4.1 获得专利数据集4.1.1 专利引用数据4.1.2 专利描述数据4.2 构建MapReduce 程序的基础模板4.3 计数4.4 适应Hadoop API 的改变4.5 Hadoop 的Streaming4.5.1 通过Unix命令使用...

    Hadoop实战

    554.3 计数 604.4 适应Hadoop API的改变 644.5 Hadoop的Streaming 674.5.1 通过Unix命令使用Streaming 684.5.2 通过脚本使用Streaming 694.5.3 用Streaming处理键/值对 724.5.4 通过Aggregate包使用Streaming 754.6 ...

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf

    2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--IDEA搭建及实战.pdf 4.Spark运行架构.pdf 5.Hive(上)--Hive介绍及部署.pdf 5.Hive(下)-...

    projecthadoop:项目Hadoop

    但是他们在课程中主要使用/教授了使用Python Hadoop MapReduce作业(即使用Hadoop Streaming方法来运行作业)。 我已经为2种编程语言中的2个问题陈述(每个3个问题)开发了Hadoop MapReduce代码; Python和Java 。...

    7.SparkStreaming(下)--SparkStreaming实战.pdf

    1.Spark及其生态圈简介.pdf2.Spark编译与部署(上)--基础环境搭建.pdf2.Spark编译与部署(下)--Spark编译安装.pdf2.Spark编译与部署(中)--Hadoop编译安装.pdf3.Spark编程模型(上)--概念及SparkShell实战.pdf3....

Global site tag (gtag.js) - Google Analytics