如何在发生错误时重启tweepy脚本?
我有一个Python脚本,它会不断地把与我关注的关键词相关的推文存储到一个文件里。不过,这个脚本经常因为下面的错误而崩溃。我想知道怎么修改这个脚本,让它能自动重启。我看到过很多解决方案,包括这个(在异常后重启程序),但我不太确定怎么在我的脚本中实现。
import sys
import tweepy
import json
import os
# Twitter credentials; fill these in before running.
consumer_key = ""
consumer_secret = ""
access_key = ""
access_secret = ""
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)
# directory that you want to save the json file
# Raw string: in a plain literal "\U" starts a \UXXXXXXXX escape on Python 3
# (a SyntaxError here); it only worked on Python 2 because "\U" and "\j" are
# not escapes in byte strings. The resulting path is unchanged.
os.chdir(r"C:\Users\json_files")
# name of json file you want to create/open and append json to
# (opened in append mode and kept open for the listener below to write to)
save_file = open("12may.json", 'a')
class CustomStreamListener(tweepy.StreamListener):
    """Echo every raw stream message and append it to the module-level ``save_file``."""

    def __init__(self, api):
        # super() must name *this* class: super(tweepy.StreamListener, self)
        # starts the MRO lookup after StreamListener, so StreamListener's own
        # __init__ was silently skipped and object.__init__ ran instead.
        super(CustomStreamListener, self).__init__(api)
        self.api = api

    def on_data(self, tweet):
        """Print the raw message and append it unchanged to save_file."""
        print(tweet)
        save_file.write(str(tweet))
        return True  # keep the stream running

    def on_error(self, status_code):
        """Report an HTTP error status and keep the stream alive."""
        sys.stderr.write('Encountered error with status code: %s\n' % status_code)
        # Anything placed after this return never runs, so the old
        # "Stream restarted" print was dead code and has been removed.
        return True  # Don't kill the stream

    def on_timeout(self):
        """Report a timeout and keep the stream alive."""
        sys.stderr.write('Timeout...\n')
        return True  # Don't kill the stream
# Build the listener, attach it to a stream, then start filtering. filter()
# runs the read loop in this thread (see the traceback), so any exception
# raised while reading ends the script.
listener = CustomStreamListener(api)
sapi = tweepy.streaming.Stream(auth, listener)
sapi.filter(track=["test"])
===========================================================================
Traceback (most recent call last):
File "C:\Users\tweets_to_json.py", line 41, in <module>
sapi.filter(track=["test"])
File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 316, in filter
self._start(async)
File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 235, in _start
self._run()
File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 165, in _run
self._read_loop(resp)
File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 206, in _read_loop
for c in resp.iter_content():
File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\models.py", line 541, in generate
chunk = self.raw.read(chunk_size, decode_content=True)
File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\packages\urllib3\response.py", line 171, in read
data = self._fp.read(amt)
File "C:\Python27\lib\httplib.py", line 543, in read
return self._read_chunked(amt)
File "C:\Python27\lib\httplib.py", line 603, in _read_chunked
value.append(self._safe_read(amt))
File "C:\Python27\lib\httplib.py", line 660, in _safe_read
raise IncompleteRead(''.join(s), amt)
IncompleteRead: IncompleteRead(0 bytes read, 1 more expected)
5 个回答
我写了一个使用tweepy的双进程流式处理程序。它会下载数据、压缩数据,然后把数据存储到文件里,这些文件每小时会轮换一次。这个程序每小时会重启一次,并且可以定期检查流式处理的状态,看看有没有下载到新的推文。如果没有下载到新的推文,它就会重启整个程序。
代码可以在这里找到。需要注意的是,它在压缩数据时使用了管道。如果不需要压缩,修改源代码是很简单的。
一个选择是尝试使用 multiprocessing 模块。我推荐它有两个原因:
- 可以在设定的时间内运行这个进程,而不需要“杀掉”整个脚本或进程。
- 你可以把它放在一个循环里,这样每当它崩溃或者你选择结束它时,它就会重新启动。
我采取了完全不同的方法,部分原因是我会定期(或者说应该定期)保存我的推文。@Eugeune Yan,我觉得使用 try/except 是一种简单而优雅的解决方案。不过,也希望有人能就此发表看法:用这种方法你并不真正知道它什么时候失败了,甚至是否失败过,但我不确定这是否真的重要(而且只需几行代码就能把失败记录下来)。
import tiipWriter #Twitter & Textfile writer I wrote with Tweepy.
from add import ThatGuy # utility to supply log file names that won't overwrite old ones.
import multiprocessing
if __name__ == '__main__':
    import os

    # Number of time increments the script needs to run.
    n = 60
    # Renamed from `dir` / `list`, which shadowed the builtins of that name.
    log_dir = "C:\\Temp\\stufffolder\\twiitlog"
    log_paths = []
    print("preloading logs")
    # Finds any existing logs in the folder and one-ups it; ThatGuy fills
    # log_paths in place (presumably with up to n new log paths - TODO confirm).
    ThatGuy(n, log_dir, log_paths)

    for log_path in log_paths:
        print("Collecting Tweets.....")
        # Run the twitter/textfile writer in a child process: if it crashes,
        # only the child dies and the loop starts a fresh writer for the next log.
        p = multiprocessing.Process(target=tiipWriter.tiipWriter, args=(log_path,))
        p.start()
        p.join(1800)  # let the writer run for at most 1800 seconds
        if p.is_alive():
            print(" \n Saving Twitter Stream log @ " + str(log_path))
            p.terminate()
            p.join()
        # The old code reopened the log only to close it again, which proved
        # nothing. It also raised IOError, killing this whole supervisor loop,
        # if the child died before creating the file. Just check the file exists.
        if os.path.exists(log_path):
            print("Log file written: " + str(log_path))
        else:
            print("No log file was written for " + str(log_path))
        print("jamaica")  # cuz why not
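使用递归调用比无限循环要好,下面的 filter 函数就是一个例子。(需要注意:Python 不做尾调用优化,每次递归重连都会多占一层调用栈。重连次数累积到解释器的递归深度上限(默认约 1000 层)时,程序会因 RecursionError(Python 2 中为 RuntimeError)而崩溃,所以对需要长期运行的程序来说,用循环重试更稳妥。)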
from tweepy import Stream
from service.twitter.listener.tweety_listener import TweetyStreamDataListener
from settings import twitter_config
class Tweety(object):
def __init__(self, listener=TweetyStreamDataListener()):
self.listener = listener
self.__auth__ = None
def __authenticate__(self):
from tweepy import OAuthHandler
if self.__auth__ is None:
self.__auth__ = OAuthHandler(twitter_config['consumer_key'], twitter_config['consumer_secret'])
self.__auth__.set_access_token(twitter_config['access_token'], twitter_config['access_token_secret'])
return self.__auth__ is not None
def __streamer__(self):
is_authenticated = self.__authenticate__()
if is_authenticated:
return Stream(self.__auth__, self.listener)
return None
def filter(self, keywords=None, async=True):
streamer = self.__streamer__()
try:
print "[STREAM] Started steam"
streamer.filter(track=keywords, async=async)
except Exception as ex:
print "[STREAM] Stream stopped! Reconnecting to twitter stream"
print ex.message, ex.args
self.filter(keywords=keywords, async=async)
我最近遇到了这个问题,想分享一些更详细的信息。
导致这个错误的原因是所选的流过滤条件 test 太宽泛了。这样一来,数据到达的速度比你处理的速度还快,从而引发 IncompleteRead 错误。
解决这个问题的方法有两个:要么缩小搜索范围,要么使用更具体的异常处理:
try:
    from http.client import IncompleteRead  # Python 3
except ImportError:
    from httplib import IncompleteRead  # Python 2 (the question runs C:\Python27)

# ... auth, api and CustomStreamListener set up as in the question ...

while True:
    try:
        sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
        sapi.filter(track=["test"])
        break  # filter() returned normally: the stream was stopped on purpose
    except IncompleteRead:
        # The connection was cut mid-chunk (e.g. when the client reads slower
        # than tweets arrive). Open a new stream instead of ending the script.
        # The old spelling `IncompleRead` would itself have raised NameError here.
        pass
我找到的方法是为流单独写一个新函数,在其中使用 while/try 循环:
def start_stream():
    """Run the filtered stream forever, opening a new stream whenever the current one stops."""
    while True:
        try:
            sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
            # "note 3" was written as "note" "3", which Python silently joins
            # into "note3". Assumed to mean the Galaxy Note 3; in a track phrase
            # a space means both words must appear. TODO confirm intent.
            sapi.filter(track=["Samsung", "s4", "s5", "note 3", "HTC", "Sony", "Xperia", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"])
        except Exception as e:
            # Not a bare `except:`: that would also swallow KeyboardInterrupt
            # and SystemExit, making the script impossible to stop with Ctrl+C.
            # Report the error instead of discarding it silently.
            print("Stream error, restarting: %r" % (e,))


start_stream()
我通过手动按下 Ctrl + C 来中断程序,测试了自动重启的功能。(注意:Ctrl + C 引发的是 KeyboardInterrupt,只有像裸 except: 这样连它也一并捕获的写法才会因此重启,而这也意味着无法再用 Ctrl + C 正常停止程序。)不过,如果有更好的测试这种功能的方法,我很乐意听听。