Python条件/元数据驱动的类导入和执行

2024-04-26 08:08:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我很难实现元数据驱动的方法来调度代码执行

基本上,我希望构建一个要处理的、标记为特定类型的工作项队列。有关联的“使用者”类和方法,我想处理工作项

最终目标是采用完全数据库驱动的方法,其中作业进入特定类型的队列,并实例化相关的类来处理作业

现在,我只使用dict(proc\u dict)

queue_sources=['queue1','queue2','queue3']

for source in queue_sources:
    collection=jq[source]
    for item in collection.find({}):
        schedule_type = item['schedule']
        date_last_run = item['ts']['date_last_run']
        item['job_type']=source

    if schedule_type=='daily':
        days=1
    elif schedule_type=='weekly':
        days=7

    if date_last_run is None:
        jobqueue.pub(item)

    elif date_last_run  > datetime.now()-timedelta(days=days):
        jobqueue.pub(item)

    else:
        module_logger.debug("Job does not meet criteria: ",item)
        module_logger.debug(" -- schedule type: ",schedule_type)
        module_logger.debug(" -- date last run: ",date_last_run)



def process_schedule(item):
    proc_dict={"queue1":["s_type1","type1_class","type1_method"]
              ,"queue2":["s_type2","type2_class","type2_method"]
              ,"queue3":["s_type3","type3_class","type3_method"]
              }

    job_type=item['job_type']

    if job_type in proc_dict.keys():
            consumer=[item][job_type][0]
            consumer_class=[item][job_type][1]
            consumer_method=[item][job_type][2]
            from consumer import consumer_class

            ...instantiate class

            ...do stuff with item

一份工作看起来像:

{ 
    "_id" : ObjectId("59f6490b816c7bcb39375b8d"), 
    "data" : {
        "ts" : {
            "date_last_modified" : ISODate("2017-10-29T20:42:50.801+0000"), 
            "date_last_run" : null
        }, 
        "manufacturer" : "xxxxx", 
        "status" : "active", 
        "_id" : ObjectId("59f63d5da54d752e56150af7"), 
        "schedule" : "weekly", 
        "manufacturer_type" : "Business", 
        "url" : "https://www.xxxx/", 
        "job_type" : "queue1"
    }, 
    "status" : "waiting", 
    "ts" : {
        "created" : ISODate("2017-10-29T21:32:59.130+0000"), 
        "started" : null, 
        "done" : null
    }
}

我很难理解:

  1. 如果应该坚持这种模式
  2. 如果是这样的话,如何动态实例化和调用这些类

谢谢你的建议


Tags: 方法rundateconsumertypejobprocitem
1条回答
网友
1楼 · 发布于 2024-04-26 08:08:46

更多研究发现:

importlib.

因此它变成了这样:

    if job_type in proc_dict.keys():
        consumer=[item][job_type][0]
        consumer_class=[item][job_type][1]
        consumer_method=[item][job_type][2]

        imported_class=importlib.__import__(consumer_class)
        imported_class_instance=imported_class()
        imported_class_instance.consumer_method(item)

相关问题 更多 >