使用readline()时Java和Python进程会冻结,而使用input()时不会

1 投票
1 回答
65 浏览
提问于 2025-04-12 16:16

我有一个Java程序,需要处理一长串输入字符串。为了做到这一点,它会逐个处理每个字符串,把它传给一个Process(一个Python脚本),然后从ProcessOutputStream获取结果,再继续下一个字符串。不过我发现,程序运行几个小时后就会卡住,Java程序在等Python的输出。

为了调试,我做了一个简化版的程序,使用小字符串,不在Java端进行任何缓存,也不修改Python脚本中的数据。但现在我发现它在不同的地方卡住了,Java试图把数据发送给Python脚本,而Python脚本又试图把结果发送回Java。我发现每次运行程序时,它能处理的项目数量略有不同,而且字符串越长,能处理的项目数量就越少。

Java程序:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process process = start("python", "test.py");
        for (int i = 0; i < 1000; i++)  {
            System.out.println(i);
            processText("test string test string test string test string ", process);
        }
        process.getOutputStream().close();
        boolean finished = process.waitFor(10, SECONDS);
        if (!finished) {
            process.destroyForcibly();
        }
    }
    
    public static Process start(String... command) throws IOException {
        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
        return processBuilder.start();
    }
    
    public static String processText(String text, Process process) throws IOException {
        byte[] bytes = (text + "\n").getBytes(UTF_8);
        OutputStream outputStream = process.getOutputStream();
        System.out.println("Writing...");
        outputStream.write(bytes);
        System.out.println("Done!");
        outputStream.flush();
        System.out.println("Reading...");
        String result = readLn(process);
        System.out.println("Got it!");
        return result;
    }
    
    public static String readLn(Process process) throws IOException {
        InputStream inputStream = process.getInputStream();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byte newlineByte = "\n".getBytes(UTF_8)[0];
        byte lastByte = -1;
        while (lastByte != newlineByte) {
            lastByte = (byte) inputStream.read();
            byteArrayOutputStream.write(lastByte);
        }
        return byteArrayOutputStream.toString(UTF_8);
    }
}

Python脚本:

import sys
import io

in_stream = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
out_stream = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

def output(s):
    out_stream.write(s)

text = in_stream.readline()
while text != "":
    print("Outputting result", file=sys.stderr)
    output(text)
    print("Output done!", file=sys.stderr)
    output("\n")
    print("Flushing", file=sys.stderr)
    out_stream.flush()
    print("Flushed", file=sys.stderr)
    text = in_stream.readline()

Java的输出:

0
Writing...
Done!
Reading...
Got it!
1
Writing...
Done!
Reading...
Got it!
.
.
.
379
Writing...
Done!
Reading...
Got it!
380
Writing...
Done! [Freezes here]

Python的输出(通过stderr):

Outputting result
Output done!
Flushing
Flushed
.
.
.
Outputting result
Output done!
Flushing
Flushed
Outputting result
Output done!
Flushing [Freezes here]

当我强制停止Java程序时,我从Python的stderr得到了这个额外的输出:

Flushed
Outputting result
Output done!
Flushing
Traceback (most recent call last):
  File "...\test.py", line 17, in <module>
    out_stream.flush()
OSError: [Errno 22] Invalid argument
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='cp1252'>
OSError: [Errno 22] Invalid argument

如果我使用print()input()代替in_stream和out_stream,我就能顺利处理完1000个项目。不过我想确保在Java和Python之间传递数据时使用UTF-8编码,这样可以包含所有Unicode字符,不会丢失任何数据。这就是我使用TextIOWrapper的原因,基于我在网上看到的(我觉得这对处理大量数据是最有效的方式)。不过最后这个错误输出似乎在说它使用的是cp1252而不是UTF-8?我该怎么解决这个问题?

我使用的是Windows 10,Java 17和Python 3.10

编辑:我觉得错误信息是说sys.stdout使用的是cp1252编码;而in_stream和out_stream说它们使用的是utf-8编码,所以我认为编码是没问题的。我现在只需把stdin设置为UTF-8的TextIOWrapper并使用input()stdin.readline()不行,它会卡住)。但我不知道为什么原来的方法不行,以及为什么这样就解决了问题。如果有人能解释一下,并指出如何避免将来出现这种问题(out_stream也可能导致卡住吗?)我会接受他们的回答。

import sys
import io

sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
out_stream = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

def output(str):
    out_stream.write(str)

def read_text():
    try:
        return input()
    except EOFError:
        return ""

text = read_text()
while text != "":
    print("Outputting result", file=sys.stderr)
    output(text)
    print("Output done!", file=sys.stderr)
    output("\n")
    print("Flushing", file=sys.stderr)
    out_stream.flush()
    print("Flushed", file=sys.stderr)
    text = read_text()

1 个回答

0

ProcessBuilder这个工具在运行子进程时,会有三个数据流。如果这些流中的一个被填满了,Java和子进程之间的数据交流就会出现问题,甚至会卡住。如果你把Got It那一行改成打印结果,你会发现你读取的流每发送一行输入,实际上包含了多于一行的内容:

System.out.println("Got it! "+result);

这样就会导致死锁,因为你的Java程序每次只写一行,读一行,而Python的输出却是发送两行回来。一个简单的解决办法是去掉多余的换行符output("\n"),这只适用于你的测试程序。

不过一般来说,使用ProcessBuilder时,子进程可能会因为Java代码没有读取标准输出(STDOUT)而卡住。同时,如果Java代码在写入标准输入(STDIN),而子进程又在等待标准输出,这样Java代码也会看起来像是卡住了。反之亦然。所以,千万不要在同一个线程中同时进行读和写,也不要在一个线程中同时从标准输出和标准错误(STDERR)读取。

最好的解决办法是为标准输入、标准输出和标准错误使用不同的线程。你已经在标准错误中使用了INHERIT模式,这样处理应该没问题,或者可以为标准错误使用一个新线程,或者通过调用processBuilder.redirectErrorStream(true);将标准错误合并到标准输出中。

processText拆分成两个方法:一个是processText,只负责outputStream.write,不进行readLn;另一个是readText,只负责readLn部分。在后台线程中运行processText,在主线程中使用readText

在调用waitFor之后,确保你在后台线程上调用join,以确保它已经完成。

这里有一个例子,展示了如何使用后台线程处理来启动进程,你需要调整一下以便将标准输入作为自己的线程来处理。

另外,Python会缓存输出,所以你可以通过使用python -u来启动Python,使其输出不被缓存,从而避免频繁刷新。

这个版本不会卡住,无论你在Python中使用什么版本的标准输入,因为它将标准输入的写入和标准输出的读取分开了:

public static void main_py(String... args) throws IOException, InterruptedException {
    Process process = start("python", "test.py");

    // Handle STDOUT in different thread to STDIN
    Runnable task = () -> {
        System.out.println("task START");
        try(var from = new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8))) {
            String result = null;

            while((result = from.readLine()) != null)
                System.out.println("Got it! "+result);
        } catch(IOException io) {
            throw new UncheckedIOException(io);
        }
        System.out.println("task END");
    };
    Thread bg = new Thread(task, "STDERR");
    bg.start();

    for (int i = 0; i < 1000; i++)  {
        System.out.println(i);
        processText2("test string test string test string test string "+i, process);
    }

    process.getOutputStream().close();

    boolean finished = process.waitFor(10, SECONDS);
    bg.join();
    if (!finished) {
        process.destroyForcibly();
    }
    System.out.println("main END");
}

public static void processText2(String text, Process process) throws IOException {
    byte[] bytes = (text + "\n").getBytes(UTF_8);
    OutputStream outputStream = process.getOutputStream();
    System.out.println("Writing..."+text);
    outputStream.write(bytes);
    System.out.println("Done!");
    outputStream.flush();
    System.out.println("Reading...");
}

撰写回答