如何在使用Python的hadoop流作业中使用文件?

埃勒姆

我想从hadoop流作业中的文件中读取列表。这是我简单的mapper.py:

#!/usr/bin/env python

import sys
import json

def read_file():
    id_list = []
    #read ids from a file
    f = open('../user_ids','r')
    for line in f:
        line = line.strip()
        id_list.append(line)
    return id_list

if __name__ == '__main__':
    id_list = set(read_file())
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        line = json.loads(line)
        user_id = line['user']['id']
        if str(user_id) in id_list:
            print '%s\t%s' % (user_id, line)

这是我的reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

current_id = None
current_list = []
id = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    id, line = line.split('\t', 1)

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_id == id:
        current_list.append(line)
    else:
        if current_id:
            # write result to STDOUT
            print '%s\t%s' % (current_id, current_list)
        current_id = id
        current_list = [line]

# do not forget to output the last word if needed!
if current_id == id:
        print '%s\t%s' % (current_id, current_list)

现在运行它,我说:

hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
    -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
    -input test/input.txt  -output test/output -file '../user_ids' 

作业开始运行:

13/11/07 05:04:52 INFO streaming.StreamJob:  map 0%  reduce 0%
13/11/07 05:05:21 INFO streaming.StreamJob:  map 100%  reduce 100%
13/11/07 05:05:21 INFO streaming.StreamJob: To kill this job, run:

我得到错误:

job not successful. Error: # of failed Map Tasks exceeded allowed limit. FailedCount: 1.         LastFailedTask: task_201309172143_1390_m_000001
13/11/07 05:05:21 INFO streaming.StreamJob: killJob...

我不从文件../user_ids中读取ID时,不会出现任何错误。我认为问题是找不到我的../user_id文件。我也使用了hdfs中的位置,但仍然无法正常工作。谢谢你的帮助。

克里斯·怀特
hadoop jar contrib/streaming/hadoop-streaming-1.1.1.jar -file ./mapper.py \
  -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py \
  -input test/input.txt  -output test/output -file '../user_ids'

执行作业时,.. / user_ids是否在本地文件路径上?如果确实如此,那么您需要修改映射器代码,以解决以下事实:在运行时此文件将在映射器的本地工作目录中可用:

f = open('user_ids','r')

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在Python中使用Quart发送文件流?

来自分类Dev

如何在Python程序中使用Flask接收流数据

来自分类Dev

在Hadoop Streaming中使用Python使用文件

来自分类Dev

如何在管道作业中使用Jenkins重作业属性

来自分类Dev

如何在python中使用文件路径?

来自分类Dev

如何使用Hadoop 2.x提交Hadoop流作业并检查执行历史记录

来自分类Dev

如何在JS中使用流

来自分类Dev

Java - 如何在流中使用函数?

来自分类Dev

如何在EMR hadoop流作业中更改内存

来自分类Dev

如何在后台作业脚本块中使用文件中的功能

来自分类Dev

需要多个输入文件的Python MapReduce Hadoop流作业?

来自分类Dev

需要3个输入文件的Python MapReduce Hadoop流作业?

来自分类Dev

如何使用 Python 代码启动数据流作业

来自分类Dev

如何在Python中使用固定数量的线程处理队列中的多个作业

来自分类Dev

如何在python文件中使用外部代码python

来自分类Dev

如何在SSIS数据流任务中使用格式化文件?

来自分类Dev

我们如何在apache nifi中使用java代码生成流文件

来自分类Dev

如何在Node Cron作业中使用请求/响应

来自分类Dev

如何在Kubernetes集群中使用作业队列

来自分类Dev

如何在 servlet 中使用特定作业实现 Quartz Scheduler?

来自分类Dev

如何在jar文件中使用文件?

来自分类Dev

如何在python中使用终端命令写入文件?

来自分类Dev

如何在Python中使用熊猫跳过读取空文件

来自分类Dev

如何在python中使用urllib下载.torrent文件?

来自分类Dev

如何在python代码中使用grpc文件?

来自分类Dev

如何在python中使用Selenium下载文件?

来自分类Dev

如何在python中使用终端命令写入文件?

来自分类Dev

如何在Python中使用Appengine读写文件?

来自分类Dev

如何在Python中使用输入和输出文件

Related 相关文章

  1. 1

    如何在Python中使用Quart发送文件流?

  2. 2

    如何在Python程序中使用Flask接收流数据

  3. 3

    在Hadoop Streaming中使用Python使用文件

  4. 4

    如何在管道作业中使用Jenkins重作业属性

  5. 5

    如何在python中使用文件路径?

  6. 6

    如何使用Hadoop 2.x提交Hadoop流作业并检查执行历史记录

  7. 7

    如何在JS中使用流

  8. 8

    Java - 如何在流中使用函数?

  9. 9

    如何在EMR hadoop流作业中更改内存

  10. 10

    如何在后台作业脚本块中使用文件中的功能

  11. 11

    需要多个输入文件的Python MapReduce Hadoop流作业?

  12. 12

    需要3个输入文件的Python MapReduce Hadoop流作业?

  13. 13

    如何使用 Python 代码启动数据流作业

  14. 14

    如何在Python中使用固定数量的线程处理队列中的多个作业

  15. 15

    如何在python文件中使用外部代码python

  16. 16

    如何在SSIS数据流任务中使用格式化文件?

  17. 17

    我们如何在apache nifi中使用java代码生成流文件

  18. 18

    如何在Node Cron作业中使用请求/响应

  19. 19

    如何在Kubernetes集群中使用作业队列

  20. 20

    如何在 servlet 中使用特定作业实现 Quartz Scheduler?

  21. 21

    如何在jar文件中使用文件?

  22. 22

    如何在python中使用终端命令写入文件?

  23. 23

    如何在Python中使用熊猫跳过读取空文件

  24. 24

    如何在python中使用urllib下载.torrent文件?

  25. 25

    如何在python代码中使用grpc文件?

  26. 26

    如何在python中使用Selenium下载文件?

  27. 27

    如何在python中使用终端命令写入文件?

  28. 28

    如何在Python中使用Appengine读写文件?

  29. 29

    如何在Python中使用输入和输出文件

热门标签

归档