我有一段Python代码,它使用SQLAlchemy对bulk_insert_mappings进行批插入。我需要加载的数据集可能有重复的值,因此,如果有多个进程执行并行导入,如何最好地处理这种情况?单批插入非常简单,我只需保留要添加到集合中的密钥集合,并且使用此集合不会多次添加一个密钥:
for line in fs.open(json_path[3:]):
# get dictionary from JSON line
json_dict = json.loads(line)
# get the values we'll store in the database columns
id_hash = json_dict['key']['AddressKeyHash']
address1 = json_dict['address']['Address1']
if id_hash not in distinct_hashes:
distinct_hashes.add(id_hash)
# hold a dictionary mapping columns to values for each row
row_mapping = {
"id": id_hash,
"address1": address1,
}
mappings.append(row_mapping)
count += 1
# if we've reached the batch size (count is even divisible)
# then we save the batch
if (count % batch_size) == 0:
self.session.bulk_insert_mappings(Address, mappings)
self.session.commit()
# clear the mappings list for the next batch
mappings = []
if len(mappings) > 0:
self.session.bulk_insert_mappings(Address, mappings)
self.session.commit()
如果我并行使用上面的方法,那么就无法在并行进程之间通信哪些键已经被添加。因此,我想知道是否有一种方法可以添加一个设置,允许如何处理重复插入,而不是简单的故障/错误/崩溃。例如,如何指定“重复时忽略”或“重复时更新”行为
我正在使用Python和SQLAlchemy。每个作业都将在AWSEC2集群上运行,并执行单个脚本(上面大部分)来执行从S3上JSON文件中的数据到Aurora(Postgres)集群的批插入
也许有一种方法可以利用多处理来实现这一点,利用多个内核而不是多个批处理/EC2作业,并以某种方式利用shared memory object来允许批处理插入过程进行通信?如果没有,那么可能有什么地方允许重复插入被忽略或视为更新
目前没有回答
相关问题 更多 >
编程相关推荐