将CSV转换为sqlite3数据库。将列表从Utf8转换为unicode
import csv, sqlite3
conn = sqlite3.connect("mycustomers9.sql")
curs = conn.cursor()
try:
curs.execute("CREATE TABLE t (unknown1 TEXT, county TEXT, businessName TEXT, address1 TEXT, city1 TEXT, zip1 INTEGER, phone1 INTEGER,Email1 TEXT, approvalstatus TEXT, date1 TEXT, date2 TEXT, typeofConstruct TEXT, typeofBiz TEXT, unknown2 TEXT, unknown3 TEXT, unknown4 TEXT, unknown5 TEXT, unknown6 TEXT,BizName2 TEXT,Address2 TEXT, City2 TEXT,Zip2 TEXT,Country2 TEXT,Phone2 TEXT,Email2 TEXT,Phone3 TEXT);")
except sqlite3.OperationalError:
print "Table already exist"
with open('HR_plan_review.csv', 'rb') as infile:
dr = csv.DictReader(infile, delimiter = ',')
to_db = [(i["unknown1"], i['county'], i['businessName'], i['address1'], i['city1'], i['zip1'], i['phone1'], i['Email1'], i['approvalstatus'], i['date1'],i['date2'], i['typeofConstruct'], i['typeofBiz'], i['unknown2'], i['unknown3'], i['unknown4'], i['unknown5'], i['unknown6'], i['BizName2'], i['Address2'], i['City2'], i['Zip2'], i['Country2'], i['Phone2'], i['Email2'], i['Phone3']) for i in dr]
curs.executemany("INSERT INTO t (unknown1, county, businessName, address1, city1,zip1, phone1, Email1, approvalstatus, date1, date2,typeofConstruct, typeofBiz, unknown2, unknown3, unknown4,unknown5, unknown6,BizName2,Address2, City2,Zip2,Country2,Phone2,Email2,Phone3) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);", to_db)
to_db 返回的是一个用 utf-8 编码的列表,而 sqlite 数据库似乎要求格式是 unicode。在运行上面的 SQL 语句之前,我该如何将 "to_db" 列表转换为 unicode 呢?下面是我运行上述代码时收到的错误信息。
sqlite3.ProgrammingError: 你不能使用 8 位字节字符串,除非你使用一个可以解释 8 位字节字符串的 text_factory(比如 text_factory = str)。强烈建议你直接将应用程序切换到 Unicode 字符串。
根据回答的反馈进行了编辑
修订后的代码(如下)现在可以成功执行,但它没有将从 csv 中获取的值插入到数据库中。
import csv, sqlite3
conn = sqlite3.connect("mycustomers12.sql")
curs = conn.cursor()
try:
curs.execute(""" CREATE TABLE t (unknown1 TEXT, county TEXT, businessName TEXT, address1 TEXT, city1 TEXT, zip1 INTEGER, \n
phone1 INTEGER,Email1 TEXT, approvalstatus TEXT, date1 TEXT, date2 TEXT, typeofConstruct TEXT, typeofBiz TEXT, unknown2 TEXT, \n
unknown3 TEXT, unknown4 TEXT, unknown5 TEXT, unknown6 TEXT,BizName2 TEXT,Address2 TEXT, City2 TEXT,Zip2 TEXT,Country2 TEXT,\n
Phone2 TEXT,Email2 TEXT,Phone3 TEXT);""")
except sqlite3.OperationalError:
print "Table already exist"
infile = open('HR_plan_review.csv', 'rb')
dr = csv.DictReader(infile, delimiter = ',')
keys=("unknown1", 'county', 'businessName', 'address1',
'city1', 'zip1', 'phone1', 'Email1', 'approvalstatus',
'date1','date2', 'typeofConstruct', 'typeofBiz', 'unknown2',
'unknown3', 'unknown4', 'unknown5', 'unknown6', 'BizName2',
'Address2', 'City2', 'Zip2', 'Country2', 'Phone2',
'Email2', 'Phone3')
args=[tuple(key.decode('utf-8') for key in keys) for row in dr]
sql='INSERT INTO t ({f}) VALUES ({p})'.format(
f=','.join(keys),
p=','.join(['?']*len(keys)))
curs.executemany(sql, args)
3 个回答
0
你需要提交执行的内容:
conn.commit()
另外,你的 executemany 语句应该放在 "with" 这个代码块里面:
import csv, sqlite3
myfile = 'CSV FILE PATH'
conn = sqlite3.connect("DBNAME.sqlite3")
curs = conn.cursor()
try:
curs.execute("CREATE TABLE t (webrank INTEGER, term, TEXT PRIMARY KEY, gloss TEXT);")
except sqlite3.OperationalError:
print "Table already exist"
with open('{}.csv'.format(myfile), 'rb') as infile:
dr = csv.DictReader(infile, delimiter = ',')
def unicoded_data():
for row in dr:
# Assuming infile encoding is utf-8.
yield int(row['WebRank']), unicode(row['term'], encoding='utf-8'), unicode(row['gloss'], encoding='utf-8')
curs.executemany("INSERT INTO t (webrank, term, gloss) VALUES (?,?,?);", unicoded_data())
conn.commit()
0
keys=("unknown1", 'county', 'businessName', 'address1',
'city1', 'zip1', 'phone1', 'Email1', 'approvalstatus',
'date1','date2', 'typeofConstruct', 'typeofBiz', 'unknown2',
'unknown3', 'unknown4', 'unknown5', 'unknown6', 'BizName2',
'Address2', 'City2', 'Zip2', 'Country2', 'Phone2',
'Email2', 'Phone3')
args=[tuple(i[key].decode('utf-8') for key in keys) for row in dr]
sql='INSERT INTO t ({f}) VALUES ({p})'.format(
f=','.join(keys),
p=','.join(['?']*len(keys)))
curs.executemany(sql, args)
或者,如果你想要一个更强大的解决方案,可以使用UnicodeDictReader,这是对UnicodeReader(来自csv文档)的稍微修改版,它可以把每一行返回为包含Unicode值的字典:
class UTF8Recoder:
"""
Iterator that reads an encoded stream and reencodes the input to UTF-8
"""
def __init__(self, f, encoding):
self.reader = codecs.getreader(encoding)(f)
def __iter__(self):
return self
def next(self):
return self.reader.next().encode("utf-8")
class UnicodeDictReader:
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
f = UTF8Recoder(f, encoding)
self.reader = csv.DictReader(f, dialect=dialect, **kwds)
def next(self):
row = self.reader.next()
return dict((key,unicode(val, "utf-8")) for key,val in row.iteritems())
def __iter__(self):
return self
with open('HR_plan_review.csv', 'rb') as infile:
dr = UnicodeDictReader(infile, delimiter = ',')
我上面发布的代码仍然可以使用,只需要把
args=[tuple(i[key].decode('utf-8') for key in keys) for row in dr]
改成
args=[tuple(i[key] for key in keys) for row in dr]
1
很遗憾,csv
在 Python 2.7 中无法处理 Unicode 字符(也就是一些特殊的文字和符号)。不过,你可以通过把 DictReader
放在一个生成器里来解决这个问题。
with open('HR_plan_review.csv', 'rb') as infile:
dr = csv.DictReader(infile, delimiter = ',')
def unicoded_data():
for row in dr:
# Assuming infile encoding is utf-8.
yield dict([(key, unicode(value, encoding='utf-8'))
for key, value in row.iteritems()])
to_db = [(i["unknown1"], i['county'], i['businessName'], i['address1'], i['city1'], i['zip1'], i['phone1'], i['Email1'], i['approvalstatus'], i['date1'],i['date2'], i['typeofConstruct'], i['typeofBiz'], i['unknown2'], i['unknown3'], i['unknown4'], i['unknown5'], i['unknown6'], i['BizName2'], i['Address2'], i['City2'], i['Zip2'], i['Country2'], i['Phone2'], i['Email2'], i['Phone3']) for i in unicoded_data()]