Plotly Dash/Apache Kafka我希望看到图形实时打印,而不是在打印完成后显示

2024-05-16 14:52:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我在plotly中查看了流图的实时更新,但这并不是我想要做的。我有一个图形组件,点击按钮组件就会填充它。在更新图形的回调中,将创建一个对象,并从该类调用一个方法,该类使用ApacheKafka streaming producer返回15个值的流。然后,我在回调中使用ApacheKafka消费者,它逐个接收这些值。然而,我试图让这些值也一个接一个地出现在我的仪表板图上。我尝试使用一个interval组件,但这似乎只是每秒调用一次函数,这意味着脚本在调用函数时陷入困境,什么也不做

如有任何帮助,我们将不胜感激,如有必要,我们可以提供更多细节


Tags: producer对象方法脚本图形组件消费者仪表板
2条回答

使用者循环是无限的,不能对同一组记录重复同一使用者对象两次。不能使用列表理解,因此需要单独存储列表

以下内容未经测试,但展示了您需要做什么的总体思路

x = []
y = []

LIMIT = 15

for r in consumer_obj:

  if len(x) >= LIMIT:
    del x[0]
  if len(y) >= LIMIT:
    del y[0]

  x.append(r.value['day'])
  y.append(r.value['biomass'])
  
  # TODO: Update graph 
  data = {"data": [
          {"x": x,"y": y, 
          "type": "lines",
        }]

这里有一个项目使用Bokeh Int代替Plotly-https://github.com/Aakash282/kafka-bokeh-dashboard


或者您可以使用Kafka Connect写入SQLite数据库,并使绘图使用该数据库并定期进行更新

对于Streams,您将希望将使用者放在其自己的线程中。你已经把制作人放在一个线程中了,我也建议不要把制作人加入到主线程中

您要做的是将使用者放在一个线程中,并使用队列或线程安全的数据结构或锁定,队列是线程安全的,因此您可以根据需要提取数据

您还希望在def或startup中运行线程,而不是在回调中运行线程,每次在回调中,您只需要请求队列中的最新项目,并且可以在pref上将间隔设置为100ms左右的dep

我有一个简单的例子如下:

  • while循环将是您的回调间隔-im使用while作为时间

  • 您必须将使用者设置在回调之外-否则您会一次又一次地构建它,将使用者放置在回调范围之外,并在启动时运行它,在我使用的main下面


    from time import sleep
    from json import dumps
    from kafka import KafkaConsumer
    from multiprocessing import Process, Queue
    import os
    
    
    
    def consumer(q):
    
        consumer_obj = KafkaConsumer('test')
        for message in  consumer_obj:
         q.put(message.value)
    
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=consumer, args=(q,))
        p.start()
        while True:
            print(q.get())
            sleep(1)




相关问题 更多 >