需要3个输入文件的Python MapReduce Hadoop流作业?

罗比

我有3个小样本输入文件(实际文件更大),

# File Name: books.txt
# File Format: BookID|Title
1|The Hunger Games
2|To Kill a Mockingbird
3|Pride and Prejudice
4|Animal Farm

# File Name: ratings.txt
# File Format: ReaderID|BookID|Rating
101|1|1
102|2|2
103|3|3
104|4|4
105|1|5
106|2|1
107|3|2
108|4|3

# File Name: readers.txt
# File Format: ReaderID|Gender|PostCode|PreferComms
101|M|1000|email
102|F|1001|mobile
103|M|1002|email
104|F|1003|mobile
105|M|1004|email
106|F|1005|mobile
107|M|1006|email
108|F|1007|mobile

我想创建一个Python MapReduce Hadoop流作业,以获取以下输出,该输出是按性别划分的按标题划分的平均评分

Animal Farm F   3.5
Pride and Prejudice M   2.5
The Hunger Games    M   3
To Kill a Mockingbird   F   1.5

我搜索了这个论坛,有人指出了一个解决方案,但是它是2个输入文件而不是3个。我试了一下,但是卡在了mapper部分,因为我无法正确对其进行排序,因此reducer可以正确地识别出标题和性别的第一条记录,然后开始汇总。我下面的映射器代码

#!/usr/bin/env python
import sys
for line in sys.stdin:

    try:

        ReaderID = "-1"
        BookID = "-1"
        Title = "-1"
        Gender = "-1"
        Rating = "-1"

        line = line.strip()

        splits = line.split("|")

        if len(splits) == 2:
            BookID = splits[0]
            Title = splits[1]
        elif len(splits) == 3:
            ReaderID = splits[0]
            BookID = splits[1]
            Rating = splits[2]
        else:
            ReaderID = splits[0]
            Gender = splits[1]

        print('%s\t%s\t%s\t%s\t%s' % (BookID, Title, ReaderID, Rating, Gender))

    except:
        pass

PS:我只需要使用Python和Hadoop Streaming。不允许安装Python包,例如Dumbo,mrjob等。

提前感谢您的帮助。

谢谢,罗比

帕玛尼扬

经历了一些核心Java MR之后,所有建议都表明,这三个文件不能在单个映射作业中合并在一起。我们必须先加入前两个,然后将结果与第三个结合。对这三个应用您的逻辑并不能给我很好的结果。因此,我尝试了Pandas,它似乎给出了可喜的结果。如果使用pandas并不是您的限制,请尝试使用我的代码。否则,我们将尝试将这三个文件与Python词典和列表结合在一起。

这是我建议的代码。我刚刚连接了所有输入以对其进行测试。在您的代码中,只需注释我的for循环(第36行),然后取消注释您的for循环(第35行)。

import pandas as pd
import sys

input_string_book = [
"1|The Hunger Games",
"2|To Kill a Mockingbird",
"3|Pride and Prejudice",
"4|Animal Farm"]
input_string_book_df = pd.DataFrame(columns=('BookID','Title'))


input_string_rating = [
"101|1|1",
"102|2|2",
"103|3|3",
"104|4|4",
"105|1|5",
"106|2|1",
"107|3|2",
"108|4|3"]
input_string_rating_df = pd.DataFrame(columns=('ReaderID','BookID','Rating'))


input_string_reader = [
"101|M|1000|email",
"102|F|1001|mobile",
"103|M|1002|email",
"104|F|1003|mobile",
"105|M|1004|email",
"106|F|1005|mobile",
"107|M|1006|email",
"108|F|1007|mobile"]
input_string_reader_df = pd.DataFrame(columns=('ReaderID','Gender','PostCode','PreferComms'))

#for line in sys.stdin:
for line in input_string_book + input_string_rating + input_string_reader:
    try:

        line = line.strip()

        splits = line.split("|")

        if len(splits) == 2:
            input_string_book_df = input_string_book_df.append(pd.DataFrame([[splits[0],splits[1]]],columns=('BookID','Title')))
        elif len(splits) == 3:
            input_string_rating_df = input_string_rating_df.append(pd.DataFrame([[splits[0],splits[1],splits[2]]],columns=('ReaderID','BookID','Rating')))
        else:
            input_string_reader_df = input_string_reader_df.append(pd.DataFrame([[splits[0],splits[1],splits[2],splits[3]]]
            ,columns=('ReaderID','Gender','PostCode','PreferComms')))

    except:
        raise

l_concat_1 = input_string_book_df.merge(input_string_rating_df,on='BookID',how='inner')

l_concat_2 = l_concat_1.merge(input_string_reader_df,on='ReaderID',how='inner')

for each_iter in l_concat_2[['BookID', 'Title', 'ReaderID', 'Rating', 'Gender']].iterrows():
    print('%s\t%s\t%s\t%s\t%s' % (each_iter[1][0], each_iter[1][1], each_iter[1][2], each_iter[1][3], each_iter[1][4]))

输出

1       The Hunger Games        101     1       M
1       The Hunger Games        105     5       M
2       To Kill a Mockingbird   102     2       F
2       To Kill a Mockingbird   106     1       F
3       Pride and Prejudice     103     3       M
3       Pride and Prejudice     107     2       M
4       Animal Farm     104     4       F
4       Animal Farm     108     3       F

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

需要多个输入文件的Python MapReduce Hadoop流作业?

来自分类Dev

hadoop:无法运行mapreduce作业

来自分类Dev

具有HAR文件输入的MapReduce作业

来自分类Dev

Hadoop MapReduce读写序列文件

来自分类Dev

Hadoop MapReduce | python中的SMA

来自分类Dev

如何在使用Python的hadoop流作业中使用文件?

来自分类Dev

通过扩展Hadoop(一个MapReduce作业)写入多个输出

来自分类Dev

如何通过hadoop mapreduce作业访问Windows / unix目录中的文本文件

来自分类Dev

在不使用PuTTy / SSH的情况下通过Python启动Hadoop MapReduce作业

来自分类Dev

空指针异常-Hadoop Mapreduce作业

来自分类Dev

无法在hadoop 2.4.0上运行MapReduce作业

来自分类Dev

编写一个空的MapReduce作业

来自分类Dev

编写一个空的MapReduce作业

来自分类Dev

Mapreduce作业配置文件位置

来自分类Dev

无法执行基于Python的Hadoop流作业

来自分类Dev

Hadoop Mapreduce:自定义输入格式

来自分类Dev

Hadoop MapReduce:是否可以仅将一部分输入数据用作MR作业的输入?

来自分类Dev

Hadoop / MapReduce中的日志文件分析

来自分类Dev

Hadoop MapReduce处理来自HDFS的不同输入文件

来自分类Dev

如何不因一个数据库插入失败而使Hadoop MapReduce作业失败?

来自分类Dev

流命令失败!在CentOS7的单节点hadoop集群设置中执行MapReduce python代码时

来自分类Dev

MapReduce通过python使用hadoop流-将列表从mapper传递给reducer并将其作为列表读取

来自分类Dev

为什么在mapreduce作业中需要setMapOutputKeyClass方法

来自分类Dev

Hadoop MapReduce作业启动但找不到Map类?

来自分类Dev

打包和部署Hadoop MapReduce作业的正确方法?

来自分类Dev

如何在Hadoop 2.7.0上编译MapReduce作业源代码?

来自分类Dev

hadoop mapreduce-用于获取作业日志的API

来自分类Dev

Hadoop MapReduce作业已启动,但找不到Map类?

来自分类Dev

单节点与多节点上的hadoop MapReduce作业

Related 相关文章

  1. 1

    需要多个输入文件的Python MapReduce Hadoop流作业?

  2. 2

    hadoop:无法运行mapreduce作业

  3. 3

    具有HAR文件输入的MapReduce作业

  4. 4

    Hadoop MapReduce读写序列文件

  5. 5

    Hadoop MapReduce | python中的SMA

  6. 6

    如何在使用Python的hadoop流作业中使用文件?

  7. 7

    通过扩展Hadoop(一个MapReduce作业)写入多个输出

  8. 8

    如何通过hadoop mapreduce作业访问Windows / unix目录中的文本文件

  9. 9

    在不使用PuTTy / SSH的情况下通过Python启动Hadoop MapReduce作业

  10. 10

    空指针异常-Hadoop Mapreduce作业

  11. 11

    无法在hadoop 2.4.0上运行MapReduce作业

  12. 12

    编写一个空的MapReduce作业

  13. 13

    编写一个空的MapReduce作业

  14. 14

    Mapreduce作业配置文件位置

  15. 15

    无法执行基于Python的Hadoop流作业

  16. 16

    Hadoop Mapreduce:自定义输入格式

  17. 17

    Hadoop MapReduce:是否可以仅将一部分输入数据用作MR作业的输入?

  18. 18

    Hadoop / MapReduce中的日志文件分析

  19. 19

    Hadoop MapReduce处理来自HDFS的不同输入文件

  20. 20

    如何不因一个数据库插入失败而使Hadoop MapReduce作业失败?

  21. 21

    流命令失败!在CentOS7的单节点hadoop集群设置中执行MapReduce python代码时

  22. 22

    MapReduce通过python使用hadoop流-将列表从mapper传递给reducer并将其作为列表读取

  23. 23

    为什么在mapreduce作业中需要setMapOutputKeyClass方法

  24. 24

    Hadoop MapReduce作业启动但找不到Map类?

  25. 25

    打包和部署Hadoop MapReduce作业的正确方法?

  26. 26

    如何在Hadoop 2.7.0上编译MapReduce作业源代码?

  27. 27

    hadoop mapreduce-用于获取作业日志的API

  28. 28

    Hadoop MapReduce作业已启动,但找不到Map类?

  29. 29

    单节点与多节点上的hadoop MapReduce作业

热门标签

归档