How can I use Pandas to process data returned in memory by a remote shell command run through Paramiko?

Posted 2024-04-19 16:04:23

I'm having trouble exporting data from Hive through Paramiko. Normally I run the following on the same server to work around the bad-lines error:

with open('xxx.tsv', 'r') as temp_f:
    # get No of columns in each line
    col_count = [ len(l.split(",")) for l in temp_f.readlines() ]
### Generate column names  (names will be 0, 1, 2, ..., maximum columns - 1)
column_names = [i for i in range(0, max(col_count))]
### Read csv
df2 = pd.read_csv('xxx.tsv', header=None, 
delimiter="\t", names=column_names)
df2 = df2.rename(columns=df2.iloc[0]).drop(df2.index[0])
df2 = df2[['content_id', 'title','product_id', 'type', 'episode_total','template_model','tags_name','grade','isdeleted' ,'actor']]

What I want to know is how to combine the code above with the code below:

import paramiko 
import traceback
from io import StringIO 
import pandas as pd 

host = 'xxxx'
conn_obj = paramiko.SSHClient()
conn_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())

conn_obj.connect(host, username="xxxx",
                 password='xxxx')# insert username and password

query='"select content_id as content_id, title as title,product_id as product_id, type as type, episode_total as episode_total, template_model as template_model, tags_name as tags_name,grade as grade, isdeleted as isdeleted, actor as actor from aaa.aaa;"' 
hive_query = 'beeline xxxx --outputformat=tsv2 -e '+ query 
print(hive_query)
std_in, std_out, std_err = conn_obj.exec_command(hive_query)
edge_out_str = str(std_out.read())
edge_out_str_n = "\n".join(edge_out_str.split("\\n")) 
edge_out_csv = StringIO(edge_out_str_n)
with open(edge_out_csv) as temp_f:
    #get No of columns in each line
    col_count = [ len(l.split(",")) for l in temp_f.readlines() ]
### Generate column names  (names will be 0, 1, 2, ..., maximum columns - 1)
column_names = [i for i in range(0, max(col_count))]
### Read csv
df2 = pd.read_csv(temp_f, header=None, delimiter="\t", names=column_names)
df2 = df2.rename(columns=df2.iloc[0]).drop(df2.index[0])
df2 = df2[['content_id', 'title','product_id', 'type', 'episode_total', 'template_model', 'tags_name','grade','isdeleted' ,'actor']]
conn_obj.close()

When I run the script, I get the following error:

Error :Traceback (most recent call last):
  File "<ipython-input-13-360c6dba28e1>", line 21
    with open(edge_out_csv) as temp_f:
TypeError: expected str, bytes or os.PathLike object, not _io.StringIO

1 answer
User
#1 · Posted 2024-04-19 16:04:23

StringIO is already a file-like object, so use it directly in place of the temp_f file handle; there is nothing to pass to open():

with StringIO(edge_out_str_n) as edge_out_csv:
    # Count the fields in each line; split on the same delimiter that
    # read_csv uses below (tab, because beeline emits tsv2)
    col_count = [len(l.split("\t")) for l in edge_out_csv.readlines()]
    # Generate column names (names will be 0, 1, 2, ..., maximum columns - 1)
    column_names = [i for i in range(0, max(col_count))]
    # Seek back to the beginning, since readlines() consumed the buffer
    edge_out_csv.seek(0)
    # Read the in-memory buffer itself, not the old temp_f handle
    df2 = pd.read_csv(edge_out_csv, header=None, delimiter="\t", names=column_names)
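
For comparison, here is a minimal sketch of the whole in-memory path, assuming the command output is UTF-8 encoded. The function name `beeline_tsv_to_frame` and the sample bytes are hypothetical and only stand in for `std_out.read()`. It decodes the bytes with `.decode()` rather than wrapping them in `str()`, which would yield a `b'...'` literal with escaped newlines, and it assumes the output is not empty.

```python
from io import StringIO

import pandas as pd


def beeline_tsv_to_frame(raw: bytes) -> pd.DataFrame:
    """Parse tab-separated command output held in memory into a DataFrame."""
    # Decode the bytes (assumed UTF-8) instead of calling str() on them.
    text = raw.decode("utf-8")

    # The widest row decides how many columns to allocate, so rows with
    # extra fields do not trigger a tokenizing error.
    widths = [len(line.split("\t")) for line in text.splitlines() if line]
    column_names = list(range(max(widths)))

    # StringIO is file-like, so read_csv can consume it directly.
    df = pd.read_csv(StringIO(text), header=None, sep="\t", names=column_names)

    # Promote the first row (the header line) to column labels.
    header = df.iloc[0].to_dict()
    return df.drop(df.index[0]).rename(columns=header).reset_index(drop=True)


# Hypothetical sample standing in for std_out.read(); the last row has one
# extra field, which is the situation the column counting is meant to handle.
sample = b"content_id\ttitle\n1\tfoo\n2\tbar\textra\n"
frame = beeline_tsv_to_frame(sample)
```

Because the header row is read as data first, the remaining values stay as strings; convert the columns explicitly if numeric types are needed.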

Obligatory warning: do not use AutoAddPolicy. Doing so gives up protection against MITM attacks. For a correct solution, see Paramiko "Unknown Server".