Creating a configurable, multithreaded log file accumulator in Python using SSH (paramiko?)
I'm about to write some code and wanted to share my idea first to see if anyone has suggestions.
I want to create a class in Python that monitors (and merges) multiple log files at once, for use in automated testing. My hope is to run 'tail -f' on each file (probably over SSH with paramiko), with a dedicated thread per file. Then, every few seconds, collect the output from each thread and merge it into a single file, appending a suffix to each line to identify its source. That way I can write tests for a distributed system while monitoring the logs of about ten machines at once (many of which serve the same purpose and sit behind load balancers, etc.).
Startup:
    for machine, logfile in config_list:
        create thread running tail -f on logfile on machine
    create accumulator thread that:
        wakes up each second and
        gets all config_list stdout and merges it into one in-memory list

Test_API:
    method to get/query data from the in-memory accumulator.
    The in-memory list would be the only data item that needs to be synchronized.
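To make that concrete, here's roughly the shape I'm picturing, with a local subprocess tail -f standing in for the SSH/paramiko part (the machine name, log path, and function names are just placeholders, not a real implementation):

import subprocess
import threading

merged = []                       # the single shared item that needs synchronizing
merged_lock = threading.Lock()

def watch(machine, logfile):
    # Stand-in for the SSH/paramiko piece: tail -f a local file and
    # tag every line with its source before appending it.
    proc = subprocess.Popen(['tail', '-f', logfile],
                            stdout=subprocess.PIPE, universal_newlines=True)
    for line in proc.stdout:
        with merged_lock:
            merged.append(line.rstrip() + ' (' + machine + ':' + logfile + ')')

def get_merged():
    # Test_API side: hand back everything accumulated since the last call.
    with merged_lock:
        lines = merged[:]
        del merged[:]
    return lines

for machine, logfile in [('localhost', '/var/log/syslog')]:
    t = threading.Thread(target=watch, args=(machine, logfile))
    t.daemon = True
    t.start()

The point being that get_merged() is the only contact point for the test, and merged is the only structure guarded by the lock.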
So, my questions: is paramiko the right choice? Are there any gotchas when working with threads (I've never used Python threads before)? Any other ideas?
Thanks in advance!
Code snippets welcome. I'll update this post with a working solution once I'm done. I expect it will be fairly short.
Just found this link: Creating multiple SSH connections at a time using Paramiko
EDIT
After looking through a few other posts, here's what I have so far. It only does a plain tail rather than tail -f, and it doesn't yet have the polling I need.
from someplace import TestLogger
import threading
import paramiko

outlock = threading.Lock()
merged_log = []

def start_watching():
    logger = TestLogger().get()
    logs_to_watch = [('somemachine1', '/var/log/foo'),
                     ('somemachine2', '/var/log/bar')]
    threads = []
    for machine, filename in logs_to_watch:
        logger.info(machine)
        logger.info(filename)
        t = threading.Thread(target=workon, args=(machine, filename))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    for merge_line in merged_log:
        logger.info(merge_line.dump())

def workon(host, logfile):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host, username='yourusername', allow_agent=True, look_for_keys=True)
    stdin, stdout, stderr = ssh.exec_command('sudo tail ' + logfile)
    # Read until EOF; only hold the lock for the append, not the blocking read.
    line = stdout.readline()
    while line:
        with outlock:
            merged_log.append(MergeLogLine(line, host, logfile))
        line = stdout.readline()

class MergeLogLine():
    def __init__(self, line, host, logfile):
        self._line = line
        self._host = host
        self._logfile = logfile

    def line(self):
        return self._line

    def host(self):
        return self._host

    def logfile(self):
        return self._logfile

    def dump(self):
        return self._line + '(from host = ' + self._host + ', log = ' + self._logfile + ')'
1 Answer
This turned out to be fairly involved. Here's a working sample:
Sample 'client' code:
import sys
import time
import traceback

import tail_accumulate as ta

def main(argv):
    user = 'cmead'
    logs_to_watch = [('somemachine1', '/var/log/bar/sample.log'),
                     ('somemachine2', '/var/log/foo')]
    tac = ta.TailAccumulateConfig(logs_to_watch, user)
    try:
        ta.start_watching(tac)
        time.sleep(10)
        for merge_line in ta.get_merged_log():
            print(merge_line.dump())
    except Exception:
        print(traceback.format_exc())
    ta.stop()

if __name__ == "__main__":
    main(sys.argv[1:])
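If you want the continuous once-a-second merge from the original design rather than a single drain after a fixed sleep, you could poll get_merged_log() in a loop instead. A rough variant of the try body above (poll_merged, and the duration/interval values, are made up; it assumes tail_accumulate is imported as ta as in the client):

import time

def poll_merged(duration=30.0, interval=1.0):
    # Drain the accumulator once per interval until the deadline passes.
    deadline = time.time() + duration
    while time.time() < deadline:
        for merge_line in ta.get_merged_log():
            print(merge_line.dump())
        time.sleep(interval)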
The tail_accumulate package:
import select
import threading
import time

import paramiko

threads = []
stopFlag = None

class TailAccumulateConfig():
    def __init__(self, log_list, user):
        self._log_list = log_list
        self._user = user

    def user(self):
        return self._user

    def log_list(self):
        return self._log_list
def start_watching(tail_accumulate_config):
    global stopFlag
    stopFlag = threading.Event()
    for machine, filename in tail_accumulate_config.log_list():
        t = LogListenWorker(stopFlag, machine, filename, tail_accumulate_config.user())
        t.start()
        global threads
        threads.append(t)

def stop():
    global stopFlag
    stopFlag.set()

def get_merged_log():
    # Drain and return everything accumulated since the last call.
    with outlock:
        global merged_log
        temp = merged_log[:]
        del merged_log[:]
        return temp

outlock = threading.Lock()
merged_log = []
class LogListenWorker(threading.Thread):
    def __init__(self, event, host, logfile, username):
        threading.Thread.__init__(self)
        self.stopped = event
        self.host = host
        self.logfile = logfile
        self.username = username

    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self.host, username=self.username)
        transport = ssh.get_transport()
        channel = transport.open_session()
        channel.exec_command('sudo tail -f ' + self.logfile)
        while not self.stopped.is_set():
            try:
                # Wait up to 3 seconds for the channel to become readable.
                rl, wl, xl = select.select([channel], [], [], 3.0)
                if len(rl) > 0:
                    # Must be stdout; recv returns a raw chunk, not whole lines.
                    line = channel.recv(1024).decode('utf-8', errors='replace')
                else:
                    time.sleep(1.0)
                    continue
            except Exception:
                break
            if line:
                with outlock:
                    sublines = line.split('\n')
                    for subline in sublines:
                        merged_log.append(MergeLogLine(subline, self.host, self.logfile))
        ssh.close()
class MergeLogLine():
    def __init__(self, line, host, logfile):
        self._line = line
        self._host = host
        self._logfile = logfile

    def line(self):
        return self._line

    def host(self):
        return self._host

    def logfile(self):
        return self._logfile

    def dump(self):
        return self._line + ' ---> (from host = ' + self._host + ', log = ' + self._logfile + ')'
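One caveat about the worker: channel.recv(1024) hands back arbitrary chunks, not whole lines, so a log line that straddles two reads ends up as two entries in merged_log. A sketch of one way to buffer partial lines (split_complete_lines is a made-up helper; you'd keep a buffer string on the worker, e.g. self._buf = '' in __init__, and feed each chunk through it before appending):

def split_complete_lines(buf, chunk):
    # Append chunk to buf and split off only the lines that end in
    # '\n'; the trailing fragment stays buffered until more data arrives.
    buf += chunk
    complete, sep, remainder = buf.rpartition('\n')
    if not sep:
        return [], buf
    return complete.split('\n'), remainder

Inside run(), the if line: block would then become something like:

lines, self._buf = split_complete_lines(self._buf, line)
with outlock:
    for subline in lines:
        merged_log.append(MergeLogLine(subline, self.host, self.logfile))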