我已在localhost
上安装了kafka
。它正在运行
● kafka.service - Apache Kafka Server
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Fri 2021-06-11 03:36:36 EEST; 23min ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 103920 (java)
Tasks: 71 (limit: 9353)
Memory: 367.0M
CGroup: /system.slice/kafka.service
└─103920 /usr/lib/jvm/java-11-openjdk-amd64/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHe>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,214] INFO [/config/changes-event-process-thread]: Starting (kafka>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,292] INFO [SocketServer brokerId=0] Starting socket server accept>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started data-plane acceptor a>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,300] INFO [SocketServer brokerId=0] Started socket server accepto>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.App>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,307] INFO Kafka startTimeMs: 1623371801301 (org.apache.kafka.comm>
июн 11 03:36:41 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:36:41,317] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
июн 11 03:57:24 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:24,797] INFO Creating topic PairsUpdated with configuration {} and initial>
июн 11 03:57:25 dmytruk kafka-server-start.sh[103920]: [2021-06-11 03:57:25,022] INFO [KafkaApi-0] Auto creation of topic PairsUpdated with 1 parti>
我正在使用kafka-python
作为客户端。当我试图发送消息时,我得到了错误
main.py
def send_kafka(topic: str, data: dict):
kafka_producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)
kafka_producer.send("PairsUpdated", b"")
kafka.log
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Request 68: MetadataRequest_v1(topics=['PairsUpdated'])
DEBUG:kafka.protocol.parser:Received correlation id: 68
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]> Response 68 (3.993988037109375 ms): MetadataResponse_v1(brokers=[], controller_id=-1, topics=[(error_code=5, topic='PairsUpdated', is_internal=False, partitions=[])])
DEBUG:kafka.producer.kafka:_wait_on_metadata woke after 6.909250736236572 secs.
DEBUG:kafka.producer.kafka:Requesting metadata update for topic PairsUpdated
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=['PairsUpdated']) to node bootstrap-0
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=['PairsUpdated'])
python.log
Internal Server Error: /api/v1/pairs/
Traceback (most recent call last):
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/exception.py", line 47, in inner
response = get_response(request)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/core/handlers/base.py", line 181, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/django/views/decorators/csrf.py", line 54, in wrapped_view
return view_func(*args, **kwargs)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/viewsets.py", line 125, in view
return self.dispatch(request, *args, **kwargs)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
response = self.handle_exception(exc)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
self.raise_uncaught_exception(exc)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
raise exc
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
response = handler(request, *args, **kwargs)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/controllers.py", line 78, in create
send_kafka("PairsUpdated", b"")
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/backoffice/main.py", line 33, in send_kafka
kafka_producer.send("PairsUpdated", b"")
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 576, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/home/m0nte-cr1st0/work_projects/trading_bot/backoffice/venv/lib/python3.8/site-packages/kafka/producer/kafka.py", line 702, in _wait_on_metadata
raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
telnet
>>> telnet 127.0.0.1 9092
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Ubuntu==20.04
Python==3.8.5
卡夫卡python==2.0.2
kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64"
ExecStart=/usr/local/kafka-server/bin/kafka-server-start.sh /usr/local/kafka-server/config/server.properties
ExecStop=/usr/local/kafka-server/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
查看卡夫卡服务状态,很明显,您的卡夫卡服务正在低内存(367.0 M)上运行
您可以验证超时here背后的原因
您可以通过更新Kafka服务文件从外部提供内存
最后重新启动卡夫卡服务
相关问题 更多 >
编程相关推荐