允许在消息总线/卡夫卡上使用灵活、快速程序的包。
metrobus的Python项目详细描述
大都市
哑总线上智能路由的概念证明和示例。
这是一个小项目,重点是我的博客文章,围绕一个愚蠢的消息总线路由。就像卡夫卡一样。
无状态和哑:https://medium.com/capital-one-developers/stateless-and-dumb-microservices-on-a-message-bus-be78bca93ccb" rel="nofollow">https://medium.com/capital-one developers/stateless-and-dumb-microservices-on-a-message-bus-be78bca93ccb
快速缓存:https://medium.com/capital-one-developers/blazing-fast-data-lookup-in-a-microservices-world-dd3ae548ca45" rel="nofollow">https://medium.com/capital-one developers/blazing-fast-data-lookup-in-a-microservices-world-dd3ae548ca45
我在这里使用一些缓存思想来查找一些数据。
"test"示例应用程序的概念相当简单。"pusher"生成记录,就像来自客户机或应用程序一样。这些记录是简单的json结构。在我们的例子中,我们会得到一些很酷的东西,比如帐号(假的!)。推送者将消息发送到卡夫卡的"来源"主题。将此视为上游客户的公共入口。
接下来,"司机"接手。然后"busdriver"从源中提取数据,并将其格式化为下游可以使用的数据。在我们的例子中,我们添加了一个"header"对象。此"头"包含一个"路由"(和一个初始路由,以及一个用于跟踪的历史路由)。一旦基于某种智能(如果消息处于某种状态或某种类型,等等)创建了它,那么"总线驱动程序"将其配置为这样。这就是"metrobus"框架发挥作用的地方。一旦对"busdriver"的回调返回"wrapped"对象,框架就会从路由堆栈中提取(弹出)下一个停止(kafka主题),并将消息发送到下游。
在这种情况下,我们在"busdriver"之后的第一站是使用外部数据获取与帐户关联的电子邮件地址的微服务。这是我们的"接触点"微服务。你知道,我们应该在什么时候联系客户?电子邮件地址?来吧。通用的!"contactpoint"微服务从外部缓存中提取并加载电子邮件地址。如果缓存不包含电子邮件地址,则回调函数返回none,表示邮件已被适当"丢弃"。我可以将此更改为特定的"异常"。很快?
接下来是"白名单"服务。长话短说,同样的模式。如果电子邮件地址在白名单缓存中,我们可以发送给它。否则,消息将被丢弃。
接下来是"logstop"服务。如果我们真的在构建一个电子邮件服务,我们会在这里通过一个smtp服务器或类似mailgun或mailchimp之类的酷东西发送消息。在这种情况下,我们就放弃它。因为你知道…示例代码。
有问题吗?打电话给我:chris[@][at](模糊处理)faie.com
哦,我写这个框架的全部原因是为了让我的每一站都变得"愚蠢",只关注一件事…正在处理消息。不管它是从哪里来的。不管它去哪里。做好你的工作。接吻。< / P>
importtimeimportjsonfrommetrobusimportmetrobusimportsysimportrandom# To consume latest messages and auto-commit offsetsWHITE_LIST=set()defcallback(message):print("Received in CB: ",message)real_message=messageifreal_message['email']inWHITE_LIST:real_message['whitelist']=Truereturnreal_messageprint("DROPPING due to missing white list.")returnif__name__=="__main__":print("Trying to start app.")print("Loading whitelist")counter=0withopen('./data/whitelist.dat','r')asinput_file:forlineininput_file:line=line.strip()WHITE_LIST.add(line)counter+=1ifcounter%100000==0:print('added another 100k, up to ',counter)topic_in="WhiteList"metrostop=metrobus.MetroStop(callback,in_topic=topic_in)metrostop.start()
这个"stop"的唯一代码位是"topic-in"(配置文件,认真的说,伙计们),加载缓存(whitelist.dat),然后处理每条消息。说真的,就像12行代码…这就是我的意思,当我说"你在开玩笑吗?就像10行代码一样!"
设置
下载克隆/下载此repo后,让我们在测试路径中设置一些内容:
- Git克隆it@github.com">git@github.com:chrisfauerbach/metrobus.git
- cd metrobus
- CD测试
- pip安装--升级pipenv
- PIPENV外壳
- mkdir数据
- python make_whitelist.py>;data/whitelist.dat
- python make_account_email.py>;数据/电子邮件.dat
- /run.sh
然后你会看到很多Docker编写的东西发生了。环境变量在metrobus.env中
设置为:
- 卡夫卡:卡夫卡:9092
- redis:redis:6379
这些主机名在docker compose文件中设置。
docker compose.yml
网络状态
Web应用程序,以便查看来自Redis的一些超级基本统计信息。
web.py中的flask应用程序
推进器
很酷的名字,嗯?推杆?
这是数据的"源"示例
原始json消息将进入kafka…
推进器.py
总线驱动器
这是流程的第一个主要步骤。
巴士司机是我的智能路由器伙计。实际值
应用程序,总线驱动器可以/将检查
数据进入的状态。然后,它将
动态建立路线
联系人-添加电子邮件(第一阶段)
此应用程序使用假帐户ID
在推进器中生成。添加电子邮件
基于cps.dat文件的地址
以前生成的。如果找不到,则删除消息
白名单(第二阶段)
白名单的例子。我在博客上写了黑名单的事。
同样的交易?如果记录(在本例中是电子邮件)不是
在列表中,我们可以删除消息(return)
日志停止(第三阶段)
这是Metrobus上的一个示例"sink"停止
这个坏孩子所做的就是记录信息..
在现实世界中,这可能会推进ElasticSearch
或者是一些不太酷的东西。
注
这是个人项目,与我的雇主或任何合同/客户无关。
不过,我会把这个项目制作成麻省理工学院许可证,供任何人使用和滥用。