使用精确连接数的多进程FTP上传
我用多进程的方法,能够同时把多个文件上传到一个服务器,使用了下面这两个函数:
import ftplib,multiprocessing,subprocess
def upload(t):
server=locker.server,user=locker.user,password=locker.password,service=locker.service #These all just return strings representing the various fields I will need.
ftp=ftplib.FTP(server)
ftp.login(user=user,passwd=password,acct="")
ftp.storbinary("STOR "+t.split('/')[-1], open(t,"rb"))
ftp.close() # Doesn't seem to be necessary, same thing happens whether I close this or not
def ftp_upload(t=files,server=locker.server,user=locker.user,password=locker.password,service=locker.service):
parsed_targets=parse_it(t)
ftp=ftplib.FTP(server)
ftp.login(user=user,passwd=password,acct="")
remote_files=ftp.nlst(".")
ftp.close()
files_already_on_server=[f for f in t if f.split("/")[-1] in remote_files]
files_to_upload=[f for f in t if not f in files_already_on_server]
connections_to_make=3 #The maximum connections allowed the the server is 5, and this error will pop up even if I use 1
pool=multiprocessing.Pool(processes=connections_to_make)
pool.map(upload,files_to_upload)
不过,我的问题是,我经常会遇到一些错误,比如:
File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
raise self._value
ftplib.error_temp: 421 Too many connections (5) from this IP
另外,有时还会出现超时错误,但我在等着它再次出现,到时候我会把它发出来。
当我使用命令行(比如“ftp -inv”,“open SERVER”,“user USERNAME PASSWORD”,“mput *.rar”)时,我不会遇到这个错误,即使我同时运行了3个这样的实例。
我看过ftplib和multiprocessing的文档,但还是搞不清楚这些错误是怎么回事。这对我来说有点麻烦,因为我经常需要备份大量数据和文件。
- 有没有什么办法可以避免这些错误,或者有没有其他方法可以让脚本完成这个任务?
- 有没有办法告诉脚本,如果遇到这个错误,它可以等一秒再继续工作?
- 有没有办法让脚本按照列表中的顺序上传文件(当然,由于速度不同,可能不会总是连续上传4个文件,但目前的顺序看起来基本是随机的)?
- 有人能解释一下为什么同时连接到这个服务器的数量比脚本请求的还要多吗?
处理这些异常似乎有效(除了偶尔会出现递归错误……我还是搞不懂那是什么情况)。
关于第3点,我并不是想要完全按顺序,只希望脚本能选择列表中的下一个文件进行上传(所以由于进程速度的不同,顺序可能不会完全连续,但相比现在几乎无序的情况,会有更少的变化)。
2 个回答
1
具体来说,回答你的问题(2),有没有办法让脚本在遇到这个错误时,等一秒钟再继续工作?
有的。
ftplib.error_temp: 421 Too many connections (5) from this IP
这是一种异常情况。你可以捕捉到它并进行处理。虽然Python不支持尾调用,这样写法并不是很好,但其实可以简单到这个程度:
def upload(t):
server=locker.server,user=locker.user,password=locker.password,service=locker.service #These all just return strings representing the various fields I will need.
try:
ftp=ftplib.FTP(server)
ftp.login(user=user,passwd=password,acct="")
ftp.storbinary("STOR "+t.split('/')[-1], open(t,"rb"))
ftp.close() # Doesn't seem to be necessary, same thing happens whether I close this or not
except ftplib.error_temp:
ftp.close()
sleep(2)
upload(t)
至于你的问题(3),如果你想这样做,就要顺序上传,而不是并行上传。
我期待你更新一下,告诉我关于(4)的答案。我想到的唯一可能是有其他进程通过FTP连接到这个IP。
4
你可以尝试在每个进程中使用一个单独的 ftp
实例:
def init(*credentials):
global ftp
server, user, password, acct = credentials
ftp = ftplib.FTP(server)
ftp.login(user=user, passwd=password, acct=acct)
def upload(path):
with open(path, 'rb') as file:
try:
ftp.storbinary("STOR " + os.path.basename(path), file)
except ftplib.error_temp as error: # handle temporary error
return path, error
else:
return path, None
def main():
# ...
pool = multiprocessing.Pool(processes=connections_to_make,
initializer=init, initargs=credentials)
for path, error in pool.imap_unordered(upload, files_to_upload):
if error is not None:
print("failed to upload %s" % (path,))