How do I install Python modules such as numpy and pandas when running MapReduce code written with mrjob on a Hadoop cluster?

Asked 2025-04-12 06:43

I'm trying to run a MapReduce job written with Python's mrjob on a Hadoop cluster, but I'm having trouble installing external modules such as numpy and pandas.

Here is my mrjob code:

import sys

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import TextProtocol
import numpy as np


class UserItemMatrix(MRJob):
    OUTPUT_PROTOCOL = TextProtocol

    def create_user_item_matrix_mapper(self, _, line):
        key, value = line.strip().split("\t")
        key = key.strip().split(";")
        if len(key) == 1:
            yield key[0], value
            return
        user, item = key
        rating = value.strip().split(";")[0]

        yield user, f"{item};{rating}"

    def configure_args(self):
        super(UserItemMatrix, self).configure_args()
        self.add_file_arg("--items-path", help="Path to the items file")

    def create_item_list(self, filename):
        items = []
        with open(filename, "r") as file:
            for line in file:
                item = line.strip()  # Remove leading/trailing whitespaces and newlines
                items.append(float(item))
        return items

    def create_user_item_matrix_reducer_init(self):
        items_path = self.options.items_path
        self.items = self.create_item_list(items_path)

    def create_user_item_matrix_reducer(self, user, values):
        values = [value.strip().split(";") for value in values]
        values = np.array(values, dtype="object")
        # Find rows with length 1
        rows_to_remove = np.array([len(row) == 1 for row in values])

        # Use boolean indexing to create a new array with rows of length 1
        removed_rows = values[rows_to_remove]
        avg_rating = removed_rows[0][0]

        # Use boolean indexing to remove rows from the original array
        coordinates = values[~rows_to_remove]
        coordinates = np.vstack(coordinates).astype(float)

        result = []
        for item in self.items:
            found = False
            for user_item, rating in coordinates:
                if float(user_item) == item:
                    result.append(f"{item};{rating}")
                    found = True
                    break
            if not found:
                result.append(f"{item};{avg_rating}")
        result = "|".join(result)
        yield user, result

    def steps(self):
        return [
            MRStep(
                mapper=self.create_user_item_matrix_mapper,
                reducer_init=self.create_user_item_matrix_reducer_init,
                reducer=self.create_user_item_matrix_reducer,
            )
        ]


if __name__ == "__main__":
    sys.argv[1:] = [
        "./input/input_file_copy.txt",
        "./clustering/output/avg_ratings.txt",
        "--items-path",
        "./input/items.txt",
        "-r",
        "hadoop",
    ]
    UserItemMatrix().run()

Here is my mrjob.conf file:

{
  "runners": {
    "hadoop": {
      "setup": [
        "sudo pip install numpy"
      ]
    }
  }
}

The job fails with the following output:

Using configs in /home/mackop/.mrjob.conf
Looking for hadoop binary in /home/mackop/hadoop-3.3.6/bin...
Found hadoop binary: /home/mackop/hadoop-3.3.6/bin/hadoop
Using Hadoop version 3.3.6
Looking for Hadoop streaming jar in /home/mackop/hadoop-3.3.6...
Found Hadoop streaming jar: /home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
Creating temp directory /tmp/create_user_item_matrix.mackop.20240328.135901.991160
uploading working dir files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd...
Copying other local files to hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar6233086433744078313/] [] /tmp/streamjob1183834584487146058.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/mackop/.staging/job_1711623965898_0001
  Total input files to process : 2
  number of splits:3
  Submitting tokens for job: job_1711623965898_0001
  Executing with tokens: []
  resource-types.xml not found
  Unable to find 'resource-types.xml'.
  Submitted application application_1711623965898_0001
  The url to track the job: http://vivobook:8088/proxy/application_1711623965898_0001/
  Running job: job_1711623965898_0001
  Job job_1711623965898_0001 running in uber mode : false
   map 0% reduce 0%
  Task Id : attempt_1711623965898_0001_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

  Task Id : attempt_1711623965898_0001_m_000002_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

   map 100% reduce 100%
  Job job_1711623965898_0001 failed with state FAILED due to: Task failed task_1711623965898_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0

  Job not successful!
  Streaming Command Failed!
Counters: 14
        Job Counters 
                Data-local map tasks=3
                Failed map tasks=10
                Killed map tasks=2
                Killed reduce tasks=1
                Launched map tasks=12
                Other local map tasks=9
                Total megabyte-milliseconds taken by all map tasks=31662080
                Total time spent by all map tasks (ms)=30920
                Total time spent by all maps in occupied slots (ms)=30920
                Total time spent by all reduces in occupied slots (ms)=0
                Total vcore-milliseconds taken by all map tasks=30920
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
Scanning logs for probable cause of failure...
Looking for history log in hdfs:///tmp/hadoop-yarn/staging...
Looking for history log in /home/mackop/hadoop-3.3.6/logs...
Looking for task syslogs in /home/mackop/hadoop-3.3.6/logs/userlogs/application_1711623965898_0001...

Probable cause of failure:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)


Step 1 of 1 failed: Command '['/home/mackop/hadoop-3.3.6/bin/hadoop', 'jar', '/home/mackop/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar', '-files', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/create_user_item_matrix.py#create_user_item_matrix.py,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/items.txt#items.txt,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/mrjob.zip#mrjob.zip,hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/wd/setup-wrapper.sh#setup-wrapper.sh', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/input_file_copy.txt', '-input', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/files/avg_ratings.txt', '-output', 'hdfs:///user/mackop/tmp/mrjob/create_user_item_matrix.mackop.20240328.135901.991160/output', '-mapper', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --mapper --items-path items.txt', '-reducer', '/bin/sh -ex setup-wrapper.sh python3 create_user_item_matrix.py --step-num=0 --reducer --items-path items.txt']' returned non-zero exit status 256.

The job works fine when I run it locally. The error only occurs when I run it on the Hadoop cluster. I suspect the problem is that it cannot import numpy.

1 Answer

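First, next time please include the logs from the failed task attempts (you can view them by clicking through to the task logs). The "subprocess failed with code 1" message only says that the Python process exited with an error; the actual Python traceback is in the task's stderr log.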
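
As a rough illustration of getting a readable message into those logs, the sketch below guards an import and writes the traceback to stderr, which Hadoop streaming captures per task attempt. It assumes the failure really is an import error; `try_import` is a hypothetical helper, not part of mrjob.

```python
import importlib
import sys
import traceback


def try_import(module_name):
    """Import a module by name; on failure, log the traceback to stderr and return None."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        # Anything written to stderr ends up in the task attempt's stderr log.
        sys.stderr.write(f"could not import {module_name}\n")
        traceback.print_exc(file=sys.stderr)
        return None


if __name__ == "__main__":
    np = try_import("numpy")
    if np is None:
        sys.exit(1)
```

In a job, a check like this could run at the top of the script or in a `mapper_init`, so that the first lines of the stderr log name the missing module.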

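Next, I think the problem may be your use of sudo. The setup commands run non-interactively inside each task, so if sudo asks for a password there is no way to enter one and the command fails.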
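
To check this on a node, one option is to run sudo in non-interactive mode, where it fails instead of prompting. The sketch below is only a diagnostic under that assumption; `sudo_works_noninteractively` is a hypothetical name, and it should be run as the same user the tasks run as.

```python
import shutil
import subprocess


def sudo_works_noninteractively():
    """Return True if `sudo -n true` succeeds, False if it fails, None if sudo is not installed."""
    if shutil.which("sudo") is None:
        return None
    # -n tells sudo never to prompt; it exits with an error if a password would be required.
    result = subprocess.run(
        ["sudo", "-n", "true"],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0


if __name__ == "__main__":
    print("sudo without a prompt:", sudo_works_noninteractively())
```

A result of False means a setup line such as "sudo pip install numpy" would be expected to fail inside a task.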

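Also, I suggest using a virtual environment (venv) instead of installing the libraries system-wide. The config file should look like this: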

runners:
   hadoop:
       setup:
       - 'set -e'
       - VENV=/tmp/$mapreduce_job_id
       - if [ ! -e $VENV ]; then virtualenv $VENV; fi
       - . $VENV/bin/activate
       - 'pip install numpy'

Keep in mind that virtualenv itself has to be installed with administrator privileges. You can also refer to this post
