KafkaTimeoutError: Failed to update metadata after 60.0 secs when using kafka-python

Posted 2024-05-15 22:00:14


I have installed Kafka on localhost, and it is running:

● kafka.service - Apache Kafka Server
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Fri 2021-06-11 03:36:36 EEST; 23min ago
       Docs: http://kafka.apache.org/documentation.html
   Main PID: 103920 (java)
      Tasks: 71 (limit: 9353)
     Memory: 367.0M
     CGroup: /system.slice/kafka.service
             └─103920 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe>

июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,214] INFO [/config/changes-event-process-thread]: Starting (kafka>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,292] INFO [SocketServer brokerId=0] Starting socket server accept>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started data-plane acceptor a>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started socket server accepto>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.App>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka startTimeMs: 1623371801301 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,317] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
июн 11 03:57:24 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:24,797] INFO Creating topic PairsUpdated with configuration {} and initial>
июн 11 03:57:25 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:25,022] INFO [KafkaApi-0] Auto creation of topic PairsUpdated with 1 parti>

I am using kafka-python as the client. When I try to send a message, I get an error.

main.py

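import json

from kafka import KafkaProducer


def send_kafka(topic: str, data: dict):
    kafka_producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
    )
    # The topic and payload are hard-coded here; `topic` and `data` are unused.
    kafka_producer.send("PairsUpdated", b"")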

kafka.log

DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Request 68: MetadataRequest_v1(topics=['PairsUpdated'])
DEBUG:kafka.protocol.parser:Received correlation id: 68
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Response 68 (3.993988037109375 ms): MetadataResponse_v1(brokers=[], controller_id=-1, topics=[(error_code=5, topic='PairsUpdated', is_internal=False, partitions=[])])
DEBUG:kafka.producer.kafka:_wait_on_metadata woke after 6.909250736236572 secs.
DEBUG:kafka.producer.kafka:Requesting metadata update for topic PairsUpdated
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=['PairsUpdated']) to node bootstrap-0
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=['PairsUpdated'])
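
The `MetadataResponse_v1` line above carries the fields the producer is waiting on. As a reading aid, here is a minimal sketch that pulls them out of such a debug line using only the standard library. The function name `summarize_metadata_line` and the small `ERROR_NAMES` table are assumptions made for this example; the table copies just three entries from the Kafka protocol error-code list and is not part of kafka-python.

```python
import re

# Subset of the Kafka protocol error-code table (only the codes used here).
ERROR_NAMES = {
    0: "NONE",
    3: "UNKNOWN_TOPIC_OR_PARTITION",
    5: "LEADER_NOT_AVAILABLE",
}


def summarize_metadata_line(line: str) -> dict:
    """Extract the broker list state, controller id and per-topic error codes."""
    brokers = re.search(r"brokers=\[(.*?)\]", line)
    controller = re.search(r"controller_id=(-?\d+)", line)
    topics = re.findall(r"error_code=(\d+), topic='([^']+)'", line)
    return {
        # True only when the line contains a literally empty brokers=[] list.
        "broker_list_empty": brokers is not None and brokers.group(1).strip() == "",
        "controller_id": int(controller.group(1)) if controller else None,
        # Unknown codes are kept as "code N" rather than guessed.
        "topic_errors": {
            topic: ERROR_NAMES.get(int(code), f"code {code}")
            for code, topic in topics
        },
    }
```

Applied to the response logged above, this reports an empty broker list, `controller_id` of -1, and `LEADER_NOT_AVAILABLE` for `PairsUpdated`.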

python.log

Internal Server Error: /api/v1/pairs/
Traceback (most recent call last):
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/exception.py", line 47, in inner
    response = get_response(request)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/base.py", line 181, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/views/decorators/csrf.py", line 54, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/viewsets.py", line 125, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
    response = self.handle_exception(exc)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
    raise exc
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/controllers.py", line 78, in create
    send_kafka("PairsUpdated", b"")
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/main.py", line 33, in send_kafka
    kafka_producer.send("PairsUpdated", b"")
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 702, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

telnet

>>> telnet 127.0.0.1 9092
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
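
The same reachability check can be sketched in Python with the standard library, which is handy when telnet is not installed. The helper name `port_is_open` and the 3-second default timeout are assumptions for illustration. Like telnet, it only shows that something accepts TCP connections on the port; it says nothing about whether metadata requests succeed.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host and connects; the context
        # manager closes the socket again immediately.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and resolution failures.
        return False
```

For the setup in this question the call would be `port_is_open("127.0.0.1", 9092)`.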

Ubuntu==20.04

Python==3.8.5

kafka-python==2.0.2

kafka.service

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
ExecStart=/usr/local/kafka-server/bin/kafka-server-start.sh /usr/local/kafka-server/config/server.properties
ExecStop=/usr/local/kafka-server/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

1 Answer

Answer 1 · Posted 2024-05-15 22:00:14

Looking at the Kafka service status, it appears that your Kafka service is running with little memory (367.0M).


You can check the possible reasons for the timeout in the kafka-python documentation for KafkaProducer.send:

Raises:
KafkaTimeoutError – if unable to fetch topic metadata, or unable to obtain memory buffer prior to configured max_block_ms
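
To see the `max_block_ms` limit in action without waiting a full minute, here is a minimal sketch (not the asker's code) that lowers `max_block_ms` and catches `KafkaTimeoutError`. The function name `send_with_short_block`, the 5000 ms block time and the 10-second `get` timeout are assumptions chosen for the example; `max_block_ms`, `send`, `get` and `close` are regular kafka-python names.

```python
import json


def send_with_short_block(topic, data, bootstrap="localhost:9092", max_block_ms=5000):
    """Send one JSON message; return a status string instead of raising on timeout."""
    # Imported lazily so this module loads even where kafka-python is absent.
    from kafka import KafkaProducer
    from kafka.errors import KafkaTimeoutError

    producer = KafkaProducer(
        bootstrap_servers=[bootstrap],
        value_serializer=lambda m: json.dumps(m).encode("utf-8"),
        max_block_ms=max_block_ms,  # send() gives up on metadata after this long
    )
    try:
        future = producer.send(topic, data)
        # Block until the broker acknowledges the record (or 10 s pass).
        metadata = future.get(timeout=10)
        return f"sent to partition {metadata.partition} at offset {metadata.offset}"
    except KafkaTimeoutError as exc:
        return f"timed out: {exc}"
    finally:
        producer.close()
```

Calling `send_with_short_block("PairsUpdated", {})` against the broker from the question should then report the timeout after roughly 5 seconds instead of 60, which makes it quicker to retest after each configuration change.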


You can set the heap size from outside the start script by adding this line to the Kafka service file:

Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"

Finally, restart the Kafka service.
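
After the restart, one way to confirm which heap flags the broker JVM actually received is to parse its command line. The sketch below is only an illustration; the helper name `heap_flags` is an assumption, and it looks for nothing beyond `-Xmx` and `-Xms`.

```python
import re


def heap_flags(command_line: str) -> dict:
    """Return the -Xmx / -Xms values found in a JVM command line."""
    flags = {}
    for name, value in re.findall(r"-(Xmx|Xms)(\d+[kKmMgG]?)", command_line):
        # If a flag appears more than once, the later value overwrites the earlier one.
        flags[name] = value
    return flags
```

On Linux the command line of a running process can be read from `/proc/<pid>/cmdline` (arguments are separated by NUL bytes, so replace them with spaces first), using the Main PID that `systemctl status kafka` prints after the restart. For the java command line shown in the status output in the question, this function returns `{"Xmx": "1G", "Xms": "1G"}`.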
