使用Asyncio的非阻塞PostgreSQL查询生成器
windyquer的Python项目详细描述
windyquery-一个非阻塞python postgresql查询生成器
windyquery是一个带有asyncio的非阻塞postgresql查询生成器。
安装
$ pip install windyquery
连接
importasynciofromwindyqueryimportDB,Schema# create DB connection for CRUD operatonsdb=DB()asyncio.get_event_loop().run_until_complete(db.connect('db_name',{'host':'localhost','port':'5432','database':'db_name','username':'db_user_name','password':'db_user_password'},default=True))# create DB connection for migration operationsschema=Schema()asyncio.get_event_loop().run_until_complete(schema.connect('db_name',{'host':'localhost','port':'5432','database':'db_name','username':'db_user_name','password':'db_user_password'},default=True,min_size=1,max_size=1))
积垢示例
# SELECTresult=awaitdb.table('users').select().first()result['name']# INSERTawaitdb.table('users').insert({'email':'test1@example.com','password':'my precious'},{'email':'test2@example.com','password':'my precious'})# UPDATEawaitdb.table('users').where('id',2).update({'name':'new name'})# DELETEawaitdb.table('users').where('id',2).delete()
迁移示例
# CREATE TABLEawaitschema.create('users',schema.column('id').serial().primary_key(),schema.column('email').string().nullable(False).unique(),schema.column('password').string().nullable(False),schema.column('created_at').timestamp().nullable(False).default("NOW()"),)# ADD TABLE COLUMNawaitschema.table('users',schema.column('admin').boolean().nullable(False).default(False))# DROP TABLE COLUMNawaitschema.table('users',schema.column('admin').drop(),)# ADD INDEXawaitschema.table('users',schema.index('email','created_at'),)# DROP INDEXawaitschema.dropIndex('users_email_created_at_idx')
jsonb示例
# Create JSONBawaitschema.create('users',schema.column('id').serial().primary_key(),schema.column('data').jsonb(),)# Insert JSONBawaitdb.table('users').insert({'data':{'name':'user1','address':{'city':'Chicago','state':'IL'}}},{'data':{'name':'user2','address':{'city':'New York','state':'NY'},'admin':True}},)# SELECT JSONBuser=awaitdb.table('users').select('data->name AS name','data->>name AS name_text','data->address AS address').where('id',2).first()# row['name'] == '"user2"'# row['name_text'] == 'user2'# row['address'] == '{"name":"user2", "address":{"city":"New York", "state":"NY"}}'# UPDATE JSONBawaitdb.table('users').where('id',2).update({'data':{'address':{'city':'Richmond'}}})awaitdb.table('users').where('id',2).update({'data->address->city':'Richmond'})# JSONB in WHERE clauseusers=awaitdb.table('users').select('data->>name AS name').where("data->address->city",'Chicago')
原始sql示例
# select_rawawaitdb.table('users').select_raw('ROUND(AVG(id),1) AS avg_id, COUNT(1) AS copies').where('id',[4,5,6]).first()# insertRawdb.table('users').insertRaw('("id", "name") SELECT $1, $2 WHERE NOT EXISTS (SELECT "id" FROM users WHERE "id" = $1)',[10,'user name']))# rawawaitdb.raw('SELECT * FROM users WHERE id = $1',[2]).first()# use asyncpg (https://magicstack.github.io/asyncpg/current/usage.html)asyncwithdb.conn_pools['db_name'].acquire()asconnection:awaitconnection.fetchrow('SELECT * FROM test')
型号
要使用模型,下表需要primary key。
# setup connectionfromwindyqueryimportDBfromwindyquery.modelimportEventmodel_db=DB()asyncio.get_event_loop().run_until_complete(model_db.connect('db_name',{'host':'localhost','port':'5432','database':'db_name','username':'db_user_name','password':'db_user_password'},default=True))Event.db.on_next(model_db)# a Model with default setupclassUser(Model):pass# User.table == 'users'# more about naming conventionclassAdminUser(Model):pass# AdminUser.table == 'admin_users'# override table nameclassCustom(Model):table='my_custom'# Custom.table == 'my_custom'# find by iduser=awaitUser.find(2)# user.id == 2# find mutipleusers=awaitUser.find([1,2])# users[1].id == 2# allall_users=awaitUser.all()# find by whereuser=awaitUser.where("email",'test@example.com').first()users=User.where("email",'test@example.com')# save a new recorduser=User(email='test@example.com',password='password')user=awaituser.save()# create a new record if not founduser=User.where('id',10).where('name','not_such_name').first_or_new()# update existing recorduser=awaitUser.find(2)user.name='new name'awaituser.save()# JOSNB is converted to the matching python types (dict, list)user=User.find(2)user.data# {'data': {'name': 'user2', 'address': {'city': 'New York', 'state': 'NY'}}user.data['address']['city']='Richmond'awaituser.save()