如何使用“upsert”使用SQLAlchemy执行批插入,以便忽略(或更新)现有密钥,并且不会使执行崩溃?

2024-06-01 08:38:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一段Python代码,它使用SQLAlchemy对bulk_insert_mappings进行批插入。我需要加载的数据集可能有重复的值,因此,如果有多个进程执行并行导入,如何最好地处理这种情况?单批插入非常简单,我只需保留要添加到集合中的密钥集合,并且使用此集合不会多次添加一个密钥:

            for line in fs.open(json_path[3:]):
                # get dictionary from JSON line
                json_dict = json.loads(line)

                # get the values we'll store in the database columns
                id_hash = json_dict['key']['AddressKeyHash']
                address1 = json_dict['address']['Address1']

                if id_hash not in distinct_hashes:
                    distinct_hashes.add(id_hash)

                    # hold a dictionary mapping columns to values for each row
                    row_mapping = {
                        "id": id_hash,
                        "address1": address1,
                    }
                    mappings.append(row_mapping)
                    count += 1

                # if we've reached the batch size (count is even divisible)
                # then we save the batch
                if (count % batch_size) == 0:
                    self.session.bulk_insert_mappings(Address, mappings)
                    self.session.commit()

                    # clear the mappings list for the next batch
                    mappings = []

            if len(mappings) > 0:
                self.session.bulk_insert_mappings(Address, mappings)
                self.session.commit()

如果我并行使用上面的方法,那么就无法在并行进程之间通信哪些键已经被添加。因此,我想知道是否有一种方法可以添加一个设置,允许如何处理重复插入,而不是简单的故障/错误/崩溃。例如,如何指定“重复时忽略”或“重复时更新”行为

我正在使用Python和SQLAlchemy。每个作业都将在AWSEC2集群上运行,并执行单个脚本(上面大部分)来执行从S3上JSON文件中的数据到Aurora(Postgres)集群的批插入

也许有一种方法可以利用多处理来实现这一点,利用多个内核而不是多个批处理/EC2作业,并以某种方式利用shared memory object来允许批处理插入过程进行通信?如果没有,那么可能有什么地方允许重复插入被忽略或视为更新


Tags: theinselfidjsonforifsession