我正在尝试重写scala code,以便使用pyspark库在python中计算随机游动。我遇到了如何最好地处理scala对象GraphMap的实现来计算routing table的问题。问题是GraphMap的值在每个worker上都被访问和修改。由于GraphMap是一个Scala对象,因此结果是对其值进行全局修改。你知道吗
在python中,我创建了一个模块graphmap.py
import threading
def synchronized(func):
func.__lock__ = threading.Lock()
def synced_func(*args, **kws):
with func.__lock__:
return func(*args, **kws)
return synced_func
class GraphMap(object):
scr_vertex_map = dict()
offsets = []
lengths = []
edges = []
index_counter = 0
offset_counter = 0
first_get = True
vertex_partition_map = dict()
@synchronized
def add_vertex(self, vertex_id, neighbors):
"""
Add vertex to the map
:param vertex_id: <int>
:param neighbors: <list(int,float)>
:return:
"""
if vertex_id not in self.scr_vertex_map:
if len(neighbors):
self.scr_vertex_map[vertex_id] = self.index_counter
self.offsets.insert(self.index_counter, self.offset_counter)
self.lengths.insert(self.index_counter, len(neighbors))
self.index_counter += 1
for element in neighbors:
self.edges.insert(self.offset_counter, element)
self.offset_counter += 1
else:
self.scr_vertex_map[vertex_id] = -1
以及一个单独的模块来计算路由表
from graphmap import GraphMap
import pytest
from pyspark.sql import SparkSession
def test_graphmap():
spark_session = SparkSession \
.builder \
.master('local[*]')\
.getOrCreate()
graph = spark_session.sparkContext\
.parallelize([(1, [(2, 1.0)]), (2, []), (3, [])])\
.partitionBy(3)
graph_map = GraphMap()
def f(splitIndex, iterator):
for vid, neighbors in iterator:
graph_map.add_vertex(vid, neighbors)
yield []
routing_table = graph.mapPartitionsWithIndex(f, preservesPartitioning=True)
assert(routing_table.count() == 3)
assert(all(len(i) == 0 for i in routing_table.collect()))
assert(graph_map.index_counter == 1)
assert (graph_map.offset_counter == 1)
assert(len(graph_map.scr_vertex_map) == 3)
最后三个断言失败,因为在我的例子中,graph\u map不是一个单例,并且在每个worker上创建和修改一个单独的实例。我尝试按照this post的建议为graphmap创建一个单例(特别是模块方法、Borg模式和元类方法)。然而,这并没有帮助。我不知道是什么原因,但我猜这和腌制有关。如果有人能解释一下这里到底发生了什么,我将不胜感激。你知道吗
最后,我通过将graphmap的状态本地存储在文件中解决了这个问题。因此,在每个分区上,我首先从文件中读取状态,然后在修改之后将状态写入文件。据我所知,如果在集群模式下运行spark,这种方法将不起作用。你知道吗
这是我能做的最好的了吗?有更好的方法吗?如有任何建议,我们将不胜感激。你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐