Java/Python中的快速IPC/Socket通信

9 投票
2 回答
17301 浏览
提问于 2025-04-17 12:54

在我的应用程序中,有两个进程(一个是Java,另一个是Python)需要进行通信。我发现,使用socket进行通信占用了93%的运行时间。为什么通信这么慢呢?我是不是应该考虑其他的通信方式,还是说可以让这个过程更快一些?

更新:我发现了一个简单的解决办法。似乎Buffered输出流在某些未知的原因下并没有真正起到缓冲的作用。所以,我现在在客户端和服务器端的进程中都把所有数据放进字符串缓冲区,然后在flush方法中将其写入socket。

我仍然对使用共享内存来快速交换数据的例子很感兴趣。

一些额外的信息:

  1. 应用程序中的消息大小大多数时间都在64kb以下。
  2. 服务器是用Java写的,客户端是用Python写的。
  3. 下面是socket进程间通信的实现:发送200字节需要50个周期!这个时间太长了。如果我在5000个周期中只发送2字节,所需时间要少得多。
  4. 这两个进程都在同一台Linux机器上运行。
  5. 在实际应用中,每个周期大约会调用10次客户端的iFid.write()。
  6. 这个操作是在Linux系统上进行的。

这是服务器端的代码:

public class FastIPC{
    public PrintWriter out;
    BufferedReader in;
    Socket socket = null;
    ServerSocket serverSocket = null;


    public FastIPC(int port) throws Exception{
        serverSocket = new ServerSocket(port);
        socket = serverSocket.accept();
        out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    }

    public void send(String msg){
        out.println(msg); // send price update to socket
    }

    public void flush(){
        out.flush();
    }

    public String recv() throws Exception{
        return in.readLine();
    }

    public static void main(String[] args){
        int port = 32000;
        try{
            FastIPC fip = new FastIPC(port);
            long start = new Date().getTime();
            System.out.println("Connected.");
            for (int i=0; i<50; i++){
                for(int j=0; j<100; j++)
                    fip.send("+");
                fip.send(".");
                fip.flush();
                String msg = fip.recv();
            }
            long stop = new Date().getTime();
            System.out.println((double)(stop - start)/1000.);
        }catch(Exception e){
            System.exit(1);
        }
    }
}

客户端的代码是:

import sys
import socket

class IPC(object):
    def __init__(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(("localhost", 32000))
        self.fid = self.s.makefile() # file wrapper to read lines
        self.listenLoop() # wait listening for updates from server

    def listenLoop(self):
        fid = self.fid
        print "connected"
        while True:
            while True:
                line = fid.readline()
                if line[0]=='.':
                    break
            fid.write('.\n')
            fid.flush()

if __name__ == '__main__':
    st = IPC()

2 个回答

1

一些想法

  • 服务器是用Java写的,客户端是用Python写的。

这听起来有点奇怪,但有没有什么理由不能通过标准输入输出互相调用呢?

  • 下面实现了套接字进程间通信(IPC):发送200字节需要50个周期!这个时间太长了。如果我发送2字节,5000个周期就能快很多。

任何对操作系统的调用相对来说都会比较慢(延迟方面)。使用共享内存可以绕过内核。如果你的问题是吞吐量,我发现如果延迟不是问题,使用套接字可以达到1-2 GB/s的速度。

  • 这两个进程都在同一台Linux机器上运行。

这使得共享内存非常理想。

  • 在实际应用中,每个周期大约会调用10次客户端的iFid.write()。

我不太明白为什么会这样。为什么不构建一个单一的结构/缓冲区,然后一次性写入呢?我会使用NIO中的直接缓冲区来减少延迟。使用字符转换的成本很高,特别是如果你只需要ASCII的话。

  • 这一切都是在Linux系统上完成的。

应该很容易进行优化。

我通过内存映射文件使用共享内存。这是因为我需要记录每条消息以备审计。我获得的平均延迟大约是180纳秒的往返时间,持续处理数百万条消息,而在实际应用中大约是490纳秒。

这种方法的一个优点是,如果有短暂的延迟,读取者可以很快跟上写入者的速度。它也支持轻松的重启和复制。

这仅在Java中实现,但这个原理足够简单,我相信在Python中也能工作。

https://github.com/peter-lawrey/Java-Chronicle

15

你有很多选择。既然你在用Linux,可以考虑使用UNIX域套接字。或者,你也可以把数据转换成ASCII、JSON或者其他格式,然后通过管道、共享内存、消息队列、DBUS等方式传输。考虑一下你要处理的数据类型,因为这些进程间通信(IPC)方式的性能特点各不相同。这里有一篇USENIX草稿论文,对各种选择的优缺点进行了很好的分析,值得一读。

既然你在这个回答的评论中提到你更喜欢使用共享内存,这里有一些代码示例可以帮助你入门。使用Python的posix_ipc库:

import posix_ipc # POSIX-specific IPC
import mmap      # From Python stdlib

class SharedMemory(object):
    """Python interface to shared memory. 
    The create argument tells the object to create a new SHM object,
    rather than attaching to an existing one.
    """

    def __init__(self, name, size=posix_ipc.PAGE_SIZE, create=True):
        self.name = name
        self.size = size
        if create:
            memory = posix_ipc.SharedMemory(self.name, posix_ipc.O_CREX,
                                            size=self.size)
        else:
            memory = posix_ipc.SharedMemory(self.name)
        self.mapfile = mmap.mmap(memory.fd, memory.size)
        os.close(memory.fd)
        return

    def put(self, item):
        """Put item in shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        pickle.dump(item, self.mapfile, protocol=2)
        return

    def get(self):
        """Get a Python object from shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        return pickle.load(self.mapfile)

    def __del__(self):
        try:
            self.mapfile.close()
            memory = posix_ipc.SharedMemory(self.name)
            memory.unlink()
        except:
            pass
        return    

在Java这边,你需要创建一个相同的类,尽管我在评论中说过的,JTux似乎提供了相应的功能,而你需要的API在UPosixIPC类中。

下面的代码是你需要实现的内容的大致框架。不过,还有几个地方需要补充——异常处理是显而易见的缺失,还有一些标志(可以在UConstant中找到),你还需要添加一个信号量来保护putget方法。不过,这应该能让你走上正轨。记住,mmap或内存映射文件是一个类似文件的接口,用于访问一段RAM。因此,你可以像使用普通文件的fd一样使用它的文件描述符。

import jtux.*;

class SHM {

    private String name;
    private int size;
    private long semaphore;
    private long mapfile; // File descriptor for mmap file

    /* Lookup flags and perms in your system docs */
    public SHM(String name, int size, boolean create, int flags, int perms) {
        this.name = name;
        this.size = size;
        int shm;
        if (create) {
            flags = flags | UConstant.O_CREAT;
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        } else {
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        }
        this.mapfile = UPosixIPC.mmap(..., this.size, ..., flags, shm, 0);
        return;
    }


    public void put(String item) {
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        UFile.write(item.getBytes(), this.mapfile);
        return;
    }


    public String get() {    
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        byte[] buffer = new byte[this.size];
        UFile.read(this.mapfile, buffer, buffer.length);
        return new String(buffer);
    }


    public void finalize() {
        UPosix.shm_unlink(this.name);
        UPosix.munmap(this.mapfile, this.size);
    }

}

撰写回答