带pysp的Python单例

2024-09-29 23:15:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试重写scala code,以便使用pyspark库在python中计算随机游动。我遇到了如何最好地处理scala对象GraphMap的实现来计算routing table的问题。问题是GraphMap的值在每个worker上都被访问和修改。由于GraphMap是一个Scala对象,因此结果是对其值进行全局修改。你知道吗

在python中,我创建了一个模块graphmap.py

import threading


def synchronized(func):
    func.__lock__ = threading.Lock()

    def synced_func(*args, **kws):
        with func.__lock__:
            return func(*args, **kws)

    return synced_func


class GraphMap(object):
    scr_vertex_map = dict()
    offsets = []
    lengths = []
    edges = []
    index_counter = 0
    offset_counter = 0
    first_get = True
    vertex_partition_map = dict()

    @synchronized
    def add_vertex(self, vertex_id, neighbors):
        """
        Add vertex to the map
        :param vertex_id: <int>
        :param neighbors: <list(int,float)>
        :return:
        """
        if vertex_id not in self.scr_vertex_map:
            if len(neighbors):
                self.scr_vertex_map[vertex_id] = self.index_counter
                self.offsets.insert(self.index_counter, self.offset_counter)
                self.lengths.insert(self.index_counter, len(neighbors))
                self.index_counter += 1
                for element in neighbors:
                    self.edges.insert(self.offset_counter, element)
                    self.offset_counter += 1
            else:
                self.scr_vertex_map[vertex_id] = -1

以及一个单独的模块来计算路由表

from graphmap import GraphMap
import pytest
from pyspark.sql import SparkSession


def test_graphmap():
    spark_session = SparkSession \
        .builder \
        .master('local[*]')\
        .getOrCreate()

    graph = spark_session.sparkContext\
        .parallelize([(1, [(2, 1.0)]), (2, []), (3, [])])\
        .partitionBy(3)
    graph_map = GraphMap()

    def f(splitIndex, iterator):
        for vid, neighbors in iterator:
            graph_map.add_vertex(vid, neighbors)
        yield []

    routing_table = graph.mapPartitionsWithIndex(f, preservesPartitioning=True)
    assert(routing_table.count() == 3)
    assert(all(len(i) == 0 for i in routing_table.collect()))
    assert(graph_map.index_counter == 1)
    assert (graph_map.offset_counter == 1)
    assert(len(graph_map.scr_vertex_map) == 3)

最后三个断言失败,因为在我的例子中,graph\u map不是一个单例,并且在每个worker上创建和修改一个单独的实例。我尝试按照this post的建议为graphmap创建一个单例(特别是模块方法、Borg模式和元类方法)。然而,这并没有帮助。我不知道是什么原因,但我猜这和腌制有关。如果有人能解释一下这里到底发生了什么,我将不胜感激。你知道吗

最后,我通过将graphmap的状态本地存储在文件中解决了这个问题。因此,在每个分区上,我首先从文件中读取状态,然后在修改之后将状态写入文件。据我所知,如果在集群模式下运行spark,这种方法将不起作用。你知道吗

这是我能做的最好的了吗?有更好的方法吗?如有任何建议,我们将不胜感激。你知道吗


Tags: selfidmapindexdefcounterneighborsassert

热门问题