Python| 如何同时向sqlite数据库写入无限流并执行查询?

1 投票
1 回答
2624 浏览
提问于 2025-04-18 11:40
from TwitterAPI import TwitterAPI
import sqlite3, requests

# Create a metadata containing time start, end and duration.

# Create database    
conn = sqlite3.connect('Hashtags.db') # Create connection, data stored in hashtag.db
c = conn.cursor() # create cursor

try:

    # Create table Hashtags
    c.execute('''CREATE TABLE Hashtags(tweet_id integer, 
                                       hashtag text)''')

    # Create table Tweets
    c.execute('''CREATE TABLE Tweets (tweet_id integer PRIMARY KEY, 
                                      user_id integer, 
                                      created_at text, 
                                      lat real, 
                                      long real, 
                                      in_reply_to_screen_name text, 
                                      in_reply_to_user_id integer, 
                                      favorite_count integer, 
                                      retweet_count integer)''')

    # Create table Users
    c.execute('''CREATE TABLE Users (user_id integer PRIMARY KEY, 
                                     screen_name text, 
                                     name text, 
                                     followers_count integer, 
                                     friends_count integer, 
                                     statuses_count integer, 
                                     verified integer)''')

except:
    pass

# Login data
consumer_key= "xxxx"
consumer_secret= "xxxx"
access_token_key= "xxxx"
access_token_secret= "xxxx"

# Log in
api = TwitterAPI(consumer_key,
                 consumer_secret,
                 access_token_key,
                 access_token_secret)

# Specify endpoint and parameters
endpoint = 'statuses/filter'
parameters = {'locations': 
              "-74,40,-73,41"}

try:
    tweets = api.request(endpoint, parameters)
except Exception as e:
    print(e)        

try:
    for tweet in tweets:
        conditions = ("created_at" in tweet) and (tweet['entities']['hashtags'])

        # Only tweets that contain hashtags
        if conditions:

            # For Hashtags table

            # tweet_id
            tweet_id = tweet['id']

            # Many hashtags, put in different rows.
            for hashtags in tweet['entities']['hashtags']:
                hashtag = hashtags['text']
                c.execute("INSERT INTO Hashtags VALUES (?, ?)", 
                         (tweet_id, hashtag))


            # For Tweets table

            # user_id
            user_id = tweet['user']['id']

            # created_at
            created_at = tweet['created_at']

            # coordinates
            if tweet['coordinates'] is None:
                longitude = None
                latitude = None
            else:
                longitude, latitude = tweet['coordinates']['coordinates']

            # in_reply_to_screen_name
            if tweet['in_reply_to_screen_name'] is None:
                in_reply_to_screen_name = None
            else:
                in_reply_to_screen_name = tweet['in_reply_to_screen_name']

            # in_reply_to_user_id
            if tweet['in_reply_to_user_id'] is None:
                in_reply_to_user_id = None
            else:
                in_reply_to_user_id = tweet['in_reply_to_user_id']

            # favorite_count    
            if tweet['favorite_count'] is None:
                favorite_count = None
            else:
                favorite_count = tweet['favorite_count']

            # retweet_count    
            if tweet['retweet_count'] is None:
                retweet_count = None
            else:
                retweet_count = tweet['favorite_count']        

            # Write into Tweets table
            c.execute("INSERT OR REPLACE INTO Tweets VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                     (tweet_id, 
                      user_id, 
                      created_at, 
                      latitude, 
                      longitude, 
                      in_reply_to_screen_name, 
                      in_reply_to_user_id, 
                      favorite_count, 
                      retweet_count))


            # For Users table

            # screen_name
            screen_name = tweet['user']['screen_name']

            # name
            name = tweet['user']['name']

            # followers_count
            followers_count = tweet['user']['followers_count']

            # friends_count
            friends_count = tweet['user']['friends_count']

            # statuses_count
            statuses_count = tweet['user']['statuses_count']

            # verified
            verified = tweet['user']['verified']

            c.execute("INSERT OR REPLACE INTO Users VALUES (?, ?, ?, ?, ?, ?, ?)",
                     (user_id,
                      screen_name,
                      name,
                      followers_count,
                      friends_count,
                      statuses_count,
                      verified))

# try-except inside for loop means, if the current iteration has exception, it will do what's in except clause and then proceed to the next iteration.

# Ctrl + C stops the program. Exception is caught.
# Any other errors are also caught. Commit the connection.         
except (KeyboardInterrupt, Exception, requests.exceptions.ChunkedEncodingError) as e:
    conn.commit()
    conn.close()

我正在从Twitter的实时数据接口获取数据。简单来说,我创建了一个sqlite数据库,并按照代码中的定义把推文写入表格。当流数据出现错误或者我按下'Ctrl + C'时,我会执行'conn.commit()'。

我已经运行这个代码大约12个小时了,但数据库似乎还是没有更新,因为数据还没有被提交到数据库里。因此,我无法使用sqlitebrowser查询数据库。我该怎么解决这个问题呢?我希望在获取数据的同时,能够使用其他程序(比如R、sqlitebrowser等)进行查询。

1 个回答

0

如果你想要明确地提交数据,可以在一个 with 语句中使用连接对象。具体的用法可以参考官方文档

import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table person (id integer primary key, firstname varchar unique)")

# Successful, con.commit() is called automatically afterwards
with con:
    con.execute("insert into person(firstname) values (?)", ("Joe",))

# con.rollback() is called after the with block finishes with an exception, the
# exception is still raised and must be caught
try:
    with con:
        con.execute("insert into person(firstname) values (?)", ("Joe",))
except sqlite3.IntegrityError:
    print "couldn't add Joe twice"

撰写回答