嗨,我正在体验SimpleHttpOperator的怪异行为。 我扩展了这个操作符,如下所示:
class EPOHttpOperator(SimpleHttpOperator):
"""
Operator for retrieving data from EPO API, performs token validity check,
gets a new one, if old one close to not valid.
"""
@apply_defaults
def __init__(self, entity_code, *args, **kwargs):
super().__init__(*args, **kwargs)
self.entity_code = entity_code
self.endpoint = self.endpoint + self.entity_code
def execute(self, context):
try:
token_data = json.loads(Variable.get(key="access_token_data", deserialize_json=False))
if (datetime.now() - datetime.strptime(token_data["created_at"],
'%Y-%m-%d %H:%M:%S.%f')).seconds >= 19 * 60:
Variable.set(value=json.dumps(get_EPO_access_token(), default=str), key="access_token_data")
self.headers = {
"Authorization": f"Bearer {token_data['token']}",
"Accept": "application/json"
}
super(EPOHttpOperator, self).execute(context)
except HTTPError as http_err:
logging.error(f'HTTP error occurred during getting EPO data: {http_err}')
raise http_err
except Exception as e:
logging.error(e)
raise e
我编写了一个简单的单元测试:
def test_get_EPO_data(requests_mock):
requests_mock.get('http://ops.epo.org/rest-services/published-data/publication/epodoc/EP1522668',
text='{"text": "test"}')
requests_mock.post('https://ops.epo.org/3.2/auth/accesstoken',
text='{"access_token":"test", "status": "we just testing"}')
dag = DAG(dag_id='test_data', start_date=datetime.now())
task = EPOHttpOperator(
xcom_push=True,
do_xcom_push=True,
http_conn_id='http_EPO',
endpoint='published-data/publication/epodoc/',
entity_code='EP1522668',
method='GET',
task_id='get_data_task',
dag=dag,
)
ti = TaskInstance(task=task, execution_date=datetime.now(), )
task.execute(ti.get_template_context())
assert ti.xcom_pull(task_ids='get_data_task') == {"text": "test"}
测试没有通过,HttpHook中的XCOM值从未作为XCOM推送,我已经检查了hook类中负责推送逻辑的代码是否被调用:
....
if self.response_check:
if not self.response_check(response):
raise AirflowException("Response check returned False.")
if self.xcom_push_flag:
return response.text
我做错了什么?这是虫子吗
因此,我实际上设法通过将xcom值设置为
super(EPOHttpOperator, self).execute(context)
的结果来实现这一点文件在这方面有点误导;还是我做错了什么
相关问题 更多 >
编程相关推荐