在指定时间后停止Tweepy流 (#行, 秒, #推文等)

3 投票
2 回答
4793 浏览
提问于 2025-04-18 12:42

我正在使用Tweepy这个工具来捕捉带有#WorldCup这个标签的实时推文,下面的代码就是实现这个功能的。它运行得很好。

class StdOutListener(StreamListener):
  ''' Handles data received from the stream. '''

  def on_status(self, status):
      # Prints the text of the tweet
      print('Tweet text: ' + status.text)

      # There are many options in the status object,
      # hashtags can be very easily accessed.
      for hashtag in status.entries['hashtags']:
          print(hashtag['text'])

      return true

    def on_error(self, status_code):
        print('Got an error with status code: ' + str(status_code))
        return True # To continue listening

    def on_timeout(self):
        print('Timeout...')
        return True # To continue listening

if __name__ == '__main__':
   listener = StdOutListener()
   auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
   auth.set_access_token(access_token, access_token_secret)

   stream = Stream(auth, listener)
   stream.filter(follow=[38744894], track=['#WorldCup'])

因为现在#WorldCup这个标签非常热门,所以搜索推文的速度很快,能在一次操作中获取到Tweepy允许的最多推文数量。不过,如果我想搜索#StackOverflow这个标签,速度可能就会慢很多。因此,我想要一种方法来停止这个推文流。我可以根据几个条件来停止,比如说收集到100条推文后停止,或者3分钟后停止,或者当输出的文本文件达到150行时停止等等。我知道socket超时的设置并不能用来实现这个功能。

我看过一个类似的问题:

Tweepy Streaming - 在收集到x条推文时停止

不过,看起来那个问题并没有使用流式API。它收集的数据也比较杂乱,而我这里的文本输出是比较整洁的。

有没有人能建议一种方法,让Tweepy在使用这种流式方法时,根据用户输入的参数来停止,而不是通过键盘中断?

谢谢!

2 个回答

1

上面的解决方案在通过标签获取推文时很有帮助,尽管在定义 getTweetByHashtag 函数时有个小错误。你用了 Listener.stopAt,而应该用 Listener.stop_at=stop_at_number。

我稍微调整了一下代码,这样你就可以轻松地设置代码运行的秒数了。

定义了新的函数 init 来帮助调整秒数,还有一个 "on_data" 函数,它包含了比 on_status 函数更多的信息。

享受吧:

from tweepy import (Stream, OAuthHandler)
from tweepy.streaming import StreamListener

class Listener(StreamListener):

    tweet_counter = 0 # Static variable

    def login(self):
        CONSUMER_KEY =
        CONSUMER_SECRET =
        ACCESS_TOKEN =
        ACCESS_TOKEN_SECRET =

        auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
        return auth

    def __init__(self, time_limit=8):
        self.start_time = time.time()
        self.limit = time_limit
        super(Listener, self).__init__()

    def on_data(self, data):
        Listener.tweet_counter += 1
        if (time.time() - self.start_time) < self.limit and Listener.tweet_counter < Listener.stop_at:
            print(str(Listener.tweet_counter)+data)
            return True
        else:
            print("Either Max number reached or time limit up at:"+ str(Listener.tweet_counter)+" outputs")
            self.saveFile.close()
            return False

    #def on_status(self, status):
        #Listener.tweet_counter += 1
        #print(str(Listener.tweet_counter) + '. Screen name = "%s" Tweet = "%s"'
              #%(status.author.screen_name, status.text.replace('\n', ' ')))

        #if Listener.tweet_counter < Listener.stop_at and (time.time() - self.start_time) < self.limit:
            #return True
        
        #else:
            #print('Max num reached or time elapsed= ' + str(Listener.tweet_counter))
            #return False

    def getTweetsByGPS(self, stop_at_number, latitude_start, longitude_start, latitude_finish, longitude_finish):
        try:
            Listener.stop_at = stop_at_number # Create static variable
            auth = self.login()
            streaming_api = Stream(auth, Listener(), timeout=60) # Socket timeout value
            streaming_api.filter(follow=None, locations=[latitude_start, longitude_start, latitude_finish, longitude_finish])
        except KeyboardInterrupt:
            print('Got keyboard interrupt')

    def getTweetsByHashtag(self, stop_at_number, hashtag):
        try:
            Listener.stop_at = stop_at_number
            auth = self.login()
            streaming_api = Stream(auth, Listener(), timeout=60)
            # Atlanta area.
            streaming_api.filter(track=[hashtag])
        except KeyboardInterrupt:
            print('Got keyboard interrupt')
   

    listener = Listener()
    #listener.getTweetsByGPS(20, -84.395198, 33.746876, -84.385585, 33.841601) # Atlanta area.
    listener.getTweetsByHashtag(1000,"hi")

你可以把 1000 这个值改成你想要的最大推文数量,把 "hi" 改成你需要查找的关键词。在 init 函数下面,把 8 的 time_limit 改成你想要的秒数。这样你可以根据自己的需求来使用。

你可以设置一个有限的时间,同时把推文数量调得很高,或者设定需要的推文数量,再给一个更高的时间值,这样就能达到你想要的数量。随你选择!Chukwu Gozie unu (上帝保佑!)

3

我解决了这个问题,所以我决定成为那个在网上自己回答自己问题的英雄。

这个问题的解决方法是使用静态的Python变量来作为计数器和停止值(比如说,抓取20条推文后停止)。现在这个是基于地理位置的搜索,但你也可以很简单地换成根据话题标签来搜索,只需要使用 getTweetsByHashtag() 方法就可以了。

#!/usr/bin/env python
from tweepy import (Stream, OAuthHandler)
from tweepy.streaming import StreamListener

class Listener(StreamListener):

    tweet_counter = 0 # Static variable

    def login(self):
        CONSUMER_KEY =
        CONSUMER_SECRET =
        ACCESS_TOKEN =
        ACCESS_TOKEN_SECRET =

        auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
        return auth

    def on_status(self, status):
        Listener.tweet_counter += 1
        print(str(Listener.tweet_counter) + '. Screen name = "%s" Tweet = "%s"'
              %(status.author.screen_name, status.text.replace('\n', ' ')))

        if Listener.tweet_counter < Listener.stop_at:
            return True
        else:
            print('Max num reached = ' + str(Listener.tweet_counter))
            return False

    def getTweetsByGPS(self, stop_at_number, latitude_start, longitude_start, latitude_finish, longitude_finish):
        try:
            Listener.stop_at = stop_at_number # Create static variable
            auth = self.login()
            streaming_api = Stream(auth, Listener(), timeout=60) # Socket timeout value
            streaming_api.filter(follow=None, locations=[latitude_start, longitude_start, latitude_finish, longitude_finish])
        except KeyboardInterrupt:
            print('Got keyboard interrupt')

    def getTweetsByHashtag(self, stop_at_number, hashtag):
        try:
            Listener.stopAt = stop_at_number
            auth = self.login()
            streaming_api = Stream(auth, Listener(), timeout=60)
            # Atlanta area.
            streaming_api.filter(track=[hashtag])
        except KeyboardInterrupt:
            print('Got keyboard interrupt')

listener = Listener()
listener.getTweetsByGPS(20, -84.395198, 33.746876, -84.385585, 33.841601) # Atlanta area.

撰写回答