AttributeError: 'bytes' object has no attribute 'encode' in a Pub/Sub message with Composer 2.6.2 and Airflow 2.5.3

Asked 2025-04-12 15:58

The following error appears in the logs and causes the DAG to fail:

[2024-03-27, 06:25:37 UTC] {logging_mixin.py:137} INFO - data:b'DNB-MS-MY,2024-03-26,Location'
[2024-03-27, 06:25:37 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/pm_cm_dags/nim_batch_request_location_dag.py", line 148, in publish_message
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
AttributeError: 'bytes' object has no attribute 'encode'

This is the code snippet that causes the error:

def publish_message(environ, **kwargs):
    assert isinstance(data, bytes)
    print(f"data:{data}")
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
    print(f"pubsub_message:{pubsub_message}")
    topic_name = ''
    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID, topic=topic_name, messages=pubsub_message)

Also, after changing the code to pubsub_message = {'data': base64.b64encode(data).decode()}, the following error occurred:

    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID, topic=topic_name, messages=pubsub_message)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 130, in publish
    self._validate_messages(messages)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 152, in _validate_messages
    if "data" in message and isinstance(message["data"], str):
TypeError: string indices must be integers

I tried this approach, but the DAG still fails with the error above. Any help resolving this would be much appreciated.

1 Answer


It looks like data is already a bytes object (the log prints it as b'...'), and bytes has no encode() method.
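The log line data:b'DNB-MS-MY,2024-03-26,Location' carries the b'...' prefix, which is how Python prints a bytes value. str.encode() converts text to bytes, but bytes only has decode(), so data.encode() raises the AttributeError. If the value could arrive as either type, a small normalizing helper avoids the problem. The to_bytes function below is a hypothetical helper for illustration, not part of Airflow.

```python
def to_bytes(value, encoding="utf-8"):
    """Hypothetical helper: return `value` as bytes, encoding it only if it is a str."""
    if isinstance(value, bytes):
        return value  # already bytes: calling .encode() here would raise AttributeError
    if isinstance(value, str):
        return value.encode(encoding)  # text -> bytes
    raise TypeError(f"expected str or bytes, got {type(value).__name__}")


data = b"DNB-MS-MY,2024-03-26,Location"
print(hasattr(data, "encode"))  # False
print(to_bytes(data) == to_bytes("DNB-MS-MY,2024-03-26,Location"))  # True
```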

Can you try this:

pubsub_message = {'data': base64.b64encode(data).decode()}
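Dropping .encode() fixes the AttributeError, but the second traceback points at a separate problem: the shape of messages. In that traceback, _validate_messages loops over messages. When a single dict is passed, iterating it yields its keys, so message is the string 'data'. The test "data" in message then becomes a substring check that succeeds, and message["data"] indexes a string with a string, which raises the TypeError. The sketch below reproduces this with check_messages, a hypothetical and simplified stand-in for the loop at line 152 (not the provider's actual code). It assumes, based on the PubSubHook.publish docstring in recent apache-airflow-providers-google releases, that messages must be a list of dicts and that data should be raw UTF-8 bytes rather than a base64 string. Check the provider version bundled with your Composer image before relying on this.

```python
import base64


def check_messages(messages):
    """Hypothetical, simplified stand-in for the per-message loop in _validate_messages."""
    for message in messages:
        # If `messages` is a dict, `message` is one of its keys, e.g. the str 'data'.
        if "data" in message and isinstance(message["data"], str):
            pass  # the real hook performs further checks here


data = b"DNB-MS-MY,2024-03-26,Location"

# Second attempt from the question: a single dict holding a base64 str.
bad_messages = {"data": base64.b64encode(data).decode()}
error = None
try:
    check_messages(bad_messages)
except TypeError as exc:
    error = str(exc)  # begins with "string indices must be integers"
print(error)

# Assumed expected shape: a list of dicts, with raw bytes in "data".
good_messages = [{"data": data}]
check_messages(good_messages)  # no TypeError
print(good_messages)  # [{'data': b'DNB-MS-MY,2024-03-26,Location'}]

# In the DAG task (not executed here). topic_name must name a real topic,
# because the question's topic_name = '' is empty:
# gcp_pubsub_hook.PubSubHook().publish(
#     project_id=PROJECT_ID, topic=topic_name, messages=good_messages
# )
```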
