是否可以在一个线程中附加到数据帧,并在另一个线程中对其执行操作?

2024-04-24 15:38:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在构建一个脚本,从json/REST流接收数据,然后将其添加到数据库中。我想构建一个缓冲区,从流中收集数据并存储它,直到它成功插入数据库。你知道吗

其思想是,一个线程将数据从api流到dataframe,另一个线程将尝试将数据提交到数据库中,一旦成功地将数据项插入到数据库中,就从dataframe中删除它们。你知道吗

我编写了以下代码来测试这个概念-唯一的问题是,它不起作用!你知道吗

import threading
from threading import Thread
import pandas as pd
import numpy as np
import time
from itertools import count

# set delay
d=5

# add items to dataframe every few seconds
def appender():
    testdf = pd.DataFrame([])
    print('starting streamsim')
    for i in count():
        testdf1 = pd.DataFrame(np.random.randint(0,100,size=(np.random.randint(0,25), 4)), columns=list('ABCD'))
        testdf = testdf.append(testdf1)
        print('appended')
        print('len is now {0}'.format(len(testdf)))
        return testdf
        time.sleep(np.random.randint(0,5))

# combine the dfs, and operate on them
def dumper():
    print('starting dumpsim')
    while True:
        # check if there are values in the df
        if len(testdf.index) > 0:
            print('searching for values')
            for index, row in testdf.iterrows():
                if row['A'] < 10:
                    testdf.drop(index, inplace=True)
                    print('val dropped')
                else:
                    print('no vals found')

            # try to add rows to csv to simulate sql insert command
            for index, row in testdf.iterrows():
                # try to append to csv
                try:
                    with open('footest.csv', 'a') as f:
                        row.to_csv(f, mode= 'a', header=True)

                except:
                    print('append operation failed, skipping')
                    pass

                #if operation succeeds, drop the row
                else:
                    testdf.drop(index)
                    print('row dropped after success')

        if len(testdf.index) == 0:
            print('df is empty')
            pass

        time.sleep(d)

if __name__ == '__main__':
    Thread(target = appender).start()
    Thread(target = dumper).start()

有没有办法让这一切顺利?或者当一个线程正在处理数据帧时,它是否被“锁定”?你知道吗


Tags: csvto数据inimport数据库forindex