Python| 如何同时向sqlite数据库写入无限流并执行查询?
from TwitterAPI import TwitterAPI
import sqlite3, requests
# Create a metadata containing time start, end and duration.
# Create database
conn = sqlite3.connect('Hashtags.db') # Create connection, data stored in hashtag.db
c = conn.cursor() # create cursor
try:
# Create table Hashtags
c.execute('''CREATE TABLE Hashtags(tweet_id integer,
hashtag text)''')
# Create table Tweets
c.execute('''CREATE TABLE Tweets (tweet_id integer PRIMARY KEY,
user_id integer,
created_at text,
lat real,
long real,
in_reply_to_screen_name text,
in_reply_to_user_id integer,
favorite_count integer,
retweet_count integer)''')
# Create table Users
c.execute('''CREATE TABLE Users (user_id integer PRIMARY KEY,
screen_name text,
name text,
followers_count integer,
friends_count integer,
statuses_count integer,
verified integer)''')
except:
pass
# Login data
consumer_key= "xxxx"
consumer_secret= "xxxx"
access_token_key= "xxxx"
access_token_secret= "xxxx"
# Log in
api = TwitterAPI(consumer_key,
consumer_secret,
access_token_key,
access_token_secret)
# Specify endpoint and parameters
endpoint = 'statuses/filter'
parameters = {'locations':
"-74,40,-73,41"}
try:
tweets = api.request(endpoint, parameters)
except Exception as e:
print(e)
try:
for tweet in tweets:
conditions = ("created_at" in tweet) and (tweet['entities']['hashtags'])
# Only tweets that contain hashtags
if conditions:
# For Hashtags table
# tweet_id
tweet_id = tweet['id']
# Many hashtags, put in different rows.
for hashtags in tweet['entities']['hashtags']:
hashtag = hashtags['text']
c.execute("INSERT INTO Hashtags VALUES (?, ?)",
(tweet_id, hashtag))
# For Tweets table
# user_id
user_id = tweet['user']['id']
# created_at
created_at = tweet['created_at']
# coordinates
if tweet['coordinates'] is None:
longitude = None
latitude = None
else:
longitude, latitude = tweet['coordinates']['coordinates']
# in_reply_to_screen_name
if tweet['in_reply_to_screen_name'] is None:
in_reply_to_screen_name = None
else:
in_reply_to_screen_name = tweet['in_reply_to_screen_name']
# in_reply_to_user_id
if tweet['in_reply_to_user_id'] is None:
in_reply_to_user_id = None
else:
in_reply_to_user_id = tweet['in_reply_to_user_id']
# favorite_count
if tweet['favorite_count'] is None:
favorite_count = None
else:
favorite_count = tweet['favorite_count']
# retweet_count
if tweet['retweet_count'] is None:
retweet_count = None
else:
retweet_count = tweet['favorite_count']
# Write into Tweets table
c.execute("INSERT OR REPLACE INTO Tweets VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(tweet_id,
user_id,
created_at,
latitude,
longitude,
in_reply_to_screen_name,
in_reply_to_user_id,
favorite_count,
retweet_count))
# For Users table
# screen_name
screen_name = tweet['user']['screen_name']
# name
name = tweet['user']['name']
# followers_count
followers_count = tweet['user']['followers_count']
# friends_count
friends_count = tweet['user']['friends_count']
# statuses_count
statuses_count = tweet['user']['statuses_count']
# verified
verified = tweet['user']['verified']
c.execute("INSERT OR REPLACE INTO Users VALUES (?, ?, ?, ?, ?, ?, ?)",
(user_id,
screen_name,
name,
followers_count,
friends_count,
statuses_count,
verified))
# try-except inside for loop means, if the current iteration has exception, it will do what's in except clause and then proceed to the next iteration.
# Ctrl + C stops the program. Exception is caught.
# Any other errors are also caught. Commit the connection.
except (KeyboardInterrupt, Exception, requests.exceptions.ChunkedEncodingError) as e:
conn.commit()
conn.close()
我正在从Twitter的实时数据接口获取数据。简单来说,我创建了一个sqlite数据库,并按照代码中的定义把推文写入表格。当流数据出现错误或者我按下'Ctrl + C'时,我会执行'conn.commit()'。
我已经运行这个代码大约12个小时了,但数据库似乎还是没有更新,因为数据还没有被提交到数据库里。因此,我无法使用sqlitebrowser查询数据库。我该怎么解决这个问题呢?我希望在获取数据的同时,能够使用其他程序(比如R、sqlitebrowser等)进行查询。
1 个回答
0
如果你想要明确地提交数据,可以在一个 with
语句中使用连接对象。具体的用法可以参考官方文档:
import sqlite3
con = sqlite3.connect(":memory:")
con.execute("create table person (id integer primary key, firstname varchar unique)")
# Successful, con.commit() is called automatically afterwards
with con:
con.execute("insert into person(firstname) values (?)", ("Joe",))
# con.rollback() is called after the with block finishes with an exception, the
# exception is still raised and must be caught
try:
with con:
con.execute("insert into person(firstname) values (?)", ("Joe",))
except sqlite3.IntegrityError:
print "couldn't add Joe twice"