kafka-server-stop.sh在Python脚本启动Kafka时无法工作

20 投票
5 回答
51615 浏览
提问于 2025-04-18 20:03

在远程节点上部署了一些Apache Kafka实例后,我发现了一个问题,跟Kafka归档包里的kafka-server-stop.sh脚本有关。

默认情况下,这个脚本的内容是:

#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM

这个脚本在我以非后台进程的方式执行Apache Kafka时效果很好,比如:

/var/lib/kafka/bin/kafka-server-start.sh /var/lib/kafka/config/server.properties

而且当我以后台进程的方式执行时,它也能正常工作:

/var/lib/kafka/bin/kafka-server-start.sh /var/lib/kafka/config/server.properties &

但是在我的远程节点上,我是通过这个Python脚本(使用Ansible)来执行的:

#!/usr/bin/env python
import argparse
import os
import subprocess

KAFKA_PATH = "/var/lib/kafka/"

def execute_command_pipe_output(command_to_call):
  return subprocess.Popen(command_to_call, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

def execute_command_no_output(command_to_call):
  with open(os.devnull, "w") as null_file:
    return subprocess.Popen(command_to_call, stdout=null_file, stderr=subprocess.STDOUT)  

def start_kafka(args):
  command_to_call = ["nohup"]
  command_to_call += [KAFKA_PATH + "bin/zookeeper-server-start.sh"]
  command_to_call += [KAFKA_PATH + "config/zookeeper.properties"]

  proc = execute_command_no_output(command_to_call)

  command_to_call = ["nohup"]
  command_to_call += [KAFKA_PATH + "bin/kafka-server-start.sh"]
  command_to_call += [KAFKA_PATH + "config/server.properties"]

  proc = execute_command_no_output(command_to_call)

def stop_kafka(args):
  command_to_call = [KAFKA_PATH + "bin/kafka-server-stop.sh"]

  proc = execute_command_pipe_output(command_to_call)
  for line in iter(proc.stdout.readline, b''):
    print line,

  command_to_call = [KAFKA_PATH + "bin/zookeeper-server-stop.sh"]

  proc = execute_command_pipe_output(command_to_call)
  for line in iter(proc.stdout.readline, b''):
    print line,


if __name__ == "__main__":
  parser = argparse.ArgumentParser(description="Starting Zookeeper and Kafka instances")
  parser.add_argument('action', choices=['start', 'stop'], help="action to take")

  args = parser.parse_args()

  if args.action == 'start':
    start_kafka(args)
  elif args.action == 'stop':
    stop_kafka(args)
  else:
    parser.print_help()

执行后,

manage-kafka.py start
manage-kafka.py stop

Zookeeper正常关闭(这没问题),但Kafka却仍然在运行。

更有趣的是,当我手动调用

nohup /var/lib/kafka/bin/kafka-server-stop.sh

或者

nohup /var/lib/kafka/bin/kafka-server-stop.sh &

时,kafka-server-stop.sh能够正确关闭Kafka实例。我怀疑这个问题可能跟某些Linux或Python的设置有关。

5 个回答

0

kafka-server-stop.sh 里的命令改成这个解决了我的问题:

PIDS=$(ps axww | grep -i 'kafka\.Kafka' | grep java | grep -v grep | nawk '{print $1}')


解释:
问题在于 kafka-server-stop.sh 使用了一个命令来获取需要终止的进程ID(PIDs):

PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')

终端的 'ps' 80 列问题:
这个问题是,ps ax 的输出没有显示完整,因为它被截断了,通常只显示80列(这是以前老式终端的默认宽度)。而我的终端宽度是168列,这个可以通过 stty -a 查看。改成 ps axww 就能显示完整的输出,简单来说就是把输出宽度加宽了。

awk 输入记录长度问题:
另一个问题是,awk 对每个输入记录的字符数有限制,最多只能处理3000个字符,具体可以查看 这里。而 nawk 则没有这个限制,它的限制是由 C long 的值决定的。gawk 也可以正常工作。

这样做的缺点是,我在修改一个核心脚本,这个脚本在升级时可能会被覆盖。虽然这样做比较快,但可能有点不太干净,不过对我来说能解决问题就好。

附注:如果你感兴趣,我在这里找到一个 jira 问题 链接

0

我猜:kafka-server-stop.sh这个脚本使用了shell管道。所以在用Popen的时候需要加上shell=True这个参数。

可以查看这个链接了解更多:https://docs.python.org/2/library/subprocess.html#subprocess.Popen

2

在运行 kafka-zookeeper-stop.sh 管理工具之前,请先执行 kafka-server-stop.sh。这个步骤会先把服务器和 zookeeper 断开连接,然后再停止 zookeeper 本身。请在重新启动之前等待 3-4 秒钟。

14

我之前也遇到过这个问题,后来找到了一种简单粗暴的解决办法。

问题是,Kafka突然关闭了,但那个端口还是被占用着。

你可以按照以下步骤操作:

  1. 找到正在使用那个端口的进程的ID: lsof -t -i :YOUR_PORT_NUMBER 。##这是在Mac上用的命令
  2. 结束那个进程 kill -9 process_id
16

Kafka的代理需要在Zookeeper完成关闭之前,先完成自己的关闭过程。

所以,先启动Zookeeper,然后Kafka的代理会重新尝试关闭

我也遇到过类似的情况。问题是我的配置没有等到Kafka的代理关闭就直接进行了下一步。希望这能帮助到某些人。我花了一段时间才弄明白这个问题……

撰写回答