从新的dict列表更新有序的dict列表(优先级合并)

2024-10-02 04:35:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我现在面临着必须半定期地更新(同步)来自规范变更源的大量dict列表,同时维护我自己的更新。非标准合并,最简单的描述是probably:- 你知道吗

  • A是我自己的dict列表(由我的程序更新以包含缓存值作为附加键)
  • b是从一个源定期发送的信息(a最初与b相同)。它包含一些键,但不是我添加到a的缓存值
  • keys = ['key1', 'key2']是a和b都拥有的键的列表(a拥有的键比那多)
  • mkey = 'mtime'是a和b都有的一个特殊键,它指示我应该使a的缓存值无效

基本上,如果A中的dict与b中的dict匹配,我应该将dict保存在一个unclerb['mtime'] > A['mtime']中。如果一个dict出现在A但不在b中,我就把它去掉,而如果它出现在b但不在A中,我就把它加到A

我的圣杯目标是在A中不丢失任何缓存的键值对,但我在实现这一点上遇到了困难。我当前的解决方案看起来像this:- 你知道吗

def priority_merge(A, b, keys, mkey):
    retval = []
    b_index = 0
    for elemA in A:
        if b_index >= len(b):
            break  # No more items in b
        elemb = b[b_index]
        minA = { k: elemA[k] for k in keys }
        minb = { k: elemb[k] for k in keys }
        if minA == minb:  # Found a match
            if elemA[mkey] >= elemb[mkey]:
                retval.append(elemA)
            else:  # Check mkey to see if take b instead
                retval.append(elemb)
            b_index = b_index + 1
        else:  # No match, check forward by one
            if b_index+1 >= len(b):
                continue
            elembplus = b[b_index+1]
            minb = { k: elembplus[k] for k in keys}
            if minA == minb:
                retval.append(elemb)  # This is a new element
                if elemA[mkey] >= elembplus[mkey]:
                    retval.append(elemA)
                else:
                    retval.append(elembplus)
                b_index = b_index + 2
    if b_index <= len(b):
        retval.extend(b[b_index:])
    return retval

只要一行中没有超过一个添加和/或删除(b相对于A),这个方法就可以正常工作。所以如果A包含1,2,3,5和b包含1,2,3,4,5就可以了,但是如果A包含1,2,5和b包含1,2,3,4,5这就崩溃了

我可以做一个检查,直到len(b)在else case下被注释为# No match, check forward by one,或者首先遍历Ab来映射匹配的元素,然后基于该映射再次遍历以创建retval。不过,这似乎很容易出错(我肯定它的逻辑是可行的,但我也相当肯定我为它编写的代码会有bug)。请推荐一个合适的算法来解决这个问题,不管是我的两个想法还是别的什么


Tags: inforindexlenifpluskeyselse
1条回答
网友
1楼 · 发布于 2024-10-02 04:35:16

正如我所说的,hash方法可以帮助您确保比较,仅基于keys列表,您将能够找到交集元素(要合并的元素)和差异元素

class HashedDictKey(dict):

    def __init__(self, keys_, **kwargs):
        super().__init__(**kwargs)
        self.keys_ = keys_

    def __hash__(self):
        return hash(tuple(sorted((k, self.get(k)) for k in self.keys_)))

    def __eq__(self, other):
        return  hash(self) == hash(other)

def merge(A, B):

    to_be_added = []
    to_be_del = []
    to_be_updated = []

    def get(obj, it):
        for i in it:
            if obj == i:
                return i
        raise ValueError("No %s value" % obj)

    for a, b in zip_longest(A, B):
        if a in B:
            to_be_updated.append(a)
        if a not in B:
            to_be_del.append(a)
        if b not in A:
            to_be_added.append(b)

    for i in to_be_del:
        A.remove(i)

    for j in to_be_added:
        A.append(j)

    for i in to_be_updated:
        a = get(i, A)
        b = get(i, B)
        if b['mtime'] > a['mtime']:
            A.remove(a)

here the complete snippet

相关问题 更多 >

    热门问题