Pig streaming through a Python script that imports a module

Asked 2025-04-17 06:52

Output of pig --version:

pigtmp$ pig --version
Apache Pig version 0.8.1-cdh3u1 (rexported)
compiled Jul 18 2011, 08:29:40

I have a Python script (CPython) that imports another script. In my example both scripts are trivial:

The data:

example$ hadoop fs -cat /user/pavel/trivial.log

1   one
2   two
3   three

Example without the import: works fine

example$ pig -f trivial_stream.pig

(1,1,one)
()
(1,2,two)
()
(1,3,three)
()

where:

1) trivial_stream.pig:

DEFINE test_stream `test_stream.py` SHIP ('test_stream.py');
A = LOAD 'trivial.log' USING PigStorage('\t') AS (mynum: int, mynumstr: chararray);
C = STREAM A THROUGH test_stream;
DUMP C;

2) test_stream.py:

#! /usr/bin/env python

import sys
import string

for line in sys.stdin:
    if len(line) == 0: continue
    new_line = line
    # new_line still ends with "\n", so print adds a second newline;
    # the extra empty lines are presumably the () tuples in the output above
    print "%d\t%s" % (1, new_line)
    

So basically I just put every line under a single key (1); nothing special.
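By default, Pig's STREAM operator hands each input tuple to the script's stdin as one tab-separated line and parses every line the script writes to stdout back into a tuple. A minimal sketch of the same transformation as a testable function, assuming Python 3 (the cluster in the question runs a Python 2 interpreter); tag_lines and run are hypothetical names, not part of Pig:

```python
import sys


def tag_lines(lines, key=1):
    """Yield one 'key<TAB>line' record per non-empty input line."""
    for line in lines:
        line = line.rstrip("\n")  # drop the newline; the writer adds exactly one back
        if not line:
            continue  # skip blank input lines, as the original len(line) == 0 check intends
        yield "%d\t%s" % (key, line)


def run(instream=None, outstream=None):
    """Copy tagged records from stdin to stdout (or the given file objects)."""
    instream = instream if instream is not None else sys.stdin
    outstream = outstream if outstream is not None else sys.stdout
    for record in tag_lines(instream):
        outstream.write(record + "\n")
```

In test_stream.py, calling run() under an if __name__ == "__main__": guard would connect it to Pig's stdin/stdout; taking file objects as parameters keeps the transformation testable without a cluster.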

Example with the import: fails!

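Now I want to append a string returned by a Python module that sits in the same directory as test_stream.py. I have tried many different ways of importing this module, but I always get the same error (see below).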

1) trivial_stream.pig:

DEFINE test_stream `test_stream.py` SHIP ('test_stream.py', 'test_import.py');
A = LOAD 'trivial.log' USING PigStorage('\t') AS (mynum: int, mynumstr: chararray);
C = STREAM A THROUGH test_stream;
DUMP C;

2) test_stream.py

#! /usr/bin/env python

import sys
import string

import test_import

for line in sys.stdin:
    if len(line) == 0: continue
    new_line = ("%s-%s") % (line.strip(), test_import.getTestLine())
    print "%d\t%s" % (1, new_line) 

3) test_import.py

def getTestLine():
    return "test line"

Now:

example$ pig -f trivial_stream.pig

Backend error message

org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:265)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.cleanup(PigMapBase.java:103)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)

Pig stack trace

ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1

org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1066: Unable to open iterator for alias C. Backend error : Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
    at org.apache.pig.PigServer.openIterator(PigServer.java:753)
    at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:615)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:303)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:168)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:144)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:90)
    at org.apache.pig.Main.run(Main.java:396)
    at org.apache.pig.Main.main(Main.java:107)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 2997: Unable to recreate exception from backed error: org.apache.pig.backend.executionengine.ExecException: ERROR 2055: Received Error while processing the map plan: 'test_stream.py ' failed with exit status: 1
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getErrorMessages(Launcher.java:221)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher.getStats(Launcher.java:151)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.launchPig(MapReduceLauncher.java:337)
    at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:382)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1209)
    at org.apache.pig.PigServer.storeEx(PigServer.java:885)
    at org.apache.pig.PigServer.store(PigServer.java:827)
    at org.apache.pig.PigServer.openIterator(PigServer.java:739)
    ... 7 more
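The backend error only reports that test_stream.py exited with status 1; the Python traceback itself is not part of the message shown. One way to see it is to run the script outside Pig. A minimal sketch, assuming Python 3.7+ locally; run_streaming_script and demo are hypothetical helpers, and putting the script and test_import.py in different directories is an assumption about the task-node layout, not something the thread confirms. The underlying fact is that when Python runs a script file, sys.path[0] is the script's own directory, not the current working directory:

```python
import os
import subprocess
import sys
import tempfile


def run_streaming_script(script_path, input_text, cwd):
    """Run a streaming script roughly as STREAM would: pipe input_text to stdin, capture output."""
    # Drop PYTHONPATH so only Python's default sys.path rules decide what is importable.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONPATH"}
    result = subprocess.run(
        [sys.executable, script_path],
        input=input_text,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=env,
    )
    return result.returncode, result.stdout, result.stderr


def demo():
    """Script in one directory, test_import.py in the working directory: the import fails."""
    with tempfile.TemporaryDirectory() as script_dir, tempfile.TemporaryDirectory() as work_dir:
        script = os.path.join(script_dir, "test_stream.py")
        with open(script, "w") as f:
            f.write("import sys\nimport test_import\n"
                    "for line in sys.stdin:\n    print(line.rstrip())\n")
        with open(os.path.join(work_dir, "test_import.py"), "w") as f:
            f.write("def getTestLine():\n    return 'test line'\n")
        return run_streaming_script(script, "1\tone\n", cwd=work_dir)
```

demo() returns exit status 1 with an import error naming test_import on stderr, which is the kind of detail hidden behind ERROR 2055.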

Thanks a lot for your help!

-Pavel

2 answers

Answer (1 vote):

You need to add the current directory to sys.path in your test_stream.py:

#! /usr/bin/env python

import sys
sys.path.append(".")

Your SHIP clause was already shipping the Python module; you just have to tell Python where to look for it.
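The same fix as a small reusable helper. A sketch assuming Python 3.3+ (for importlib.invalidate_caches); ensure_on_path, import_from_cwd and demo are hypothetical names. Unlike the answer's sys.path.append("."), it inserts an absolute path at the front, so the entry keeps pointing at the working directory even if the script later changes directory, and it takes precedence over other entries:

```python
import importlib
import os
import sys
import tempfile


def ensure_on_path(directory=None):
    """Make sure `directory` (default: the cwd) is on sys.path, inserting it first if missing."""
    directory = os.path.abspath(directory or os.getcwd())
    if directory not in sys.path:
        sys.path.insert(0, directory)
    return directory


def import_from_cwd(module_name):
    """Import `module_name` from a .py file in the current working directory."""
    ensure_on_path()
    importlib.invalidate_caches()  # make sure the newly added directory is scanned
    return importlib.import_module(module_name)


def demo():
    """Write a throwaway test_import.py into a temp dir, cd there, and import it."""
    old_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "test_import.py"), "w") as f:
            f.write("def getTestLine():\n    return 'test line'\n")
        os.chdir(tmp)
        try:
            return import_from_cwd("test_import").getTestLine()
        finally:
            os.chdir(old_cwd)
```

On the Python 2 interpreter from the question, the plain sys.path.insert(0, os.getcwd()) before import test_import is the equivalent.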

Answer (3 votes):

The correct answer, from the comments above:

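Dependencies are not shipped automatically. If you want your Python application to work with Pig, you need to tar it up (don't forget the __init__.py files!) and include the .tar file in Pig's SHIP clause. The first thing your script should do is untar the application. You may run into path issues, so I'd suggest running the following even before extracting the tarball: sys.path.insert(0, os.getcwd()).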
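The steps in this answer can be sketched as follows, assuming Python 3; unpack_shipped_tarball, demo and the package name myapp are hypothetical, and using the temp directory as a stand-in for the task's working directory is an assumption about where SHIPped files land. The sys.path update happens before extraction, in the order the answer suggests:

```python
import importlib
import os
import sys
import tarfile
import tempfile


def unpack_shipped_tarball(tar_name, dest=None):
    """Put `dest` (default: the cwd) on sys.path, then extract `tar_name` into it."""
    dest = os.path.abspath(dest or os.getcwd())
    # Same order as the answer: fix sys.path first, then untar.
    if dest not in sys.path:
        sys.path.insert(0, dest)
    # Python versions that support extraction filters get the safer "data" filter.
    extra = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    with tarfile.open(tar_name) as tar:
        tar.extractall(dest, **extra)
    return dest


def demo():
    """Build a tiny package tarball (hypothetical name 'myapp'), unpack it, import from it."""
    old_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as work:
        pkg = os.path.join(src, "myapp")
        os.mkdir(pkg)
        # __init__.py makes myapp an importable package.
        open(os.path.join(pkg, "__init__.py"), "w").close()
        with open(os.path.join(pkg, "test_import.py"), "w") as f:
            f.write("def getTestLine():\n    return 'test line'\n")
        with tarfile.open(os.path.join(work, "myapp.tar"), "w") as tar:
            tar.add(pkg, arcname="myapp")
        os.chdir(work)  # stand-in for the task's working directory
        try:
            unpack_shipped_tarball("myapp.tar")
            importlib.invalidate_caches()
            module = importlib.import_module("myapp.test_import")
            return module.getTestLine()
        finally:
            os.chdir(old_cwd)
```

In Pig, the tarball would be listed next to the script, e.g. SHIP ('test_stream.py', 'myapp.tar'), and test_stream.py would call unpack_shipped_tarball('myapp.tar') before from myapp import test_import.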
