使用sqlite、sqlalchemy和python在数据库中重复插入的问题
我正在学习Python,通过网上的资源和这个网站上的朋友们的帮助,我渐渐掌握了它。在我的第一个脚本中,我正在解析Twitter的RSS源,并把结果插入到数据库里,但还有一个问题我解决不了。也就是说,有重复的条目被插入到其中一个表里。
先说一下背景,我最开始在HalOtis.com找到了一个基础脚本,用来下载RSS源,然后我对它做了几处修改:1)修改了以适应Twitter RSS源的一些特殊情况(它没有分开内容、标题、网址等);2)增加了“标签”的表,以及用于多对多关系的表(entry_tag表);3)把表的设置改成了sqlalchemy;4)做了一些临时的修改,以解决出现的奇怪的unicode问题。因此,代码在某些地方看起来很乱,但这是一个很好的学习经历,现在它能正常工作了——除了在“entries”表中不断插入重复数据。
因为我不确定什么对大家最有帮助,所以我把整个代码粘贴在下面,并在几个地方加了一些注释,指出我认为最重要的部分。
我非常感谢任何帮助。谢谢!
编辑:有人建议我提供数据库的结构。我以前从没做过这个,所以如果我做的不对,请多包涵。我正在设置四个表:
- RSSFeeds,包含Twitter RSS源的列表
- RSSEntries,包含从每个源下载的单个条目的列表(解析后),有内容、标签、日期、网址等列
- Tags,包含在单个条目(推文)中找到的所有标签的列表
- entry_tag,包含允许我将标签映射到条目的列。
简而言之,下面的脚本从RSS Feeds表中获取五个测试RSS源,下载每个源的20个最新条目/推文,解析这些条目,并把信息放入RSS Entries、Tags和entry_tag表中。
#!/usr/local/bin/python
import sqlite3
import threading
import time
import Queue
from time import strftime
import re
from string import split
import feedparser
from django.utils.encoding import smart_str, smart_unicode
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)
metadata.bind = engine
def now():
return datetime.datetime.now()
#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
schema.Column('id', types.Integer,
schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
schema.Column('url', types.VARCHAR(1000), default=u''),
)
RSSEntries = schema.Table('entries', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entries_seq_id', optional=True), primary_key=True),
schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
schema.Column('short_url', types.VARCHAR(1000), default=u''),
schema.Column('content', types.Text(), nullable=False),
schema.Column('hashtags', types.Unicode(255)),
schema.Column('date', types.String()),
)
tag_table = schema.Table('tag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('tag_seq_id', optional=True), primary_key=True),
schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)
entrytag_table = schema.Table('entrytag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)
metadata.create_all(bind=engine, checkfirst=True)
# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
{'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)
#These 3 lines for threading process (see HalOtis.com for example)
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)
#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()
#This block contains all the parsing and DB insertion
def store_feed_items(id, items):
""" Takes a feed_id and a list of items and stores them in the DB """
for entry in items:
conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
#note: entry.summary contains entire feed entry for Twitter,
#i.e., not separated into content, etc.
s = unicode(entry.summary)
test = s.split()
tinyurl2 = [i for i in test if i.startswith('http://')]
hashtags2 = [i for i in s.split() if i.startswith('#')]
content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
content = unicode(content2)
tinyurl = unicode(tinyurl2)
hashtags = unicode (hashtags2)
print hashtags
date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)
#Insert parsed feed data into entries table
#THIS IS WHERE DUPLICATES OCCUR
result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
'content': content, 'hashtags': hashtags, 'date': date})
entry_id = result.last_inserted_ids()[0]
#Look up tag identifiers and create any that don't exist:
tags = tag_table
tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
tag_ids = dict(conn.execute(tag_id_query).fetchall())
for tag in hashtags2:
if tag not in tag_ids:
result = conn.execute(tags.insert(), {'tagname': tag})
tag_ids[tag] = result.last_inserted_ids()[0]
#insert data into entrytag table
if hashtags2: conn.execute(entrytag_table.insert(),
[{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])
#Rest of file completes the threading process
def thread():
while True:
try:
id, feed_url = jobs.get(False) # False = Don't wait
except Queue.Empty:
return
entries = feedparser.parse(feed_url).entries
rss_to_process.put((id, entries), True) # This will block if full
for info in feeds: # Queue them up
jobs.put([info['id'], info['url']])
for n in xrange(THREAD_LIMIT):
t = threading.Thread(target=thread)
t.start()
while threading.activeCount() > 1 or not rss_to_process.empty():
# That condition means we want to do this loop if there are threads
# running OR there's stuff to process
try:
id, entries = rss_to_process.get(False, 1) # Wait for up to a second
except Queue.Empty:
continue
store_feed_items(id, entries)
1 个回答
看起来你把SQLAlchemy这个工具加到了一个原本不使用它的脚本里。这里面涉及的东西太多了,大家似乎都不太明白。
我建议你从头开始。先别用线程,也别用SQLAlchemy。开始的时候甚至可以不使用SQL数据库。写一个简单的脚本,尽量用最简单的方式收集你想要的信息,使用简单的循环和可能的time.sleep()。等这个能正常工作后,再考虑把数据存到SQL数据库里。我觉得直接写SQL语句并不比用ORM工具难,而且调试起来更简单。很有可能你根本不需要用到线程。
“如果你觉得自己聪明到可以写多线程程序,那你其实并不聪明。” -- James Ahlstrom。