PySpark:从Oracle表中选择一个值,然后添加到其中

2024-04-20 13:52:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用PySpark将行从Oracle加载到AWS。 我一次抓取10000行,然后存储加载的max seq_id,并将其用于下一个范围

我正试图在PySpark中这样做,但我想不出来。有人能帮我找一个有用的培训资源吗?我已尝试将输出强制转换为Int。我尝试了select.collect[0][0],但也遇到了一个错误。我是PySpark的新手,非常感谢您的帮助

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

def oracle_read(user,pwd,hostname,port,service_name,table_name):
    url = 'jdbc:oracle:thin:'+user+'/'+pwd+'@//'+hostname+':'+port+'/'+service_name
    result = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable",table_name) \
    .option("user", user) \
    .option("password", pwd) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load() 
    result = result.toDF(* [c.lower() for c in result.columns])
    return result
    
max_seq_qry = """(SELECT max_val FROM data_owner.tbl_max_seq_load WHERE table_name = 'TBL_A')"""
max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
min_seq = max_seq + 1
max_seq = max_seq + 10000

我得到了以下错误:

TypeError: unsupported operand type(s) for +: 'DataFrame' and 'int'
NameError: name 'IntegerType' is not defined
TypeError: 'instancemethod' object has no attribute '__getitem__'

Tags: namereadportconfpwddynamicresultseq
2条回答

您的函数oracle_read返回一个数据帧(result),您正试图增加它(向其中添加一个),这是不可能的,因此会出现错误

在您的例子中,您只从数据库中获取一列“max_val”,并且可能是第一个匹配项,因此您可以选择此列并将第一个值作为max_seq['max_val'].values[0]

因此,您可以将代码重写为

max_seq = oracle_read(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_service,max_seq_qry)
max_seq = int(max_seq['max_val'].values[0]) + 1

请检查是否有table_name='TBL_A'的行,如果没有行,请尝试添加一个NVL(max_val,0),看看是否有效

与其将序列存储在表中,不如使用Oracle sequence,因为它在多用户环境中更具可扩展性和帮助性

谢谢

相关问题 更多 >