Airflow SimpleHttpOperator

Posted 2024-05-29 01:34:08

Hi, I am seeing some strange behaviour from SimpleHttpOperator. I have extended the operator as follows:

class EPOHttpOperator(SimpleHttpOperator):
    """
    Operator for retrieving data from EPO API, performs token validity check,
    gets a new one, if old one close to not valid.
    """

    @apply_defaults
    def __init__(self, entity_code, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.entity_code = entity_code
        self.endpoint = self.endpoint + self.entity_code

    def execute(self, context):
        try:
            token_data = json.loads(Variable.get(key="access_token_data", deserialize_json=False))
            if (datetime.now() - datetime.strptime(token_data["created_at"],
                                                   '%Y-%m-%d %H:%M:%S.%f')).seconds >= 19 * 60:

                Variable.set(value=json.dumps(get_EPO_access_token(), default=str), key="access_token_data")

            self.headers = {
                "Authorization": f"Bearer {token_data['token']}",
                "Accept": "application/json"
            }

            super(EPOHttpOperator, self).execute(context)

        except HTTPError as http_err:
            logging.error(f'HTTP error occurred during getting EPO data: {http_err}')
            raise http_err

        except Exception as e:
            logging.error(e)
            raise e
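
A side note on the age check in `execute` above: `timedelta.seconds` holds only the seconds component of the difference and ignores whole days, so a token older than a day can look fresh. The sketch below compares the full `timedelta` against the 19-minute threshold instead. The helper name `token_is_stale` and the constant `TOKEN_MAX_AGE` are hypothetical and not part of the original code; the timestamp format is the one used in the operator.

```python
from datetime import datetime, timedelta

# Same 19-minute threshold as in the operator above (name is hypothetical).
TOKEN_MAX_AGE = timedelta(minutes=19)


def token_is_stale(created_at: str, now: datetime) -> bool:
    """Return True when the token is at least TOKEN_MAX_AGE old."""
    created = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S.%f")
    # Comparing timedeltas directly takes days into account as well.
    return now - created >= TOKEN_MAX_AGE


now = datetime(2024, 1, 2, 0, 5, 0)
created = "2024-01-01 00:00:00.000000"  # one day and five minutes earlier
age = now - datetime.strptime(created, "%Y-%m-%d %H:%M:%S.%f")

# age.seconds is 300 here (the day is dropped), which is below 19 * 60,
# so the original check would treat this token as still valid.
print(age.seconds, token_is_stale(created, now))
```

Note also that the operator builds the `Authorization` header from `token_data`, which was read before the refresh, so after `Variable.set(...)` the old token is still the one sent unless `token_data` is reloaded.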

I wrote a simple unit test:

def test_get_EPO_data(requests_mock):
    requests_mock.get('http://ops.epo.org/rest-services/published-data/publication/epodoc/EP1522668',
                      text='{"text": "test"}')
    requests_mock.post('https://ops.epo.org/3.2/auth/accesstoken',
                       text='{"access_token":"test", "status": "we just testing"}')

    dag = DAG(dag_id='test_data', start_date=datetime.now())
    task = EPOHttpOperator(
        xcom_push=True,
        do_xcom_push=True,
        http_conn_id='http_EPO',
        endpoint='published-data/publication/epodoc/',
        entity_code='EP1522668',
        method='GET',
        task_id='get_data_task',
        dag=dag,
    )
    ti = TaskInstance(task=task, execution_date=datetime.now(), )
    task.execute(ti.get_template_context())
    assert ti.xcom_pull(task_ids='get_data_task') == {"text": "test"}

The test does not pass: the value from HttpHook is never pushed as an XCom. I have checked that the code responsible for the push logic is actually called:

....
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text

What am I doing wrong? Is this a bug?


1 answer
User
#1 · posted 2024-05-29 01:34:08

So I actually managed to get this working by setting the XCom value to the result of super(EPOHttpOperator, self).execute(context):

def execute(self, context):
        try:
             .
             .
             .
            self.headers = {
                "Authorization": f"Bearer {token_data['token']}",
                "Accept": "application/json"
            }

            # Before: the result of the parent call was discarded.
            #   super(EPOHttpOperator, self).execute(context)
            # After: the result is captured and stored under the key 'foo'.
            Variable.set(value=super(EPOHttpOperator, self).execute(context), key='foo')

The documentation is a bit misleading on this point; or am I doing something wrong?
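
One likely explanation, sketched here with plain-Python stand-ins rather than real Airflow classes: Airflow pushes whatever an operator's `execute()` returns to XCom as part of running the task instance, so an overridden `execute()` that calls the parent without returning its result leaves nothing to push. The names `BaseHttpOperator`, `ForgetsReturn`, `ReturnsResult` and `run_task` are hypothetical and only mimic the roles of SimpleHttpOperator, the subclass, and the task runner.

```python
# Stand-ins for illustration only; these are NOT Airflow classes.
class BaseHttpOperator:
    """Plays the role of SimpleHttpOperator: execute() returns the response text."""

    def execute(self, context):
        return '{"text": "test"}'


class ForgetsReturn(BaseHttpOperator):
    def execute(self, context):
        # The parent's result is discarded, so this method returns None.
        super().execute(context)


class ReturnsResult(BaseHttpOperator):
    def execute(self, context):
        # The parent's result is handed back to the caller.
        return super().execute(context)


def run_task(operator, context, xcom_store):
    """Plays the role of the task runner: stores whatever execute() returns."""
    result = operator.execute(context)
    if result is not None:
        xcom_store["return_value"] = result
    return result


store_a, store_b = {}, {}
run_task(ForgetsReturn(), {}, store_a)   # nothing is stored
run_task(ReturnsResult(), {}, store_b)   # the response text is stored
print(store_a, store_b)
```

Under that reading, adding `return super(EPOHttpOperator, self).execute(context)` to the operator should be enough, without going through a Variable. Two points about the unit test follow from the same idea: calling `task.execute(...)` directly does not go through the runner's push step, so asserting on the value returned by `task.execute(...)` is the simpler check; and `response.text` is a string, so it would need `json.loads` before being compared with a dict.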
