Creating a configurable, multithreaded log file accumulator in Python using SSH (paramiko?)

I'm about to write some code and wanted to share my idea first to see if anyone has feedback.

I want to create a class in Python that simultaneously monitors (and merges) several log files, for use in automated tests. I'm hoping to run 'tail -f' on each file (probably over SSH with paramiko), with a separate thread handling each file. Then, every few seconds, I'd collect the output from every thread and merge it into a single file, suffixing each line so its source can be identified. That way I can write tests for a distributed system while monitoring the logs of roughly ten machines at once (many of which serve the same role and sit behind a load balancer, etc.).

Startup:
    for machine, logfile in config_list:
        create thread running tail -f on logfile on machine
    create accumulator thread that:
        wakes up each second and 
        gets all config_list stdout and merges it into one in-memory list

Test_API:
    method to get/query data from the in memory accumulator.  
    in memory list would be the only data item needed to be synchronized
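
To make the Test_API piece concrete, here is a minimal sketch of the synchronized part: a lock guards the shared list, and the query method copies and clears it atomically. The names merged_log and get_merged_log are illustrative, not a fixed API:

import threading

outlock = threading.Lock()
merged_log = []

def get_merged_log():
    # Atomically drain the shared buffer so callers never see a
    # half-written batch of lines.
    with outlock:
        temp = merged_log[:]
        del merged_log[:]
        return temp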

So my questions are: is paramiko the right choice? Are there any gotchas with threading I should know about (I've never used threads in Python before)? Any other ideas?

Thanks in advance!

Feel free to share code snippets. I'll update this post with a working solution once I'm done. I expect it will end up fairly small.

Just found this link: Creating multiple SSH connections at a time using Paramiko


EDIT

After looking at a couple of other posts, here's what I have so far. It just runs a plain tail rather than tail -f, and it doesn't have the polling I need.

from someplace import TestLogger
import threading
import paramiko


def start_watching():

    logger = TestLogger().get()
    logs_to_watch = [('somemachine1', '/var/log/foo'),
                     ('somemachine2', '/var/log/bar')]

    threads = []
    for machine, filename in logs_to_watch:
        logger.info(machine)
        logger.info(filename)
        t = threading.Thread(target=workon, args=(machine, filename))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    for merge_line in merged_log:
        logger.info(merge_line.dump())

outlock = threading.Lock()
merged_log = []

def workon(host, logfile):
    # One worker per (host, logfile): open an SSH session, run tail,
    # and stream its stdout into the shared merged_log list.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username='yourusername', allow_agent=True, look_for_keys=True)
    stdin, stdout, stderr = ssh.exec_command('sudo tail ' + logfile)

    # Iterate over stdout line by line, holding the lock only while
    # appending to the shared list.
    for line in stdout:
        with outlock:
            merged_log.append(MergeLogLine(line, host, logfile))


class MergeLogLine():
    def __init__(self, line, host, logfile):
        self._line = line
        self._host = host
        self._logfile = logfile

    def line(self):
        return self._line

    def host(self):
        return self._host

    def logfile(self):
        return self._logfile

    def dump(self):
        return self._line + '(from host = ' + self._host + ', log = ' + self._logfile + ')'
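
For the missing tail -f and polling behavior, one option is to poll the channel underneath stdout instead of blocking in readline(), so the worker can check a stop flag between reads. A rough sketch of that idea: stop_event is hypothetical (it would be created in start_watching and passed to each thread), recv_ready() and recv() are standard paramiko Channel methods, and it assumes import time at the top of the module.

def workon_follow(host, logfile, stop_event):
    # Like workon(), but runs 'tail -f' and polls the channel so the
    # thread can exit when stop_event is set. Assumes `import time`.
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username='yourusername', allow_agent=True, look_for_keys=True)
    stdin, stdout, stderr = ssh.exec_command('sudo tail -f ' + logfile)
    channel = stdout.channel

    while not stop_event.is_set():
        if channel.recv_ready():
            chunk = channel.recv(4096).decode('utf-8', errors='replace')
            with outlock:
                for subline in chunk.split('\n'):
                    merged_log.append(MergeLogLine(subline, host, logfile))
        else:
            time.sleep(0.5)
    ssh.close()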

1 Answer

This turned out to be pretty hard. Here is a working example:

Sample "client code":

import sys
import traceback
import tail_accumulate as ta
import time


def main(argv):

    user = 'cmead'
    logs_to_watch = [('somemachine1', '/var/log/bar/sample.log'),
                     ('somemachine2', '/var/log/foo')]

    tac = ta.TailAccumulateConfig(logs_to_watch, user)

    try:
        ta.start_watching(tac)

        time.sleep(10)

        for merge_line in ta.get_merged_log():
            print(merge_line.dump())

    except Exception as e:
        print(traceback.format_exc())

    ta.stop()


if __name__ == "__main__":
    main(sys.argv[1:])
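
A single time.sleep(10) grabs only one batch; in a real test you would more likely poll the accumulator in a loop. A minimal variation on the client above, using the same tail_accumulate functions (poll_for is just an illustrative name):

def poll_for(total_seconds, interval=1.0):
    # Drain and print the shared buffer every `interval` seconds.
    deadline = time.time() + total_seconds
    while time.time() < deadline:
        for merge_line in ta.get_merged_log():
            print(merge_line.dump())
        time.sleep(interval)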

The tail_accumulate package:

import threading
import paramiko
import select
import time

threads = []
stopFlag = None


class TailAccumulateConfig():
    def __init__(self, log_list, user):
        self._log_list = log_list
        self._user = user

    def user(self):
        return self._user

    def log_list(self):
        return self._log_list


def start_watching(tail_accumulate_config):
    global stopFlag
    stopFlag = threading.Event()
    for machine, filename in tail_accumulate_config.log_list():
        t = LogListenWorker(stopFlag, machine, filename, tail_accumulate_config.user())
        t.start()
        global threads
        threads.append(t)


def stop():
    global stopFlag
    stopFlag.set()


def get_merged_log():
    with outlock:
        global merged_log
        temp = merged_log[:]
        del merged_log[:]
        return temp

outlock = threading.Lock()
merged_log = []


class LogListenWorker(threading.Thread):
    def __init__(self, event, host, logfile, username):
        threading.Thread.__init__(self)
        self.stopped = event
        self.host = host
        self.logfile = logfile
        self.username = username

    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self.host, username=self.username)
        transport = ssh.get_transport()
        channel = transport.open_session()
        channel.exec_command('sudo tail -f ' + self.logfile)

        while not self.stopped.is_set():
            try:
                # Wait up to 3 seconds for data to arrive on the channel.
                rl, wl, xl = select.select([channel], [], [], 3.0)
                if len(rl) > 0:
                    # Must be stdout; decode so split('\n') works on Python 3.
                    line = channel.recv(1024).decode('utf-8', errors='replace')
                else:
                    time.sleep(1.0)
                    continue

            except Exception as e:
                break
            if line:
                with outlock:
                    sublines = line.split('\n')
                    for subline in sublines:
                        merged_log.append(MergeLogLine(subline, self.host, self.logfile))

        ssh.close()


class MergeLogLine():
    def __init__(self, line, host, logfile):
        self._line = line
        self._host = host
        self._logfile = logfile

    def line(self):
        return self._line

    def host(self):
        return self._host

    def logfile(self):
        return self._logfile

    def dump(self):
        return self._line + ' ---> (from host = ' + self._host + ', log = ' + self._logfile + ')'
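
One caveat with the worker above: channel.recv(1024) can return a partial line, and split('\n') will then record the two halves as separate entries. A small, hypothetical refinement is to carry the trailing fragment over to the next read inside run():

# Hypothetical replacement for the split('\n') block in run():
# keep the incomplete tail in a per-worker buffer until its
# newline arrives.
buf = getattr(self, '_buffer', '') + line
parts = buf.split('\n')
self._buffer = parts.pop()  # trailing fragment, possibly ''
with outlock:
    for subline in parts:
        merged_log.append(MergeLogLine(subline, self.host, self.logfile))

It also wouldn't hurt for stop() to join the worker threads before returning, so a test can be sure every SSH session is closed when it exits.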
