Paramiko Expect - 尾随

4 投票
2 回答
12198 浏览
提问于 2025-04-30 01:39

我正在尝试查看一个日志文件的实时更新,这个功能是可以用的。但我还需要分析输出内容,查找错误等信息。我使用的是Paramiko-expect这个库的基本示例,但我不知道该怎么做。

import traceback
import paramiko
from paramikoe import SSHClientInteraction

def main():

    # Set login credentials and the server prompt
    hostname = 'server'
    username = 'username'
    password = 'xxxxxxxx'
    port = 22

    # Use SSH client to login
    try:

        # Create a new SSH client object
        client = paramiko.SSHClient()

        # Set SSH key parameters to auto accept unknown hosts
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Connect to the host
        client.connect(hostname, port, username, password)

        # Create a client interaction class which will interact with the host
        interact = SSHClientInteraction(client, timeout=10, display=False)

        # Send the tail command
        interact.send('tail -f /var/log/log')

        # Now let the class tail the file for us
        interact.tail(line_prefix=hostname+': ')




    except KeyboardInterrupt:
        print 'Ctrl+C interruption detected, stopping tail'
    except Exception:
        traceback.print_exc()
    finally:
        try:
            client.close()
        except:
            pass

if __name__ == '__main__':
    main()

这个功能可以在运行的控制台中实时打印日志,但通过Paramiko-expect,我无法找到方法来遍历输出内容并查找特定的值。

谁能帮帮我?

https://github.com/fgimian/paramiko-expect

另外,我希望能把日志的输出保存到本地文件中,以便将来查看历史记录。

更新:我发现这些信息是通过 sys.stdout 显示的。我对如何将输出提取成可遍历的形式或者如何修改为其他类型的输出还不太熟悉。我尝试联系这个模块的创建者,但没有得到什么回应。

这个模块离成为一个非常强大的工具只差一步,如果我能找到监控输出的办法,这个模块就会变得非常好用。请大家帮帮我!

Paramiko-Expect模块的输出部分:尾部功能:

        # Read the output one byte at a time so we can detect \n correctly
        buffer = self.channel.recv(1)

        # If we have an empty buffer, then the SSH session has been closed
        if len(buffer) == 0:
            break

        # Strip all ugly \r (Ctrl-M making) characters from the current
        # read
        buffer = buffer.replace('\r', '')

        # Add the currently read buffer to the current line output
        current_line += buffer

        # Display the last read line in realtime when we reach a \n
        # character
        if current_line.endswith('\n'):
            if line_counter and line_prefix:
                sys.stdout.write(line_prefix)
            if line_counter:
                sys.stdout.write(current_line)
                sys.stdout.flush()
            line_counter += 1
            current_line = ''

Paramiko Expect模块:

#
# Paramiko Expect
#
# Written by Fotis Gimian
# http://github.com/fgimian
#
# This library works with a Paramiko SSH channel to provide native SSH
# expect-like handling for servers.  The library may be used to interact
# with commands like 'configure' or Cisco IOS devices or with interactive
# Unix scripts or commands.
#
# You must have Paramiko installed in order to use this library.
#
import sys
import re
import socket
# Windows does not have termios
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    import threading
    has_termios = False

import select


class SSHClientInteraction:
    """This class allows an expect-like interface to Paramiko which allows
    coders to interact with applications and the shell of the connected
    device.
    """

    def __init__(self, client, timeout=60, newline='\r', buffer_size=1024,
                 display=False):
        """The constructor for our SSHClientInteraction class.

        Arguments:
        client -- A Paramiko SSHClient object

        Keyword arguments:
        timeout -- THe connection timeout in seconds
        newline -- The newline character to send after each command
        buffer_size -- The amount of data (in bytes) that will be read at a
                       time after a command is run
        display -- Whether or not the output should be displayed in real-time
                   as it is being performed (especially useful when debugging)

        """
        self.channel = client.invoke_shell()
        self.newline = newline
        self.buffer_size = buffer_size
        self.display = display
        self.timeout = timeout
        self.current_output = ''
        self.current_output_clean = ''
        self.current_send_string = ''
        self.last_match = ''

    def __del__(self):
        """The destructor for our SSHClientInteraction class."""
        self.close()

    def close(self):
        """Attempts to close the channel for clean completion."""
        try:
            self.channel.close()
        except:
            pass

    def expect(self, re_strings=''):
        """This function takes in a regular expression (or regular expressions)
        that represent the last line of output from the server.  The function
        waits for one or more of the terms to be matched.  The regexes are
        matched using expression \n<regex>$ so you'll need to provide an
        easygoing regex such as '.*server.*' if you wish to have a fuzzy match.

        Keyword arguments:
        re_strings -- Either a regex string or list of regex strings that
                      we should expect.  If this is not specified, then
                      EOF is expected (i.e. the shell is completely closed
                      after the exit command is issued)

        Returns:
        - EOF: Returns -1
        - Regex String: When matched, returns 0
        - List of Regex Strings: Returns the index of the matched string as
                                 an integer
        """

        # Set the channel timeout
        self.channel.settimeout(self.timeout)

        # Create an empty output buffer
        self.current_output = ''

        # This function needs all regular expressions to be in the form of a
        # list, so if the user provided a string, let's convert it to a 1
        # item list.
        if len(re_strings) != 0 and isinstance(re_strings, str):
            re_strings = [re_strings]

        # Loop until one of the expressions is matched or loop forever if
        # nothing is expected (usually used for exit)
        while (
            len(re_strings) == 0 or
            not [re_string
                 for re_string in re_strings
                 if re.match('.*\n' + re_string + '$',
                             self.current_output, re.DOTALL)]
        ):

            # Read some of the output
            buffer = self.channel.recv(self.buffer_size)

            # If we have an empty buffer, then the SSH session has been closed
            if len(buffer) == 0:
                break

            # Strip all ugly \r (Ctrl-M making) characters from the current
            # read
            buffer = buffer.replace('\r', '')

            # Display the current buffer in realtime if requested to do so
            # (good for debugging purposes)
            if self.display:
                sys.stdout.write(buffer)
                sys.stdout.flush()

            # Add the currently read buffer to the output
            self.current_output += buffer

        # Grab the first pattern that was matched
        if len(re_strings) != 0:
            found_pattern = [(re_index, re_string)
                             for re_index, re_string in enumerate(re_strings)
                             if re.match('.*\n' + re_string + '$',
                                         self.current_output, re.DOTALL)]

        self.current_output_clean = self.current_output

        # Clean the output up by removing the sent command
        if len(self.current_send_string) != 0:
            self.current_output_clean = (
                self.current_output_clean.replace(
                    self.current_send_string + '\n', ''))

        # Reset the current send string to ensure that multiple expect calls
        # don't result in bad output cleaning
        self.current_send_string = ''

        # Clean the output up by removing the expect output from the end if
        # requested and save the details of the matched pattern
        if len(re_strings) != 0:
            self.current_output_clean = (
                re.sub(found_pattern[0][1] + '$', '',
                       self.current_output_clean))
            self.last_match = found_pattern[0][1]
            return found_pattern[0][0]
        else:
            # We would socket timeout before getting here, but for good
            # measure, let's send back a -1
            return -1

    def send(self, send_string):
        """Saves and sends the send string provided"""
        self.current_send_string = send_string
        self.channel.send(send_string + self.newline)

    def tail(self, line_prefix=None):
        """This function takes control of an SSH channel and displays line
        by line of output as \n is recieved.  This function is specifically
        made for tail-like commands.

        Keyword arguments:
        line_prefix -- Text to append to the left of each line of output.
                       This is especially useful if you are using my
                       MultiSSH class to run tail commands over multiple
                       servers.

        """

        # Set the channel timeout to the maximum integer the server allows,
        # setting this to None breaks the KeyboardInterrupt exception and
        # won't allow us to Ctrl+C out of teh script
        self.channel.settimeout(sys.maxint)

        # Create an empty line buffer and a line counter
        current_line = ''
        line_counter = 0

        # Loop forever, Ctrl+C (KeyboardInterrupt) is used to break the tail
        while True:

            # Read the output one byte at a time so we can detect \n correctly
            buffer = self.channel.recv(1)

            # If we have an empty buffer, then the SSH session has been closed
            if len(buffer) == 0:
                break

            # Strip all ugly \r (Ctrl-M making) characters from the current
            # read
            buffer = buffer.replace('\r', '')

            # Add the currently read buffer to the current line output
            current_line += buffer

            # Display the last read line in realtime when we reach a \n
            # character
            if current_line.endswith('\n'):
                if line_counter and line_prefix:
                    sys.stdout.write(line_prefix)
                if line_counter:
                    sys.stdout.write(current_line)
                    sys.stdout.flush()
                line_counter += 1
                current_line = ''

    def take_control(self):
        """This function is a better documented and touched up version of the
        posix_shell function found in the interactive.py demo script that
        ships with Paramiko"""

        if has_termios:
            # Get attributes of the shell you were in before going to the
            # new one
            original_tty = termios.tcgetattr(sys.stdin)
            try:
                tty.setraw(sys.stdin.fileno())
                tty.setcbreak(sys.stdin.fileno())

                # We must set the timeout to 0 so that we can bypass times when
                # there is no available text to receive
                self.channel.settimeout(0)

                # Loop forever until the user exits (i.e. read buffer is empty)
                while True:
                    select_read, select_write, select_exception = (
                        select.select([self.channel, sys.stdin], [], []))
                    # Read any output from the terminal and print it to the
                    # screen.  With timeout set to 0, we just can ignore times
                    # when there's nothing to receive.
                    if self.channel in select_read:
                        try:
                            buffer = self.channel.recv(self.buffer_size)
                            if len(buffer) == 0:
                                break
                            sys.stdout.write(buffer)
                            sys.stdout.flush()
                        except socket.timeout:
                            pass
                    # Send any keyboard input to the terminal one byte at a
                    # time
                    if sys.stdin in select_read:
                        buffer = sys.stdin.read(1)
                        if len(buffer) == 0:
                            break
                        self.channel.send(buffer)
            finally:
                # Restore the attributes of the shell you were in
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, original_tty)
        else:
            def writeall(sock):
                while True:
                    buffer = sock.recv(self.buffer_size)
                    if len(buffer) == 0:
                        break
                    sys.stdout.write(buffer)
                    sys.stdout.flush()

            writer = threading.Thread(target=writeall, args=(self.channel,))
            writer.start()

            try:
                while True:
                    buffer = sys.stdin.read(1)
                    if len(buffer) == 0:
                        break
                    self.channel.send(buffer)
            # User has hit Ctrl+Z or F6
            except EOFError:
                pass
暂无标签

2 个回答

6

我是paramiko-expect的作者。

我刚刚在我的模块的0.2版本中实现了一个新功能,这个功能允许你在tail方法中指定一个回调函数,这样你就可以根据自己的需要处理当前行的内容。你可以用它来筛选输出,或者在显示之前进一步处理它。预计这个回调函数会返回一个字符串,这个字符串会在处理完当前行后发送到sys.stdout.write。

这里有一个例子:

import traceback
import paramiko
from paramikoe import SSHClientInteraction


def process_tail(line_prefix, current_line):
    if current_line.startswith('hello'):
        return current_line
    else:
        return ''


def main():

    # Set login credentials and the server prompt
    hostname = 'localhost'
    username = 'fots'
    password = 'password123'
    port = 22

    # Use SSH client to login
    try:

        # Create a new SSH client object
        client = paramiko.SSHClient()

        # Set SSH key parameters to auto accept unknown hosts
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Connect to the host
        client.connect(hostname, port, username, password)

        # Create a client interaction class which will interact with the host
        interact = SSHClientInteraction(client, timeout=10, display=False)

        # Send the tail command
        interact.send('tail -f /home/fots/something.log')

        # Now let the class tail the file for us
        interact.tail(line_prefix=hostname+': ', callback=process_tail)

    except KeyboardInterrupt:
        print 'Ctrl+C interruption detected, stopping tail'
    except Exception:
        traceback.print_exc()
    finally:
        try:
            client.close()
        except:
            pass

if __name__ == '__main__':
    main()
3

这个问题在三个不同的网站上已经发布快24小时了,但居然没有人给出一个真正的答案,我有点震惊。我尝试把脚本当作一个子进程运行,然后把输出通过管道传给标准输出,结果成功了:

import subprocess
proc = subprocess.Popen(['python','tail_try.py'],stdout=subprocess.PIPE)
for line in iter(proc.stdout.readline, ''):
        print line

现在我可以控制每一行打印的输出了,使用的是 proc.stdout.readline

其实这个答案真的很简单。

撰写回答