I'm using Python and pyodbc to automate some query extractions, convert the results to Parquet, and send them to AWS S3.
So far my scripted solution has worked well, but I've hit a problem. I have a schema, let's call it SCHEMA_A, with several tables inside it: TABLE_1, TABLE_2 ... TABLE_11.
All tables in that schema can be accessed with the same credentials.
So I use a script like this to automate the task:
import sys
import time

import pandas as pd
import pyodbc

def get_stream(cursor, batch_size=100000):
    while True:
        row = cursor.fetchmany(batch_size)
        if not row:
            break
        yield row

cnxn = pyodbc.connect(driver='pyodbc driver here',
                      host='host name',
                      database='schema name',
                      user='user name',
                      password='password')
print('Connection established ...')
cursor = cnxn.cursor()
print('Initializing cursor ...')

if len(sys.argv) > 1:
    table_name = sys.argv[1]
    cursor.execute('SELECT * FROM {}'.format(table_name))
else:
    exit()
print('Query fetched ...')

row_batch = get_stream(cursor)
print('Getting iterator ...')
cols = [col[0] for col in cursor.description]
print('Initializing batch data frame ...')
df = pd.DataFrame(columns=cols)
start_time = time.time()
for rows in row_batch:
    tmp = pd.DataFrame.from_records(rows, columns=cols)
    df = df.append(tmp, ignore_index=True)
    tmp = None
    print("--- Batch inserted in %s seconds ---" % (time.time() - start_time))
    start_time = time.time()
I run similar code as Airflow tasks, and it works fine for all the other tables. But there are two tables, let's call them TABLE_I and TABLE_II, that raise the following error when I execute cursor.fetchmany(batch_size):
ERROR - ('ODBC SQL type -151 is not yet supported. column-index=16 type=-151', 'HY106')
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1310, in _execute_task
result = task_copy.execute(context=context)
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/ubuntu/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 58, in fetch_data
for rows in row_batch:
File "/home/ubuntu/prea-ninja-airflow/jobs/plugins/extract/fetch.py", line 27, in stream
row = cursor.fetchmany(batch_size)
Inspecting these tables with SQLElectron and querying the first few rows, I noticed that both TABLE_I and TABLE_II have a column named "Geolocalizacao". When I looked up that column's data type in SQL Server:
SELECT DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = 'TABLE_I' AND
COLUMN_NAME = 'Geolocalizacao';
it returns:
DATA_TYPE
geography
Searching Stack Overflow, I found this solution: python pyodbc SQL Server Native Client 11.0 cannot return geometry column
According to that user's description, adding the following seemed to work well:
def unpack_geometry(raw_bytes):
    # adapted from SSCLRT information at
    # https://docs.microsoft.com/en-us/openspecs/sql_server_protocols/ms-ssclrt/dc988cb6-4812-4ec6-91cd-cce329f6ecda
    tup = struct.unpack('<i2b3d', raw_bytes)
    # tup contains: (unknown, Version, Serialization_Properties, X, Y, SRID)
    return tup[3], tup[4], tup[5]
and then:
cnxn.add_output_converter(-151, unpack_geometry)
right after creating the connection. But it does not work for the geography data type: when I use this code (with import struct added to the script), it gives the following error:
Traceback (most recent call last):
File "benchmark.py", line 79, in <module>
for rows in row_batch:
File "benchmark.py", line 39, in get_stream
row = cursor.fetchmany(batch_size)
File "benchmark.py", line 47, in unpack_geometry
tup = struct.unpack('<i2b3d', raw_bytes)
struct.error: unpack requires a buffer of 30 bytes
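The 30 bytes come from the '<i2b3d' format string (4 + 2 + 24), while my buffers are apparently shorter. A hedged sketch of how the converter might be adapted for a plain geography point, assuming the 22-byte single-point layout described in the MS-SSCLRT document linked above (4-byte SRID, 1-byte version, 1-byte serialization properties, then two doubles); the function name is mine:

```python
import struct

def unpack_geography_point(raw_bytes):
    # Assumed layout for a single geography POINT per MS-SSCLRT:
    # int32 SRID, byte Version, byte Serialization_Properties, double, double
    # -> 22 bytes total, vs. the 30 bytes expected by '<i2b3d'.
    srid, version, props, coord1, coord2 = struct.unpack('<i2b2d', raw_bytes)
    return coord1, coord2, srid
```

This only covers the simple point case; rows holding lines, polygons, or multi-point shapes would serialize to longer, variable-length buffers and need more parsing.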
An example value from this column follows this template:
{"srid":4326,"version":1,"points":[{}],"figures":[{"attribute":1,"pointOffset":0}],"shapes":[{"parentOffset":-1,"figureOffset":0,"type":1}],"segments":[]}
Honestly, I don't know how to adapt the code to this structure. Can someone help me? All the other tables work fine, but these two tables with this column are giving me trouble. Any help will be much appreciated.
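One direction I'm considering, to sidestep the binary format entirely: have SQL Server convert the column to Well-Known Text with its STAsText() method, so pyodbc only ever receives varchar data. A sketch (untested against these tables; build_select and the geo_columns argument are hypothetical helpers, not part of my script yet):

```python
def build_select(table_name, columns, geo_columns):
    # Build a SELECT where geography columns are returned as WKT text
    # via STAsText(), instead of raw UDT bytes (ODBC type -151).
    parts = []
    for col in columns:
        if col in geo_columns:
            parts.append('[{0}].STAsText() AS [{0}]'.format(col))
        else:
            parts.append('[{0}]'.format(col))
    return 'SELECT {} FROM {}'.format(', '.join(parts), table_name)
```

With this, the fetch loop would stay unchanged, since every column arrives as an ordinary string or scalar.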