Space-time queries over a large number of records in Python
I have a table of 600,000 x/y coordinate points that also carry a date and time, plus a field called "status" with additional descriptive information.
My goal is, for each record:
- sum the values of the "status" column within a given window in time and space.
That window is: within the 8 hours before time t, and within a distance of 100 meters.
The data currently lives in a pandas DataFrame.
I can loop over every row and, for each record, filter down to the dates of interest, then compute distances and restrict the selection further. But with this many records that is still very slow:
- it takes 4.4 hours to run.
I found that I can build a 3-dimensional KD-tree on x, y, and date (as a timestamp). However, I am not sure how to constrain the distance correctly when combining dates with geographic distance. A sketch of that idea is below.
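For reference, here is a minimal sketch of the KD-tree idea, assuming scipy's cKDTree and a hypothetical helper candidate_neighbors: time is converted to seconds and rescaled so the 8-hour window spans the same 100 units as the spatial radius. A Euclidean ball in these units is not the same shape as the separate time/space limits, so the query radius is widened to cover the whole box, and the candidates still need exact post-filtering (including the asymmetric before/after window and the same-user exclusion):
import numpy as np
from scipy.spatial import cKDTree

def candidate_neighbors(df, radius=100.0, hours_before=8):
    # Seconds since epoch, rescaled so 8 hours "costs" the same as 100 m
    t = df['date'].astype('int64').values / 1e9
    scale = radius / (hours_before * 3600.0)
    pts = np.column_stack([df['long'].values, df['lat'].values, t * scale])
    tree = cKDTree(pts)
    # radius*sqrt(2) makes the ball contain the whole time/space box;
    # the returned index lists are candidate supersets to filter exactly
    return tree.query_ball_point(pts, r=radius * np.sqrt(2))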
Here is some code you can test with:
Imports
import numpy as np
import numpy.random as npr
import pandas as pd
from pandas import DataFrame, date_range
from datetime import datetime, timedelta
Create the data
np.random.seed(111)
A function to generate the test data
def CreateDataSet(Number=1):
    Output = []
    for i in range(Number):
        # Create a date range with hourly frequency
        date = date_range(start='10/1/2012', end='10/31/2012', freq='H')
        # Create long/lat data
        laty = npr.normal(4815862, 5000, size=len(date))
        longx = npr.normal(687993, 5000, size=len(date))
        # Statuses of interest
        status = [0, 1]
        # Make a random list of statuses
        random_status = [status[npr.randint(low=0, high=len(status))] for _ in range(len(date))]
        # User pool
        user = ['sally', 'derik', 'james', 'bob', 'ryan', 'chris']
        # Make a random list of users
        random_user = [user[npr.randint(low=0, high=len(user))] for _ in range(len(date))]
        Output.extend(zip(random_user, random_status, date, longx, laty))
    return pd.DataFrame(Output, columns=['user', 'status', 'date', 'long', 'lat'])
#Create data
data = CreateDataSet(3)
len(data)
# Time deltas defining the window: 8 hours before, 1 minute after
before = timedelta(hours=8)
after = timedelta(minutes=1)
The function to speed up
def work(df):
    output = []
    # Loop over the data by index
    for i in range(len(df)):
        l = []
        # First filter by date so there is a smaller set to compute distances for:
        # a mask selecting all dates within the window around date i
        date_mask = (df['date'] >= df['date'].iloc[i] - before) & (df['date'] <= df['date'].iloc[i] + after)
        # a mask selecting all users who are not user i (themselves)
        user_mask = df['user'] != df['user'].iloc[i]
        # apply the masks
        dists_to_check = df[date_mask & user_mask]
        # for point i, create the coordinate to calculate distances from
        a = np.array((df['long'].iloc[i], df['lat'].iloc[i]))
        # array of coordinates to check on the masked data
        b = np.array((dists_to_check['long'].values, dists_to_check['lat'].values))
        # for each point j in the date-masked data
        for j in range(len(dists_to_check)):
            # compute the Euclidean distance between point a and point j of b
            x = np.linalg.norm(a - np.array((b[0][j], b[1][j])))
            # if the distance is within the range of interest, keep the index
            if x <= 100:
                l.append(j)
        try:
            # use the list of kept indices 'l' to query the final subset of the data
            subset = dists_to_check.iloc[l]
            # summarize the column of interest, then append to the output list
            output.append(subset['status'].sum())
        except IndexError:
            output.append(0)
            # print("There were no data to add")
    return pd.DataFrame(output)
Run the code and time it
start = datetime.now()
out = work(data)
print(datetime.now() - start)
Is there a way to do this query in a vectorized fashion? Or should I be trying some other technique?
<3
1 Answer
Here is something that partially solves my problem. Since the loop can work on different parts of the data independently, parallel processing makes sense here.
Using IPython...
from IPython.parallel import Client

cli = Client()
cli.ids
dview = cli[:]

with dview.sync_imports():
    import numpy as np
    import os
    from datetime import timedelta
    import pandas as pd
# We also need to make the time deltas and the output list local variables
# inside the function, and add the IPython.parallel decorator
@dview.parallel(block=True)
def work(df):
    before = timedelta(hours=8)
    after = timedelta(minutes=1)
    output = []
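    # ... the rest of the body is identical to the serial work() above ...
The decorated function can then be invoked much like before; a sketch, assuming the timing reported below came from an unchanged call (with block=True the view splits the input across the engines and blocks until the results come back):
start = datetime.now()
out = work(data)
print(datetime.now() - start)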
Final time: 1:17:54.910206, roughly a quarter of the original run time.
I would still love it if someone could suggest small speed improvements inside the function itself.
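One candidate micro-optimization (a sketch, not benchmarked): replace the inner j-loop and the per-point np.linalg.norm calls with a single vectorized distance computation over the masked subset, which also makes the try/except and the index list unnecessary:
# Inside the outer loop, instead of looping over j:
dx = dists_to_check['long'].values - df['long'].iloc[i]
dy = dists_to_check['lat'].values - df['lat'].iloc[i]
within = np.sqrt(dx**2 + dy**2) <= 100
output.append(dists_to_check['status'].values[within].sum())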