有没有一种方法可以使用pandas从redis服务器读取数据?

2024-05-23 22:06:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在进行一个IoT设备项目,这些设备连接到Azure云中托管的dotnet服务器。我目前正在使用for循环读取实时数据,但希望使用Pandas从Redis数据库读取一些实时统计数据。有人能给我解释一下怎么开始吗?

目前我使用下面的脚本读取统计信息,但希望改用pandas:

import os
import re
import json
import traceback
from collections import Counter
import time
import datetime as dt
import redis
from tqdm import tqdm  # taqadum (تقدّم) == progress
from jsonpointer import resolve_pointer as j_get 
from jsonpointer import JsonPointerException
import pandas as pd

# Switch the console colour to red by shelling out to the "color" command.
# NOTE(review): "color" looks like the Windows cmd built-in; presumably this
# does nothing useful on other platforms — confirm.
os.system("color 0c")  # change console color to red

# Hard-coded toggle between two Redis instances: with `if False` the first
# ("x") branch never runs and the "y" instance is always used.
# The `***` host/port/password values are redacted placeholders from the
# original post; `port=***` is not valid Python until a real port is filled in.
if False:
    # x Redis
    r = redis.Redis(host="****.redis.cache.windows.net", 
                    port=***,
                    password="***",
                    ssl=True,)
else:
    # y Redis
    r = redis.Redis(host="***.redis.cache.windows.net", 
                    port=****,
                    password="*****",
                    ssl=True,)


# Print the whole INFO reply, then the server start time computed as
# "local now" minus the reported uptime_in_seconds.
# Note that r.info() is called a second time for the uptime lookup.
print(r.info())
print("Server started at: ", end="")
print(dt.datetime.now() - dt.timedelta(seconds=r.info()['uptime_in_seconds']))

# Queue one HGETALL per matching session hash, then send the whole batch to
# the server with a single execute() call.
print("Building pipe")
pipe = r.pipeline()
# count=2500 is passed through to SCAN as the per-iteration batch-size hint.
session_keys = r.scan_iter("MC:SessionInfo*", count=2500)
for session_key in tqdm(session_keys):
    pipe.hgetall(session_key)

print("Executing pipe")
# execute() returns the replies of the queued commands as a list.
responses = pipe.execute()
print("Processing effluvia")


# Index the latest status of every device by its lower-cased serial number.
q = {}             # serial -> decoded LastStatusBody JSON document
vac = {}           # serial -> j['LiveA']['Unit']['Volatge_Vac'] reading
last_contact = {}  # serial -> time.time() minus the stored LastContact value
# NOTE(review): nothing in this script fills fw_versions, so the "paired"
# list below stays empty until it is populated (serial -> firmware string,
# presumably taken from the status JSON) — confirm the source field.
fw_versions = {}
first = True  # print only the first traceback to keep the output readable
for data in tqdm(responses):
    try:
        j = json.loads(data[b'LastStatusBody'])
        serial = j['System']['Serial'].lower()

        q[serial] = j
        # NOTE(review): assumes LastContact is a Unix timestamp in seconds — confirm.
        last_contact[serial] = time.time() - int(data[b'LastContact'])
        # TODO: json searching sensibly!
        # NOTE(review): 'Volatge_Vac' looks misspelled but must match the key
        # the devices actually send — confirm before changing it.
        vac[serial] = j['LiveA']['Unit']['Volatge_Vac']
    except Exception:
        # Best effort: a malformed or incomplete record is skipped.
        # `except Exception` (not a bare except) lets Ctrl-C still interrupt.
        if first:
            traceback.print_exc()
            first = False

# Devices whose firmware string starts with the "xx v1.0.0.0" version field.
x_paired = [
    key for key, value in fw_versions.items()
    if value.split(',')[0] == "xx v1.0.0.0"
]
print(x_paired)
print("Total paired :", len(x_paired))

与上述做法不同,我们希望改用pandas来轻松读取数据,并为团队的日常更新制作一些图表。


Tags: key, in, from, import, redis, json, for, time
1条回答
网友
1楼 · 发布于 2024-05-23 22:06:30

我使用pyarrow(并以pickle作为后备)进行序列化/反序列化,然后用一个附加键保存元数据(值的类型)。这种做法适用于本地、GCloud、AWS EB和Azure。

import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)


class mycache():
    """Redis-backed cache for pandas DataFrames, dicts and plain strings.

    Every value is stored under ``key`` together with a companion
    ``"<key>.type"`` entry holding ``str(type(value))``; :meth:`get` uses that
    tag to pick the matching decoder.  DataFrames are serialized with pyarrow.
    When the ``"pickle"`` flag of the cached configuration is set, a pickled
    copy is also written under ``"<key>.pickle"`` (expiring after 24 hours) and
    used as a fallback when pyarrow cannot read the primary copy.

    SECURITY: values may be decoded with ``pickle.loads``, which can execute
    arbitrary code.  Only point this class at a Redis instance you trust.
    """
    __redisClient: "Redis"
    CONFIGKEY = "cacheconfig"
    # Type tags, spelled exactly like str(type(value)) so that entries written
    # by earlier versions of this class stay readable.
    _DATAFRAME_TAG = str(pd.DataFrame)
    _DICT_TAG = str(dict)

    def __init__(self) -> None:
        """Connect to Redis and load (or create) the cache configuration.

        The connection URL is read from ``REDIS_HOST``.  When that variable is
        unset it is derived from ``HOST_ENV`` (``GCLOUD``, ``EB`` or
        ``LOCAL``) and written back to ``os.environ``.

        Raises:
            RuntimeError: if ``REDIS_HOST`` is unset and cannot be derived
                from ``HOST_ENV`` (including ``AZURE``, where the URL must be
                provided through the environment).
        """
        if "REDIS_HOST" not in os.environ:
            host_env = os.environ.get("HOST_ENV")
            if host_env == "GCLOUD":
                os.environ["REDIS_HOST"] = "redis://10.0.0.3"
            elif host_env == "EB":
                os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
            elif host_env == "LOCAL":
                os.environ["REDIS_HOST"] = "redis://127.0.0.1"
            elif host_env == "AZURE":
                # On Azure the URL is expected to come from the environment;
                # fail with a clear message instead of a KeyError further down.
                raise RuntimeError(
                    "could not initialise redis: REDIS_HOST must be set when HOST_ENV is AZURE"
                )
            else:
                # The original raised a plain string, which is a TypeError in Python 3.
                raise RuntimeError("could not initialise redis")

        # Created before the first get() call because get() logs through it
        # when it falls back to the pickled copy.
        self.alog = logenv.alog()
        self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
        self.__redisClient.ping()
        # get config as well...
        self.config = self.get(self.CONFIGKEY)
        if self.config is None:
            self.config = {"pyarrow":True, "pickle":False}
            self.set(self.CONFIGKEY, self.config)

    def redis(self) -> "Redis":
        """Return the underlying Redis client."""
        return self.__redisClient

    def exists(self, key:str) -> bool:
        """Return True when ``key`` is present in Redis."""
        if self.__redisClient is None:
            return False

        return self.__redisClient.exists(key) == 1

    def get(self, key:str) -> Optional[Union[DataFrame, dict, str]]:
        """Fetch ``key`` and decode it according to its ``"<key>.type"`` tag.

        Returns:
            ``None`` when the value is missing; otherwise a DataFrame for
            DataFrame-tagged values, the JSON-decoded object for dict-tagged
            values, and the decoded string for everything else.  An untagged
            key whose name ends in ``.pickle`` is unpickled.

        Raises:
            OSError: when pyarrow cannot read a DataFrame entry and no pickled
                fallback copy is available (or the error is unrelated to the
                known IPC issue).
        """
        client = self.__redisClient
        valuetype = client.get(f"{key}.type")
        raw = client.get(key)
        if raw is None:
            # Missing (or expired) value: there is nothing to decode, whatever the tag says.
            return None

        if valuetype is None:
            if key.split(".")[-1] == "pickle":
                return pickle.loads(raw)
            return raw.decode()

        tag = valuetype.decode()
        if tag == self._DATAFRAME_TAG:
            return self._load_dataframe(key, raw)
        if tag == self._DICT_TAG:
            return json.loads(raw.decode())
        return raw.decode()

    def _load_dataframe(self, key:str, raw:bytes) -> DataFrame:
        """Deserialize a DataFrame, falling back to its pickled copy."""
        # fallback to pickle serialized form if pyarrow fails
        # https://issues.apache.org/jira/browse/ARROW-7961
        # NOTE(review): pa.serialize / pa.deserialize are deprecated in newer
        # pyarrow releases; the format is kept so existing entries stay readable.
        try:
            return pa.deserialize(raw)
        except pa.lib.ArrowIOError as err:
            reason = err
        except OSError as err:
            if "Expected IPC" not in str(err):
                raise
            reason = err

        fallback = self.redis().get(f"{key}.pickle")
        if fallback is None:
            # No pickled copy to fall back to: surface the real pyarrow error.
            raise reason
        self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(reason))
        return pickle.loads(fallback)

    def set(self, key:str, value:Union[DataFrame, dict, str]) -> None:
        """Store ``value`` under ``key`` and record its type under ``"<key>.type"``.

        DataFrames are written with pyarrow (plus a 24-hour pickled copy when
        the ``"pickle"`` config flag is set), dicts as JSON, and any other
        value is handed to Redis unchanged.
        """
        if self.__redisClient is None:
            return
        client = self.__redisClient

        if isinstance(value, pd.DataFrame):
            tag = self._DATAFRAME_TAG
            client.set(key, pa.serialize(value).to_buffer().to_pybytes())
            if self.config.get("pickle"):
                self.redis().set(f"{key}.pickle", pickle.dumps(value))
                # issue should be transient through an upgrade....
                # once switched off data can go away
                self.redis().expire(f"{key}.pickle", 60*60*24)
        elif isinstance(value, dict):
            tag = self._DICT_TAG
            client.set(key, json.dumps(value))
        else:
            tag = str(type(value))
            client.set(key, value)

        client.set(f"{key}.type", tag)


if __name__ == '__main__':
    # Manual smoke test against a local Redis: list every key matching
    # "cache*" with its TTL, followed by the raw stored value.
    os.environ["HOST_ENV"] = "LOCAL"
    cache = mycache()
    client = cache.redis()
    for raw_key in client.keys("cache*"):
        key_name = raw_key.decode()
        print(key_name, client.ttl(raw_key))
        print(client.get(key_name))

相关问题 更多 >