在Python中:某些子进程变为僵尸进程,原因不明

2 投票
1 回答
8679 浏览
提问于 2025-04-17 19:49

补充说明:答案是操作系统因为我占用了所有内存而终止了一些进程。

我启动了足够的子进程,确保负载平均和核心数保持1:1,但在某个时刻,这个脚本可以运行好几天,结果有3个进程崩溃了:

tipu   14804  0.0  0.0 328776   428 pts/1    Sl   00:20   0:00 python run.py
tipu   14808 64.4 24.1 2163796 1848156 pts/1 Rl   00:20  44:41 python run.py
tipu   14809  8.2  0.0      0     0 pts/1    Z    00:20   5:43 [python] <defunct>
tipu   14810 60.3 24.3 2180308 1864664 pts/1 Rl   00:20  41:49 python run.py
tipu   14811 20.2  0.0      0     0 pts/1    Z    00:20  14:04 [python] <defunct>
tipu   14812 22.0  0.0      0     0 pts/1    Z    00:20  15:18 [python] <defunct>
tipu   15358  0.0  0.0 103292   872 pts/1    S+   01:30   0:00 grep python

我不知道为什么会这样,附上了主进程和从进程的代码。如果需要,我也可以附上mysql/pg的封装代码,有什么建议吗?

slave.py

from boto.s3.key import Key
import multiprocessing
import gzip
import os
from  mysql_wrapper import MySQLWrap
from pgsql_wrapper import PGSQLWrap
import boto
import re

class Slave:

    CHUNKS = 250000

    BUCKET_NAME = "bucket"
    AWS_ACCESS_KEY = ""
    AWS_ACCESS_SECRET = ""
    KEY = Key(boto.connect_s3(AWS_ACCESS_KEY, AWS_ACCESS_SECRET).get_bucket(BUCKET_NAME))
    S3_ROOT = "redshift_data_imports"
    COLUMN_CACHE = {}
    DEFAULT_COLUMN_VALUES = {}

    def __init__(self, job_queue):
        self.log_handler = open("logs/%s" % str(multiprocessing.current_process().name), "a");
        self.mysql = MySQLWrap(self.log_handler)
        self.pg = PGSQLWrap(self.log_handler)
        self.job_queue = job_queue


    def do_work(self):
        self.log(str(os.getpid()))
        while True:

            #sample job in the abstract: mysql_db.table_with_date-iteration
            job = self.job_queue.get()

            #queue is empty
            if job is None:
                self.log_handler.close()
                self.pg.close()
                self.mysql.close()
                print("good bye and good day from %d" % (os.getpid()))
                self.job_queue.task_done()
                break

            #curtail iteration
            table = job.split('-')[0]

            #strip redshift table from job name
            redshift_table = re.sub(r"(_[1-9].*)", "", table.split(".")[1])

            iteration = int(job.split("-")[1])
            offset = (iteration - 1) * self.CHUNKS

            #columns redshift is expecting
            #bad tables will slip through and error out, so we catch it
            try:
                colnames = self.COLUMN_CACHE[redshift_table]
            except KeyError:
                self.job_queue.task_done()
                continue

            #mysql fields to use in SELECT statement
            fields = self.get_fields(table)

            #list subtraction determining which columns redshift has that mysql does not
            delta = (list(set(colnames) - set(fields.keys())))

            #subtract columns that have a default value and so do not need padding
            if delta:
                delta = list(set(delta) - set(self.DEFAULT_COLUMN_VALUES[redshift_table]))

            #concatinate columns with padded \N
            select_fields = ",".join(fields.values()) + (",\\N" * len(delta))

            query = "SELECT %s FROM %s LIMIT %d, %d" % (select_fields, table,
                    offset, self.CHUNKS)

            rows = self.mysql.execute(query)

            self.log("%s: %s\n" % (table, len(rows)))

            if not rows:
                self.job_queue.task_done()
                continue

            #if there is more data potentially, add it to the queue
            if len(rows) == self.CHUNKS:
                self.log("putting %s-%s" % (table, (iteration+1)))
                self.job_queue.put("%s-%s" % (table, (iteration+1)))

            #various characters need escaping
            clean_rows = []
            redshift_escape_chars = set( ["\\", "|", "\t", "\r", "\n"] )
            in_chars = ""

            for row in rows:
                new_row = []
                for value in row:
                    if value is not None:
                        in_chars = str(value)
                    else:
                        in_chars = ""

                    #escape any naughty characters
                    new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))
                new_row = "\t".join(new_row)
                clean_rows.append(new_row)

            rows = ",".join(fields.keys() + delta)
            rows += "\n" + "\n".join(clean_rows)

            offset = offset + self.CHUNKS

            filename = "%s-%s.gz" % (table, iteration) 
            self.move_file_to_s3(filename, rows)

            self.begin_data_import(job, redshift_table, ",".join(fields.keys() +
               delta))

            self.job_queue.task_done()


    def move_file_to_s3(self, uri, contents):

        tmp_file = "/dev/shm/%s" % str(os.getpid())

        self.KEY.key = "%s/%s" % (self.S3_ROOT, uri)
        self.log("key is %s" % self.KEY.key )

        f = gzip.open(tmp_file, "wb")
        f.write(contents)
        f.close()

        #local saving allows for debugging when copy commands fail
        #text_file = open("tsv/%s" % uri, "w")
        #text_file.write(contents)
        #text_file.close()

        self.KEY.set_contents_from_filename(tmp_file, replace=True)

    def get_fields(self, table):
        """
            Returns a dict used as: 
                {"column_name": "altered_column_name"}
            Currently only the debug column gets altered
        """
        exclude_fields = ["_qproc_id", "_mob_id", "_gw_id", "_batch_id", "Field"]

        query = "show columns from %s" % (table)
        fields = self.mysql.execute(query)

        #key raw field, value mysql formatted field
        new_fields = {}

        #for field in fields:
        for field in [val[0] for val in fields]:
            if field in exclude_fields:
                continue
            old_field = field

            if "debug_mode" == field.strip():
                field = "IFNULL(debug_mode, 0)"

            new_fields[old_field] = field

        return new_fields

    def log(self, text):
        self.log_handler.write("\n%s" % text)

    def begin_data_import(self, table, redshift_table, fields):
        query = "copy %s (%s) from 's3://bucket/redshift_data_imports/%s' \
            credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' delimiter '\\t' \
            gzip NULL AS '' COMPUPDATE ON ESCAPE IGNOREHEADER 1;" \
            % (redshift_table, fields, table, self.AWS_ACCESS_KEY, self.AWS_ACCESS_SECRET)
        self.pg.execute(query)

master.py

from slave import Slave as Slave 
import multiprocessing
from mysql_wrapper import MySQLWrap as MySQLWrap
from pgsql_wrapper import PGSQLWrap as PGSQLWrap


class Master:

    SLAVE_COUNT = 5

    def __init__(self):
        self.mysql = MySQLWrap()
        self.pg = PGSQLWrap()

    def do_work(table):
        pass

    def get_table_listings(self):
        """Gathers a list of MySQL log tables needed to be imported"""

        query = 'show databases'
        result = self.mysql.execute(query)

        #turns list[tuple] into a flat list
        databases = list(sum(result, ()))

        #overriding during development
        databases = ['db1', 'db2', 'db3']]

        exclude = ('mysql', 'Database', 'information_schema')
        scannable_tables = []

        for database in databases:
            if database in exclude:
                continue

            query = "show tables from %s" % database
            result = self.mysql.execute(query)

            #turns list[tuple] into a flat list
            tables = list(sum(result, ()))

            for table in tables:
                exclude = ("Tables_in_%s" % database, "(", "201303", "detailed", "ltv")

                #exclude any of the unfavorables
                if any(s in table for s in exclude):
                    continue

                scannable_tables.append("%s.%s-1" % (database, table))

        return scannable_tables

    def init(self):
        #fetch redshift columns once and cache
        #get columns from redshift so we can pad the mysql column delta with nulls
        tables = ('table1', 'table2', 'table3')

        for table in tables:

            #cache columns
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s'" % (table)
            result = self.pg.execute(query, async=False, ret=True)
            Slave.COLUMN_CACHE[table] = list(sum(result, ()))

            #cache default values
            query = "SELECT column_name FROM information_schema.columns WHERE \
            table_name = '%s' and column_default is not \
            null" % (table)

            result = self.pg.execute(query, async=False, ret=True)

            #turns list[tuple] into a flat list
            result = list(sum(result, ()))

            Slave.DEFAULT_COLUMN_VALUES[table] = result

    def run(self):
        self.init()

        job_queue = multiprocessing.JoinableQueue()
        tables = self.get_table_listings()
        for table in tables:
            job_queue.put(table)

        processes = []
        for i in range(Master.SLAVE_COUNT):
            process = multiprocessing.Process(target=slave_runner, args=(job_queue,))
            process.daemon = True
            process.start()
            processes.append(process)

        #blocks this process until queue reaches 0
        job_queue.join()

        #signal each child process to GTFO
        for i in range(Master.SLAVE_COUNT):
            job_queue.put(None)

        #blocks this process until queue reaches 0
        job_queue.join()

        job_queue.close()

        #do not end this process until child processes close out
        for process in processes:
            process.join()

        #toodles !
        print("this is master saying goodbye")


def slave_runner(queue):
    slave = Slave(queue)
    slave.do_work()

1 个回答

6

信息不太够,不能完全确定,但问题很可能是因为 Slave.do_work 抛出了一个没有处理的异常。(你的代码中有很多地方可能会在不同情况下出现这种情况。)

当发生这种情况时,子进程会直接退出。

在 POSIX 系统上……具体情况有点复杂,但在简单的情况下(就像你这里的情况),一个退出的子进程会变成一个 <defunct> 进程,直到它被清理掉(因为父进程要么对它 wait,要么自己退出)。由于你的父进程在队列完成之前并没有等待子进程,所以就会出现这种情况。

所以,有一个简单的解决办法:

def do_work(self):
    self.log(str(os.getpid()))
    while True:
        try:
            # the rest of your code
        except Exception as e:
            self.log("something appropriate {}".format(e))
            # you may also want to post a reply back to the parent

你可能还想把那个庞大的 try 拆分成几个小的,这样你就能区分出不同的阶段,看看在哪些地方可能出错(特别是有些地方需要回复,有些则不需要)。


不过,看起来你想要做的事情是完全复制 multiprocessing.Pool 的行为,但在几个地方没有做到。这就引出了一个问题:为什么不直接使用 Pool 呢?这样你可以通过使用 map 家族的方法进一步简化和优化代码。例如,你的整个 Master.run 可以简化为:

self.init()
pool = multiprocessing.Pool(Master.SLAVE_COUNT, initializer=slave_setup)
pool.map(slave_job, tables)
pool.join()

这样会帮你处理异常,并允许你在后面需要时返回值或异常,还能让你使用内置的 logging 库,而不是自己去构建一个,等等。而且只需要对 Slave 做大约十几行的小改动,就可以完成了。


如果你想在任务内部提交新任务,最简单的方法可能是使用基于 Future 的 API(这会改变思路,把未来的结果作为重点,而池/执行器则是提供这些结果的工具,而不是把池作为重点,结果作为附带的东西),不过用 Pool 也有多种方法可以做到。例如,现在你没有从每个任务返回任何东西,所以你可以直接返回一个要执行的 tables 列表。这里有一个简单的例子,展示了怎么做:

import multiprocessing

def foo(x):
    print(x, x**2)
    return list(range(x))

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    jobs = [5]
    while jobs:
        jobs, oldjobs = [], jobs
        for job in oldjobs:
            jobs.extend(pool.apply(foo, [job]))
    pool.close()
    pool.join()

显然,你可以通过用列表推导式替代整个循环来简化这个,并且通过给每个任务传递一个“提交者”对象来让代码看起来更整洁,而不是返回一个新任务的列表,等等。但我想尽量让它变得明确,以展示其实没什么复杂的。


无论如何,如果你觉得显式队列更容易理解和管理,那就去做吧。只需查看 multiprocessing.worker 和/或 concurrent.futures.ProcessPoolExecutor 的源代码,看看你需要自己做些什么。这并不难,但有很多地方可能会出错(就我个人而言,我每次尝试做这种事情时,总会忘记至少一个边界情况),所以查看那些做对的代码是值得的。


另外,似乎你不能在这里使用 concurrent.futures.ProcessPoolExecutor 的唯一原因是你需要初始化一些每个进程的状态(比如 boto.s3.key.KeyMySqlWrap 等),这可能是出于很好的缓存原因。(如果这涉及到网络服务查询、数据库连接等,你肯定不想每个任务都做一次!)但有几种方法可以解决这个问题。

你可以子类化 ProcessPoolExecutor 并重写未记录的函数 _adjust_process_count(查看 源代码,你会发现这很简单),来传递你的设置函数,然后……就这样。

或者你可以混合使用。把 concurrent.futuresFuture 包装在 multiprocessingAsyncResult 周围。

撰写回答