def get_col_types(df):
'''
Helper function to create/modify Snowflake tables; gets the column and dtype pair for each item in the dataframe
args:
df: dataframe to evaluate
'''
import numpy as np
# get dtypes and convert to df
ct = df.dtypes.reset_index().rename(columns={0:'col'})
ct = ct.apply(lambda x: x.astype(str).str.upper()) # case matching as snowflake needs it in uppers
# only considers objects at this point
# only considers objects and ints at this point
ct['col'] = np.where(ct['col']=='OBJECT', 'VARCHAR', ct['col'])
ct['col'] = np.where(ct['col'].str.contains('DATE'), 'DATETIME', ct['col'])
ct['col'] = np.where(ct['col'].str.contains('INT'), 'NUMERIC', ct['col'])
ct['col'] = np.where(ct['col'].str.contains('FLOAT'), 'FLOAT', ct['col'])
# get the column dtype pair
l = []
for index, row in ct.iterrows():
l.append(row['index'] + ' ' + row['col'])
string = ', '.join(l) # convert from list to a string object
string = string.strip()
return string
def create_table(table, action, col_type, df):
    '''
    Function to create/replace and append to tables in Snowflake.

    args:
        table: name of the table to create/modify
        action: 'create_replace' for the initial create/replace, or
                'append' to insert rows; key to control logic
        col_type: string with column name / dtype pairs separated by
                commas; comes from get_col_types() func
        df: dataframe to load
    dependencies:
        get_col_types(); helper to build the column/type clause.
        Module-level connection settings: ACCOUNT, USER, PW, ROLE,
        warehouse, database, schema (read as globals — TODO confirm
        they are defined at module scope in the full file).
    '''
    import snowflake.connector as snow
    from snowflake.connector.pandas_tools import write_pandas
    # BUG FIX: the original did `database = database` (and likewise for
    # warehouse/schema), which always raises UnboundLocalError because
    # the assignment makes the name local before the RHS is evaluated.
    # The module-level globals are now read directly in connect().
    conn = snow.connect(
        account=ACCOUNT,
        user=USER,
        password=PW,
        warehouse=warehouse,
        database=database,
        schema=schema,
        role=ROLE)
    try:
        cur = conn.cursor()
        try:
            if action == 'create_replace':
                # WARNING: `table` and `col_type` are interpolated into the
                # SQL text (DDL identifiers cannot be bound parameters);
                # only pass trusted values.
                cur.execute(
                    """ CREATE OR REPLACE TABLE
                    """ + table + """(""" + col_type + """)""")
                # prep to ensure proper case: Snowflake stores identifiers
                # in uppercase, and write_pandas matches column names
                df.columns = [col.upper() for col in df.columns]
                # write df to table
                write_pandas(conn, df, table.upper())
            elif action == 'append':
                # render rows as a literal tuple list, e.g. "('a', 1), ('b', 2)"
                values = str(list(df.itertuples(index=False, name=None)))
                # drop the surrounding list brackets to leave bare tuples
                values = values.replace('[', '').replace(']', '')
                # WARNING: string-built VALUES clause; data containing
                # quotes or brackets will break or alter the SQL. Prefer
                # cur.executemany with bound parameters for untrusted data.
                cur.execute(
                    """ INSERT INTO """ + table + """
                    VALUES """ + values + """
                    """)
        finally:
            # close cursor and connection even on failure (the original
            # leaked both)
            cur.close()
    finally:
        conn.close()
工作示例:
# Worked example: create a table from a small dataframe, then append to it.
l1 = ['cats', 'dogs', 'frogs']
l2 = [10, 20, 30]
df = pd.DataFrame(zip(l1, l2), columns=['type', 'age'])
col_type = get_col_types(df)
create_table('table_test', 'create_replace', col_type, df)
# now that the table is created, append to it
l1 = ['cow', 'cricket']
l2 = [45, 20]
df2 = pd.DataFrame(zip(l1, l2), columns=['type', 'age'])
# BUG FIX: the original called undefined `append_table`; appending is
# handled by create_table with action='append' (col_type is unused then)
create_table('table_test', 'append', None, df2)
Windows 10、Python 3.9.4、Snowflake Connector for Python 2.4.2、pandas 1.1.5
我对 write_pandas 函数也有同样的问题
我对Snowflake有accountadmin特权。下面是Python代码和错误回溯
但是,如果我要显式编写CSV文件,我可以使用以下两个功能从CSV文件上载数据:
因此,它肯定是一款具有写入功能的产品
write_pandas()
不会自动创建表。如果事先不存在该表,则需要自己创建该表。对于每次运行write_pandas()
，它只会将数据帧附加到指定的表中。另一方面，如果您使用
df.to_sql(..., method=pd_writer)
将pandas数据帧写入snowflake,它将自动为您创建表,并且如果表已经存在,您可以使用to_sql()
中的if_exists
指定不同的行为——追加、替换或失败。我有一个相当不雅观的解决方案，可以在不离开 Jupyter 的情况下完成表的创建和附加工作。
我将此代码保存在sql实用程序文件中。get_col_types函数将创建创建表所需的col名称和数据类型的字典
工作示例:
相关问题 更多 >
编程相关推荐