Django的微服务
bgtasks的Python项目详细描述
教程
安装
pip install bgtasks
配置
设置.py
AMQP={'USERNAME':'guest','PASSWORD':'guest','VHOST':'/','HOST':'localhost','PORT':5672,'RPC_SLEEP_TIME':0.005}
rpc方法
app1视图.py
importjsonfromdjango.httpimportHttpResponsefrombgtasksimportRPCClientdefadd_user(request):data={'username':request.POST.get('username'),'password':request.POST.get('password')}client=RPCClient()# Create user and get iddata=client.call('user.add',data)content=json.dumps({"success":true,"user_id":data})returnHttpResponse(content)
app2 tasks.py
fromdjango.contrib.authimportUserfrombgtasksimportrpc_tasks@rpc_tasks('user.add')defadd_user(data):user=User.objects.create(username=data['username'])user.set_password(data['password'])returnuser.id
用于列出rabbitmq
python manange.py tasks
作业方法
app1视图.py
fromdjango.httpimportHttpResponsefrombgtasksimportJobClientdefsend_email(request):data={'email':request.POST.get('email')}client=JobClient()data=client.call('user.send_mail',data)content=json.dumps({"success":true})returnHttpResponse(content)
app2 tasks.py
fromdjango.contrib.authimportUserfromdjango.core.mailimportsend_mailfrombgtasksimportjob_tasks@job_tasks('user.send_mail')defsend_mail(data):send_mail('Hi!','Hello World.','from@example.com',[data['email']],fail_silently=False,)
测试
在settings.py中添加environment='测试'
importjsonfromdjango.testimportTestCasefromdjango.testimportClientfrombgtasksimportrpc_tasks@rpc_tasks('user.add')defadd_user(data):return1classRPCTestCase(TestCase):deftest_add_user(self):data={'username':'john','password':'smith'}c=Client()response=c.post('/user/add/',data)data=json.loads(response.content)self.assertEqual(response.status_code,200)self.assertEqual(data['user_id'],1)