我有一个高通量kafka producer的用例,我希望每秒推送数千条json消息
我有一个3节点的kafka集群,我正在使用最新的kafka python库,并使用以下方法生成消息
def publish_to_kafka(topic):
data = get_data(topic)
producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
try:
for obj in data:
producer.send(topic, value=obj)
except Exception as e:
logger.error(e)
finally:
producer.close()
我的主题有3个分区
方法有时工作正常,但出现错误“KafkaTimeoutError:在60.0秒后更新元数据失败”
我需要更改哪些设置才能使其顺利工作
如果某个主题不存在,而您正试图生成该主题,并且“自动创建主题”设置为false,则可能会发生
可能的解决方案:在代理配置中(server.properties)
auto.create.topics.enable=true
(注意,这在Confluent Kafka中是默认的)另一种情况可能是网络拥塞或速度,如果使用Kafka代理更新元数据需要60秒以上
可能的分辨率:生产者配置:
max.block.ms = 1200000
(120秒,用于ex)检查您的代理是否由于某种原因(例如,负载过大)而停机,以及为什么他们无法响应元数据请求。通常,您可以在server.log文件中看到它们
相关问题 更多 >
编程相关推荐