AttributeError: 'bytes' object has no attribute 'encode' when publishing a Pub/Sub message with Composer 2.6.2 and Airflow 2.5.3
The following error shows up in the logs and causes the DAG to fail:
[2024-03-27, 06:25:37 UTC] {logging_mixin.py:137} INFO - data:b'DNB-MS-MY,2024-03-26,Location'
[2024-03-27, 06:25:37 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/pm_cm_dags/nim_batch_request_location_dag.py", line 148, in publish_message
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
AttributeError: 'bytes' object has no attribute 'encode'
This is the code snippet that causes the error:
def publish_message(environ, **kwargs):
    assert isinstance(data, bytes)
    print(f"data:{data}")
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
    print(f"pubsub_message:{pubsub_message}")
    topic_name=''
    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID,topic=topic_name,messages=pubsub_message)
In addition, after changing the code to pubsub_message = {'data': base64.b64encode(data).decode()}, I got the following error:
    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID, topic=topic_name, messages=pubsub_message)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 130, in publish
    self._validate_messages(messages)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 152, in _validate_messages
    if "data" in message and isinstance(message["data"], str):
TypeError: string indices must be integers
I tried this approach, but the DAG still fails with the error above. Any help resolving this would be much appreciated.
1 Answer
It looks like data is already a bytes object, and bytes has no encode() method (only str does).
Could you try this instead:
pubsub_message = {'data': base64.b64encode(data).decode()}
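Note that the second traceback in the question looks like a separate problem: _validate_messages loops over messages, so passing a bare dict makes it iterate over the dict's keys (the string 'data') and then index that string, which would explain the TypeError. Below is a minimal sketch of that reasoning, not a verified fix for this exact environment. build_pubsub_messages is a hypothetical helper name. It also assumes that the installed Google provider expects each message's 'data' as raw bytes rather than a base64 string, which is worth checking against the provider version in use.

```python
import base64


def build_pubsub_messages(data):
    """Hypothetical helper: wrap a payload in a list of message dicts."""
    # Only str needs encoding; bytes is passed through untouched.
    if isinstance(data, str):
        data = data.encode("utf-8")
    # Assumption: the hook wants raw bytes in 'data' and a list of dicts.
    return [{"data": data}]


payload = b"DNB-MS-MY,2024-03-26,Location"

# bytes offers decode() but not encode(), which is the first AttributeError.
bytes_has_encode = hasattr(payload, "encode")

# Iterating a bare dict yields its keys, so a validator that loops over
# `messages` sees the string "data" instead of a message dict.
bare_dict = {"data": base64.b64encode(payload).decode()}
first_item = next(iter(bare_dict))

# Indexing that string with a str key raises TypeError, as in the traceback.
try:
    first_item["data"]
    raised_type_error = False
except TypeError:
    raised_type_error = True

messages = build_pubsub_messages(payload)

# In the DAG this list would then be passed to the hook, for example:
# gcp_pubsub_hook.PubSubHook().publish(
#     project_id=PROJECT_ID, topic=topic_name, messages=messages)
```

If the provider version in use still requires a base64 string, the same list shape applies and only the value stored under 'data' changes.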