如何使用Python通过Azure EventHub将JSON数据获取到Azure时间序列洞察中?

2024-04-19 16:52:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用Python获取一些示例JSON数据以显示在Azure Time Series Insights(TSI)中,这样我就可以在他们的探索性浏览器中可视化这些数据

我已经了解了有关Azure EventHubs和Time Series Insights设置的必要先决条件。这些措施包括:

  1. 在Azure门户中创建资源组
  2. 在资源组内创建事件中心命名空间
  3. 在该事件中心命名空间内创建事件中心实体
  4. 在事件中心实体内创建消费者组
  5. 我在资源组中设置了Azure TSI环境
  6. 最后,我将不同创建的源作为详细信息(资源组、事件中心名称空间、事件中心名称等)添加到Azure TSI环境中

除此之外,我还测试了使用python成功地将事件消息发送到我的事件中心(但不发送到TSI环境),方法是遵循以下文档:https://docs.microsoft.com/en-us/azure/event-hubs/get-started-python-send-v2并使用以下代码(尽管填写了con_str和eventhub_name:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
        # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

我还测试了通过以下文档将Microsoft的风车模拟器数据成功发送到我的TSI环境:https://docs.microsoft.com/en-us/azure/time-series-insights/time-series-insights-send-events

我现在不知道如何使用Python将示例JSON数据实际获取到Azure TSI环境中

任何帮助都将不胜感激。谢谢


Tags: producertheto数据eventsenddata环境
1条回答
网友
1楼 · 发布于 2024-04-19 16:52:02

正如文章提到的here,您必须使用JSON格式的字符串发送EventData的主体。

将上述代码的修改片段共享给您:

import asyncio
import nest_asyncio
nest_asyncio.apply()
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import json

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
        # the event hub name.
    producer = EventHubProducerClient.from_connection_string("<>", eventhub_name="<>")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.

        #Method 1 - You provide a JSON string 
        body1 = '{"id":"device2","timestamp":"2016-01-17T01:17:00Z"}' 
        event_data_batch.add(EventData(body1))

        #Method 2 - You get the JSON Object and convert to string
        json_obj = {"id":"device3","timestamp":"2016-01-18T01:17:00Z"}
        body2= json.dumps(json_obj)
        event_data_batch.add(EventData(body2))


        #This just sending the string which will not be captured by TSI
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)


loop = asyncio.get_event_loop()
loop.run_until_complete(run())

输出:

enter image description here

相关问题 更多 >