我的堆栈是uwsgi和gevents。我试图用一个decorator来包装我的api端点,将所有请求数据(url、方法、主体和响应)推送到kafka主题中,但它不起作用。我的理论是因为我在使用gevents,并且我试图在异步模式下运行这些,实际上推送到kafka的异步线程不能与gevents一起运行。如果我试图使方法同步,那么它也不起作用,它在product worker中死亡,即在product之后调用永远不会返回。尽管这两种方法在pythonshell和线程上运行uwsgi都很好。在
遵循示例代码: 1使用kafka python(异步)
try:
kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
except NoBrokersAvailable:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
kafka_producer = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not kafka_producer:
logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
return
data = json.dumps(message)
try:
start = time.time()
kafka_producer.send(topic, key=str(key), value=data)
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except KafkaTimeoutError as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.info(e)
pass
except Exception as e:
logger.info(u'Message not sent: {}'.format(KAFKAHOST))
logger.exception(e)
pass
与皮卡夫卡(同步):
try:
client = KafkaClient(hosts=KAFKAHOST)
except Exception as e:
logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST))
client = None
def send_message_to_kafka(topic, key, message):
"""
:param topic: topic name
:param key: key to decide partition
:param message: json serializable object to send
:return:
"""
if not client:
logger.info(u'Kafka Host is None')
return
data = json.dumps(message)
try:
start = time.time()
topic = client.topics[topic]
with topic.get_sync_producer() as producer:
producer.produce(data, partition_key='{}'.format(key))
logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
except Exception as e:
logger.exception(e)
pass
我对派卡夫卡有更多的经验,所以我可以回答这一部分。pykafka使用了一个可插入的线程处理程序,并且内置了gevent支持。您需要用
use_greenlets=True
实例化KafkaClient。文档here关于你的方法的其他想法。为每条消息创建一个新的topic对象和生产者是非常昂贵的。最好一次创建并重用。在
最后,kafka从批处理和压缩中获得了所有它的速度。使用sync producer可以防止客户端利用这些特性。它可以工作,但速度较慢,占用更多空间。有些应用程序需要同步,但如果遇到性能瓶颈,重新考虑应用程序以批处理消息可能是有意义的。在
相关问题 更多 >
编程相关推荐