使用Paramiko进行DSA密钥转发?

3 投票
2 回答
1989 浏览
提问于 2025-04-18 02:19

我正在使用Paramiko在远程服务器上执行bash脚本。在这些脚本中,有些会连接到其他服务器。如果我只用bash,不用Python,我的DSA密钥会被转发,并且第一个远程服务器上的bash脚本可以用这个密钥连接到第二个远程服务器。但是当我使用Paramiko时,就不行了。

这是一个bash的例子:

Jean@mydesktop:~ & ssh root@firstserver
root@firstserver:~ # ssh root@secondserver hostname
secondserver.mydomain.org

使用Paramiko的情况:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import paramiko

class SSHSession:


    def __init__(self, server_address, user='root', port=22):
        self.connected = False
        self.server_address = server_address
        self.user           = user
        self.port           = port


    def connect(self, clear_channel=True):
        try:
            if self.server_address == None:
                raise ValueError('No hostname')
        except:
            raise ValueError('No hostname')
        else:
            try:
                self.ssh_client = paramiko.SSHClient()
                self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                self.ssh_client.connect(self.server_address, username=self.user)
                #self.transport = self.ssh_client.get_transport()
                #self.channel   = self.transport.open_forward_agent_channel()
                self.channel   = self.ssh_client.invoke_shell()
            except:
                self.connected = False
                return False
            else:
                self.connected = True
                return True

    def exec_command(self, command, newline='\r'):
        if not self.connected:
            raise Exception('Not connected')
        else:
            timeout = 31536000 # 365 days in seconds
            self.channel.settimeout(timeout)
            line_buffer    = ''
            channel_buffer = ''
            end_string     = 'CLIENT_EXPECT_CMD_OK'
            print('[SEND   ] >>', command)
            self.channel.send(command + ' ; echo ' + end_string + newline)
            while True:
                channel_buffer = self.channel.recv(1).decode('UTF-8')
                if len(channel_buffer) == 0:
                    raise Exception('connection lost with server: ' + self.server_address)
                    break 
                channel_buffer  = channel_buffer.replace('\r', '')
                if channel_buffer != '\n':
                    line_buffer += channel_buffer
                else:
                    if line_buffer == end_string:
                        break
                    print('[RECEIVE] <<', line_buffer)
                    line_buffer   = ''


    def disconnect(self):
        self.ssh_client.close()


    def __enter__(self):
        self.connect()
        return self


    def __exit__(self, _type, value, traceback):
        self.disconnect()


if __name__ == "__main__":
    server_address = 'firstserver'
    ssh_user       = 'root'
    with SSHSession(server_address) as ssh_session:
        ssh_session.exec_command('hostname')
        ssh_session.exec_command('ssh root@secondserver hostname')

输出结果是:

[SEND   ] >> hostname
[RECEIVE] << [root@firstserver ~]# hostname ; echo CLIENT_EXPECT_CMD_OK
[RECEIVE] << firstserver.mydomain.fr
[SEND   ] >> ssh root@secondserver hostname
[RECEIVE] << [root@firstserver ~]# ssh root@secondserver hostname ; echo CLIENT_EXPECT_CMD_OK
[RECEIVE] << Permission denied (publickey,gssapi-keyex,gssapi-with-mic).

我尝试过:

self.transport = self.ssh_client.get_transport()
self.channel   = self.transport.open_forward_agent_channel()

而不是:

self.channel   = self.ssh_client.invoke_shell()

但是我遇到了一个错误:

paramiko.ssh_exception.ChannelException: Administratively prohibited

有没有人知道这是否可能?我看到有讨论说可以,但我还是找不到具体怎么做。

2 个回答

0

在连接的时候,你应该设置 key_filename

key_filename(字符串或字符串列表) - 这是一个文件名,或者是一个文件名的列表,用来尝试进行身份验证的可选私钥。

4

好的,现在可以用了。我找到了一些很有帮助的帖子:

添加 paramiko ssh 代理转发(可选) #4100
open_forward_agent_channel 与 open_session #89

最终的代码是:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import paramiko

class SSHSession:


    def __init__(self, server_address, user='root', port=22):
        self.connected      = False
        self.server_address = server_address
        self.user           = user
        self.port           = port


    def connect(self):
        try:
            self.ssh_client = paramiko.SSHClient()
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_client.connect(self.server_address, username=self.user)
            self.transport     = self.ssh_client.get_transport()
            self.agent_channel = self.transport.open_session()
            self.agent_handler = paramiko.agent.AgentRequestHandler(self.agent_channel)
            self.channel       = self.ssh_client.invoke_shell()
        except:
            self.connected = False
        else:
            self.connected = True
        return self.connected

    def exec_command(self, command, newline='\r'):
        if not self.connected:
            raise Exception('Not connected')
        else:
            timeout = 31536000 # 365 days in seconds
            self.channel.settimeout(timeout)
            line_buffer    = ''
            channel_buffer = ''
            end_string     = 'CLIENT_EXPECT_CMD_OK'
            print('[SEND   ] >>', command)
            self.channel.send(command + ' ; echo ' + end_string + newline)
            while True:
                channel_buffer = self.channel.recv(1).decode('UTF-8')
                if len(channel_buffer) == 0:
                    raise Exception('connection lost with server: ' +     self.server_address)
                    break 
                channel_buffer  = channel_buffer.replace('\r', '')
                if channel_buffer != '\n':
                    line_buffer += channel_buffer
                else:
                    if line_buffer == end_string:
                        break
                    print('[RECEIVE] <<', line_buffer)
                    line_buffer   = ''


    def disconnect(self):
        self.ssh_client.close()


    def __enter__(self):
        self.connect()
        return self


    def __exit__(self, _type, value, traceback):
        self.disconnect()


if __name__ == "__main__":
    server_address = 'firstserver'
    ssh_user       = 'root'
    with SSHSession(server_address) as ssh_session:
        ssh_session.exec_command('hostname')
        ssh_session.exec_command('ssh root@secondserver hostname')

输出结果是:

[SEND   ] >> hostname
[RECEIVE] << [root@firstserver ~]# hostname ; echo CLIENT_EXPECT_CMD_OK
[RECEIVE] << firstserver.mydomain.fr
[SEND   ] >> ssh root@secondserver hostname
[RECEIVE] << [root@firstserver ~]# ssh root@secondserver hostname ; echo CLIENT_EXPECT_CMD_OK
[RECEIVE] << secondserver.mydomain.fr

启用代理转发的代码重要部分是:

self.agent_channel = self.transport.open_session()
self.agent_handler = paramiko.agent.AgentRequestHandler(self.agent_channel)

撰写回答