KafkaTimeoutError:在60.0秒后更新元数据失败

2024-05-29 03:23:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个高通量kafka producer的用例,我希望每秒推送数千条json消息

我有一个3节点的kafka集群,我正在使用最新的kafka python库,并使用以下方法生成消息

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
    try:
        for obj in data:
           producer.send(topic, value=obj)
    except Exception as e:
            logger.error(e)
    finally:
        producer.close()

我的主题有3个分区

方法有时工作正常,但出现错误“KafkaTimeoutError:在60.0秒后更新元数据失败”

我需要更改哪些设置才能使其顺利工作


Tags: producerkafka方法jsonobj消息datatopic
1条回答
网友
1楼 · 发布于 2024-05-29 03:23:15
  1. 如果某个主题不存在,而您正试图生成该主题,并且“自动创建主题”设置为false,则可能会发生

    可能的解决方案:在代理配置中(server.properties)auto.create.topics.enable=true(注意,这在Confluent Kafka中是默认的)

  2. 另一种情况可能是网络拥塞或速度,如果使用Kafka代理更新元数据需要60秒以上

    可能的分辨率:生产者配置:max.block.ms = 1200000(120秒,用于ex)

  3. 检查您的代理是否由于某种原因(例如,负载过大)而停机,以及为什么他们无法响应元数据请求。通常,您可以在server.log文件中看到它们

相关问题 更多 >

    热门问题