<p>This is a <a href="https://en.wikipedia.org/wiki/Travelling_salesman_problem" rel="nofollow noreferrer">Travelling Salesman Problem</a>, a notoriously difficult problem to solve optimally. The code presented solves a 10,000-node problem with simple interconnections (i.e. one or two relations per node) in about 15 minutes. It performs less well on more richly interconnected sequences; the test results below explore this.</p>
<p>The notion of precedence mentioned by the OP is not explored.</p>
<p>The code presented comprises a <a href="https://en.wikipedia.org/wiki/Heuristic" rel="nofollow noreferrer">heuristic</a> solution, a brute force solution that is optimal but impractical for large <code>node_set</code>s, and some simple but scalable test data generators, some with known optimal solutions. The heuristic finds optimal solutions for the OP's "ABC" example, my own 8-item <code>node_set</code>, and the scalable test data for which optimal solutions are known.</p>
<p>If the performance isn't good enough, at least it's a first attempt and starts a testing "workshop" with which to improve the heuristic.</p>
<h2>Test Results</h2>
<pre><code>>>> python sortbylinks.py
===============================
"ABC" (sequence length 3)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 3
[('A', 'B', 'C'), ('C', 'A', 'B'), ('B', 'A', 'C')], ...and more
Top score: 1
"optimise_order" function took 0.0s
Optimal quality: 1
===============================
"Nick 8-digit" (sequence length 8)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 1
[('A', 'E', 'F', 'C', 'D', 'G', 'B', 'H')], ...and more
Top score: 6
"optimise_order" function took 0.0s
Optimal quality: 6
</code></pre>
<p>Short, relatively trivial cases appear to be no problem:</p>
<pre><code>===============================
"Quality <1000, cycling subsequence, small number of relations" (sequence length 501)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 3
[('AAAC', 'AAAL', 'AAAU', ...), ...], ...and more
Top score: 991
"optimise_order" function took 2.0s
Optimal quality: 991
===============================
"Quality <1000, cycling subsequence, small number of relations, shuffled" (sequence length 501)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 3
[('AADF', 'AAKM', 'AAOZ', ...), ...], ...and more
Top score: 991
"optimise_order" function took 2.0s
Optimal quality: 991
</code></pre>
<p>The <code>"Quality <1000, cycling subsequence" (sequence length 501)</code> case is interesting: by grouping nodes with the <code>{0, 1}</code> relation sets together, the quality score can be nearly doubled. The heuristic finds this optimal sequence. A quality of 1000 is not quite possible because these doubly-linked groups need to connect to each other via single-linked nodes every so often (e.g. <code>{'AA': {0, 1}, 'AB': {0, 1}, ..., 'AZ': {0, 1}, <...single link here...> 'BA': {1, 2}, 'BB': {1, 2}, ...}</code>).</p>
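<p>To illustrate the near-doubling, here is a minimal sketch (a standalone re-implementation of the scoring idea, not the answer's <code>quality</code> method itself) comparing a grouped ordering with an interleaved one for two doubly-linked groups joined by a single shared relation:</p>

```python
def quality(sequence, node_set):
    # sum of relations shared by each adjacent pair (same scoring rule as the answer)
    return sum(len(node_set[a] & node_set[b])
               for a, b in zip(sequence, sequence[1:]))

# two doubly-linked groups; group 2 connects to group 1 only via relation 1
node_set = {
    'AA': {0, 1}, 'AB': {0, 1}, 'AC': {0, 1},
    'BA': {1, 2}, 'BB': {1, 2}, 'BC': {1, 2},
}

grouped = ['AA', 'AB', 'AC', 'BA', 'BB', 'BC']
interleaved = ['AA', 'BA', 'AB', 'BB', 'AC', 'BC']

print(quality(grouped, node_set))      # 2+2+1+2+2 = 9
print(quality(interleaved, node_set))  # 1+1+1+1+1 = 5
```

Keeping each group contiguous scores 2 per in-group pair, paying the single-link price only once at the group boundary.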
<p>Performance is still good for test data with very few relations per node:</p>
<pre><code>===============================
"Quality 400, single unbroken chain, initial solution is optimal" (sequence length 401)
===============================
Working...
Finding heuristic candidates...
Number of candidates with top score: 1
[('AAAA', 'AAAB', 'AAAC', ...), ...], ...and more
Top score: 400
"optimise_order" function took 0.0s
Optimal quality: 400
===============================
"Quality 400, single unbroken chain, shuffled" (sequence length 401)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 1
[('AAAA', 'AAAB', 'AAAC', ...), ...], ...and more
Top score: 400
"optimise_order" function took 0.0s
Optimal quality: 400
</code></pre>
<p>One of the difficulties of the Travelling Salesman Problem (TSP) is knowing when you have an optimal solution. The heuristic doesn't seem to converge any faster even when starting from a near-optimal or optimal order.</p>
<pre><code>===============================
"10,000 items, single unbroken chain, initial order is optimal" (sequence length 10001)
===============================
Working...
Finding heuristic candidates...
Number of candidates with top score: 1
[('AOUQ', 'AAAB', 'AAAC', ...), ...], ...and more
Top score: 10002
"optimise_order" function took 947.0s
Optimal quality: 10000
</code></pre>
<p>When the number of relations is very low, performance is quite good even with many nodes, and the result may be close to optimal:</p>
<pre><code>===============================
"Many random relations per node (0..n), n=200" (sequence length 200)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 1
[('AAEO', 'AAHC', 'AAHQ', ...), ...], ...and more
Top score: 6861
"optimise_order" function took 94.0s
Optimal quality: ?
===============================
"Many random relations per node (0..n), n=500" (sequence length 500)
===============================
Working...
...choosing seed candidates
Finding heuristic candidates...
Number of candidates with top score: 1
[('AAJT', 'AAHU', 'AABT', ...), ...], ...and more
Top score: 41999
"optimise_order" function took 4202.0s
Optimal quality: ?
</code></pre>
<p>This is more like the data the OP generates, and also more like the classic Travelling Salesman Problem, in which there is a set of distances between every pair of cities (for "city" read "node") and nodes are usually richly connected to each other. Here the links between nodes are only partial, so there is no guarantee of a link between any 2 nodes.</p>
<p>The heuristic's time performance is much worse in cases like this. There are between 0 and <code>n</code> random relations for each of <code>n</code> nodes. This probably means many more swap combinations yield a better quality, the swaps and quality checks themselves are more expensive, and more passes are needed before the heuristic converges on its best result. In the worst case this may mean O(n^3).</p>
<p>Performance degrades as the number of nodes and relations increases (note the difference between <code>n=200</code> at about 1.5 minutes and <code>n=500</code> at about 70 minutes.) So currently the heuristic may not be practical for several thousand highly interconnected nodes.</p>
<p>Also, because the brute force solution is computationally infeasible here, there is no way to know exactly how good these test results are. That said, an average of <code>6861 / 200 = 34.3</code> and <code>41999 / 500 = 84.0</code> shared relations between neighbouring node pairs doesn't look too far off.</p>
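<p>As a rough sanity check on the n=200 figure, one can sample relation sets the same way the <code>many_nodes_many_relations</code> generator below does (0..n relations drawn from <code>range(n)</code>) and estimate how many relations two <em>random</em> nodes share on average; the function names here are hypothetical, only the generator shape is taken from the test code:</p>

```python
import random

def sample_relation_set(n):
    # mirrors many_nodes_many_relations: k in 0..n draws, with replacement, from range(n)
    return set(random.choices(range(n), k=random.randint(0, n)))

def mean_pair_intersection(n, trials=2000):
    # Monte Carlo estimate of the mean number of relations shared by two random nodes
    total = 0
    for _ in range(trials):
        a, b = sample_relation_set(n), sample_relation_set(n)
        total += len(a & b)
    return total / trials

random.seed(0)
est = mean_pair_intersection(200)
print(est)  # roughly n / e**2, i.e. somewhere in the high twenties for n=200
```

The heuristic's 34.3 shared relations per adjacent pair is comfortably above this random-pair baseline, which is what one would hope for.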
<h2>Heuristic and Brute Force Solver Code</h2>
<pre class="lang-py prettyprint-override"><code>import sys
from collections import deque
import itertools
import random
import datetime

# TODO: in-place swapping? (avoid creating copies of sequences)


def timing(f):
    """
    decorator for displaying execution time for a method
    """
    def wrap(*args):
        start = datetime.datetime.now()
        f_return_value = f(*args)
        end = datetime.datetime.now()
        print('"{:s}" function took {:.1f}s'.format(f.__name__, (end - start).seconds))
        return f_return_value
    return wrap


def flatten(a):
    # e.g. [a, [b, c], d] -> [a, b, c, d]
    return itertools.chain.from_iterable(a)


class LinkAnalysis:
    def __init__(self, node_set, max_ram=100_000_000, generate_seeds=True):
        """
        :param node_set: node_ids and their relation sets to be arranged in sequence
        :param max_ram: estimated maximum RAM to use
        :param generate_seeds: if true, attempt to generate some initial candidates based on sorting
        """
        self.node_set = node_set
        self.candidates = {}
        self.generate_seeds = generate_seeds
        self.seeds = {}
        self.relations = []
        # balance performance and RAM using regular 'weeding'
        candidate_size = sys.getsizeof(deque(self.node_set.keys()))
        self.weed_interval = max_ram // candidate_size

    def create_initial_candidates(self):
        print('Working...')
        self.generate_seed_from_presented_sequence()
        if self.generate_seeds:
            self.generate_seed_candidates()

    def generate_seed_from_presented_sequence(self):
        """
        add the initially presented order of nodes as one of the seed candidates
        this is worthwhile because the initial order may be close to optimal
        """
        presented_sequence = self.presented_sequence()
        self.seeds[tuple(presented_sequence)] = self.quality(presented_sequence)

    def presented_sequence(self) -> list:
        return list(self.node_set.keys())  # relies on Python 3.6+ to preserve key order in dicts

    def generate_seed_candidates(self):
        initial_sequence = self.presented_sequence()
        # get relations from the original data structure
        relations = sorted(set(flatten(self.node_set.values())))
        # sort by lowest precedence relation first
        print('...choosing seed candidates')
        for relation in reversed(relations):
            # use true-false ordering: in Python, True > False
            initial_sequence.sort(key=lambda sortkey: relation not in self.node_set[sortkey])
            sq = self.quality(initial_sequence)
            self.seeds[tuple(initial_sequence)] = sq

    def quality(self, sequence):
        """
        calculate quality of full sequence
        :param sequence:
        :return: quality score (int)
        """
        pairs = zip(sequence[:-1], sequence[1:])
        scores = [len(self.node_set[a].intersection(self.node_set[b]))
                  for a, b in pairs]
        return sum(scores)

    def brute_force_candidates(self, sequence):
        for candidate in itertools.permutations(sequence):
            yield candidate, self.quality(candidate)

    def heuristic_candidates(self, seed_sequence):
        # look for solutions with higher quality scores by swapping elements
        # start with large distances between elements
        # then reduce by powers of 2 until swapping next-door neighbours
        max_distance = len(seed_sequence) // 2
        max_pow2 = int(pow(max_distance, 1 / 2))
        distances = [int(pow(2, r)) for r in reversed(range(max_pow2 + 1))]
        for distance in distances:
            yield from self.seed_and_variations(seed_sequence, distance)
        # the seed candidate may be better than its derived sequences: include it as a candidate
        yield seed_sequence, self.quality(seed_sequence)

    def seed_and_variations(self, seed_sequence, distance=1):
        # swap elements at a distance, starting from the beginning and the end of
        # the sequence in seed_sequence
        candidate_count = 0
        for pos1 in range(len(seed_sequence) - distance):
            pos2 = pos1 + distance
            q = self.quality(seed_sequence)
            # from beginning of sequence
            yield self.swap_and_quality(seed_sequence, q, pos1, pos2)
            # from end of sequence
            yield self.swap_and_quality(seed_sequence, q, -pos1, -pos2)
            candidate_count += 2
            if candidate_count > self.weed_interval:
                self.weed()
                candidate_count = 0

    def swap_and_quality(self, sequence, preswap_sequence_q: int, pos1: int, pos2: int) -> (tuple, int):
        """
        swap two elements and return the swapped sequence with its quality
        (which can be calculated cheaply from the pre-swap quality)
        :param sequence: as for swap
        :param preswap_sequence_q: quality of the pre-swap sequence
        :param pos1: as for swap
        :param pos2: as for swap
        :return: swapped sequence, quality of swapped sequence
        """
        initial_node_q = sum(self.node_quality(sequence, pos) for pos in [pos1, pos2])
        swapped_sequence = self.swap(sequence, pos1, pos2)
        swapped_node_q = sum(self.node_quality(swapped_sequence, pos) for pos in [pos1, pos2])
        qdelta = swapped_node_q - initial_node_q
        swapped_sequence_q = preswap_sequence_q + qdelta
        return swapped_sequence, swapped_sequence_q

    def swap(self, sequence, node_pos1: int, node_pos2: int):
        """
        deques perform better than lists for swapping elements in a long sequence
        :param sequence: sequence on which to perform the element swap
        :param node_pos1: zero-based position of first element
        :param node_pos2: zero-based position of second element
        >>> swap(('A', 'B', 'C'), 0, 1)
        ('B', 'A', 'C')
        """
        if type(sequence) is tuple:
            # sequence is a candidate (candidates are dict keys and hence tuples)
            # and needs converting to a list for swap processing
            sequence = list(sequence)
        if node_pos1 == node_pos2:
            return sequence
        sequence[node_pos1], sequence[node_pos2] = sequence[node_pos2], sequence[node_pos1]
        return sequence

    def node_quality(self, sequence, pos):
        if pos < 0:
            pos = len(sequence) + pos
        no_of_links = 0
        middle_node_relations = self.node_set[sequence[pos]]
        if pos > 0:
            left_node_relations = self.node_set[sequence[pos - 1]]
            no_of_links += len(left_node_relations.intersection(middle_node_relations))
        if pos < len(sequence) - 1:
            right_node_relations = self.node_set[sequence[pos + 1]]
            no_of_links += len(middle_node_relations.intersection(right_node_relations))
        return no_of_links

    @timing
    def optimise_order(self, selection_strategy):
        top_score = 0
        new_top_score = True
        self.candidates.update(self.seeds)
        while new_top_score:
            top_score = max(self.candidates.values())
            new_top_score = False
            initial_candidates = {name for name, score in self.candidates.items() if score == top_score}
            for initial_candidate in initial_candidates:
                for candidate, q in selection_strategy(initial_candidate):
                    if q > top_score:
                        new_top_score = True
                        top_score = q
                        self.candidates[tuple(candidate)] = q
            self.weed()
        print(f"Number of candidates with top score: {len(list(self.candidates))}")
        print(f"{list(self.candidates)[:3]}, ...and more")
        print(f"Top score: {top_score}")

    def weed(self):
        # retain only top-scoring candidates
        top_score = max(self.candidates.values())
        low_scorers = {k for k, v in self.candidates.items() if v < top_score}
        for low_scorer in low_scorers:
            del self.candidates[low_scorer]
</code></pre>
<h2>Code Glossary</h2>
<p><code>node_set</code>: a set of labelled nodes of the form <code>'unique_node_id': {relation1, relation2, ..., relationN}</code>. The relation set for each node can contain either no relations or an arbitrary number of them.</p>
<p><code>node</code>: a key-value pair consisting of a <code>node_id</code> (key) and a set of relations (value)</p>
<p><code>relation</code>: as used by the OP, this is a number. If two nodes both share relation <code>1</code> and they are neighbours in the sequence, it adds <code>1</code> to the quality of the sequence.</p>
<p><code>sequence</code>: an ordered set of node ids (e.g. <code>['A', 'B', 'C']</code>) associated with a <code>quality</code> score. The quality score is the total number of shared relations between neighbouring nodes in the sequence. The output of the heuristic is the sequence or sequences with the highest quality score.</p>
<p><code>candidate</code>: a sequence currently under investigation to see whether it is of high quality.</p>
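<p>The glossary terms in one minimal, self-contained example, using the "ABC" <code>node_set</code> from the tests below with the quality calculation written inline:</p>

```python
node_set = {          # node_id -> relation set; each key-value pair is a node
    'A': {2, 3},
    'B': {1, 2},
    'C': {4},
}

sequence = ('A', 'B', 'C')   # one candidate ordering of the node ids

# quality: total relations shared by neighbouring nodes in the sequence
quality = sum(len(node_set[a] & node_set[b])
              for a, b in zip(sequence, sequence[1:]))

print(quality)  # 1  ('A' and 'B' share relation 2; 'B' and 'C' share nothing)
```

This matches the <code>Top score: 1</code> reported for the "ABC" test above.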
<h2>Method</h2>
<ol>
<li><p>Generate seed sequences by stable-sorting on the presence or absence of each relation in the linked items</p></li>
<li><p>The initially presented order is also one of the seed sequences, in case it happens to be close to optimal</p></li>
<li><p>For every seed sequence, pairs of nodes are swapped in search of higher quality scores</p></li>
<li><p>Execute a "round" for each seed sequence. A round is a Shell-sort-like pass over the sequence, first swapping pairs of nodes at a distance, then narrowing the distance until it is 1 (swapping immediate neighbours.) Retain only those sequences whose quality exceeds the current top quality score</p></li>
<li><p>If a new top quality score was found in this round, weed out all but the top-scoring candidates and repeat from step 4 using the top scorers as seeds. Otherwise exit</p></li>
</ol>
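<p>The loop in steps 3-5 can be sketched as follows (heavily simplified: a single seed, plain pairwise swaps instead of the shrinking Shell-sort-style distances, and no weeding; <code>improve</code> is a hypothetical name, not part of the answer's code):</p>

```python
def quality(seq, node_set):
    return sum(len(node_set[a] & node_set[b]) for a, b in zip(seq, seq[1:]))

def improve(seed, node_set):
    # keep making swap passes while a pass finds a better ordering (steps 3-5)
    best, best_q = list(seed), quality(seed, node_set)
    improved = True
    while improved:
        improved = False
        for i in range(len(best) - 1):
            for j in range(i + 1, len(best)):
                candidate = best[:]
                candidate[i], candidate[j] = candidate[j], candidate[i]
                q = quality(candidate, node_set)
                if q > best_q:             # keep only improvements on the top score
                    best, best_q, improved = candidate, q, True
    return best, best_q

# a shuffled single-linked chain whose optimal order is A, B, C, D (quality 3)
node_set = {'A': {0, 1}, 'B': {1, 2}, 'C': {2, 3}, 'D': {3, 4}}
seq, q = improve(['D', 'B', 'A', 'C'], node_set)
print(seq, q)  # ['A', 'B', 'C', 'D'] 3
```

Like the real heuristic, this can get stuck in local optima on richly connected data; the seed variety and distance schedule in the answer's code exist to reduce that risk.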
<h2>Tests and Test Data Generators</h2>
<p>The heuristic has been tested using small <code>node_set</code>s, scaled data of several hundred to 10,000 nodes with very simple relationships, and a randomised, highly interconnected <code>node_set</code> more like the OP's test data generator. Perfect single-linked sequences, linked cycles (small subsequences that link internally and to each other) and shuffling were all helpful in flushing out and fixing weaknesses.</p>
<pre class="lang-py prettyprint-override"><code>ABC_links = {
    'A': {2, 3},
    'B': {1, 2},
    'C': {4}
}

nick_links = {
    'B': {1, 2, 4},
    'C': {4},
    'A': {2, 3},
    'D': {4},
    'E': {3},
    'F': {5, 6},
    'G': {2, 4},
    'H': {1},
}

unbroken_chain_linked_tail_to_head = ({1, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}, {7, 8}, {8, 9}, {9, 10}, {10, 1})
cycling_unbroken_chain_linked_tail_to_head = itertools.cycle(unbroken_chain_linked_tail_to_head)


def many_nodes_many_relations(node_count):
    # data set with n nodes and random 0..n relations as per OP's requirement
    relation_range = list(range(node_count))
    relation_set = (
        set(random.choices(relation_range, k=random.randint(0, node_count)))
        for _ in range(sys.maxsize)
    )
    return scaled_data(node_count, relation_set)


def scaled_data(no_of_items, link_sequence_model):
    uppercase = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    # unique labels using sequences of four letters (AAAA, AAAB, AAAC, .., AABA, AABB, ...)
    item_names = (''.join(letters) for letters in itertools.product(*([uppercase] * 4)))
    # only use a copy of the original link sequence model, otherwise the model could be exhausted
    # or start mid-cycle
    # https://stackoverflow.com/questions/42132731/how-to-create-a-copy-of-python-iterator
    link_sequence_model, link_sequence = itertools.tee(link_sequence_model)
    return {item_name: links for _, item_name, links in zip(range(no_of_items), item_names, link_sequence)}


def shuffled(items_list):
    """relies on Python 3.6+ dictionary insertion-ordered keys"""
    shuffled_keys = list(items_list.keys())
    random.shuffle(shuffled_keys)
    return {k: items_list[k] for k in shuffled_keys}


cycling_quality_1000 = scaled_data(501, cycling_unbroken_chain_linked_tail_to_head)
cycling_quality_1000_shuffled = shuffled(cycling_quality_1000)

linked_forward_sequence = ({n, n + 1} for n in range(sys.maxsize))
# { 'A': {0, 1}, 'B': {1, 2}, ... } links A to B to ...
optimal_single_linked_unbroken_chain = scaled_data(401, linked_forward_sequence)
shuffled_single_linked_unbroken_chain = shuffled(optimal_single_linked_unbroken_chain)

large_node_set = scaled_data(10001, cycling_unbroken_chain_linked_tail_to_head)
large_node_set_shuffled = shuffled(large_node_set)

tests = [
    ('ABC', 1, ABC_links, True),
    ('Nick 8-digit', 6, nick_links, True),
    # ('Quality <1000, cycling subsequence, small number of relations', 1000 - len(unbroken_chain_linked_tail_to_head), cycling_quality_1000, True),
    # ('Quality <1000, cycling subsequence, small number of relations, shuffled', 1000 - len(unbroken_chain_linked_tail_to_head), cycling_quality_1000_shuffled, True),
    ('Many random relations per node (0..n), n=200', '?', many_nodes_many_relations(200), True),
    # ('Quality 400, single unbroken chain, initial solution is optimal', 400, optimal_single_linked_unbroken_chain, False),
    # ('Quality 400, single unbroken chain, shuffled', 400, shuffled_single_linked_unbroken_chain, True),
    # ('10,000 items, single unbroken chain, initial order is optimal', 10000, large_node_set, False),
    # ('10,000 items, single unbroken chain, shuffled', 10000, large_node_set_shuffled, True),
]

for title, expected_quality, item_links, generate_seeds in tests:
    la = LinkAnalysis(node_set=item_links, generate_seeds=generate_seeds)
    seq_length = len(list(la.node_set.keys()))
    print()
    print('===============================')
    print(f'"{title}" (sequence length {seq_length})')
    print('===============================')
    la.create_initial_candidates()
    print('Finding heuristic candidates...')
    la.optimise_order(la.heuristic_candidates)
    print(f'Optimal quality: {expected_quality}')
    # print('Brute Force working...')
    # la.optimise_order(la.brute_force_candidates)
</code></pre>
<h2>Performance</h2>
<p>The heuristic is more "practical" than the brute force solution because it leaves out many possible combinations. It is possible that a low-quality sequence produced by an element swap is actually just one more swap away from a high quality score, but such a case could be weeded out before it is tested.</p>
<p>The heuristic appears to find optimal results for single-linked sequences and for cyclic sequences linked tail to head. These have known optimal solutions which the heuristic finds, and they may be simpler and less problematic than real-world data.</p>
<p>A big improvement was the introduction of a "delta" quality calculation, which can quickly compute the quality difference a two-element swap makes without recalculating the quality score of the entire sequence.</p>
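<p>The delta idea can be shown in isolation (a standalone sketch mirroring <code>node_quality</code> and <code>swap_and_quality</code> above, not the methods themselves): only the links touching the two swapped positions change, so the full score can be patched rather than recomputed:</p>

```python
def full_quality(seq, node_set):
    return sum(len(node_set[a] & node_set[b]) for a, b in zip(seq, seq[1:]))

def local_quality(seq, pos, node_set):
    # links from seq[pos] to its immediate neighbours (cf. node_quality)
    q = 0
    if pos > 0:
        q += len(node_set[seq[pos - 1]] & node_set[seq[pos]])
    if pos < len(seq) - 1:
        q += len(node_set[seq[pos]] & node_set[seq[pos + 1]])
    return q

node_set = {'A': {0, 1}, 'B': {1, 2}, 'C': {2, 3}, 'D': {3, 4}, 'E': {4, 5}}
seq = ['A', 'B', 'C', 'D', 'E']
q = full_quality(seq, node_set)

pos1, pos2 = 1, 3                       # swap 'B' and 'D' (non-adjacent positions)
before = local_quality(seq, pos1, node_set) + local_quality(seq, pos2, node_set)
swapped = seq[:]
swapped[pos1], swapped[pos2] = swapped[pos2], swapped[pos1]
after = local_quality(swapped, pos1, node_set) + local_quality(swapped, pos2, node_set)

# the patched score agrees with a full recomputation
print(q + (after - before), full_quality(swapped, node_set))  # 2 2
```

The patch is O(1) per swap while a full recomputation is O(n), which is what makes long swap passes affordable.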