我有这个火花程序,我会尽量把它限制在相关的部分
# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()
# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)
convergence = False
ncm = NCM()
while(not convergence):
memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))
# Memberships
T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
# Components of new centroids
wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
#print "wTm = " + str(wTm.collect())
print "at first reduce"
sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
#print "sumwTm = " + str(sumwTm.collect())
wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
print "adding to cnumerator list"
#print wTmx.collect()
cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
print "collected cnumerator, now printing"
#print "cnumerator = " + str(cnumerator.collect())
#print str(sumwTm.collect())
# Calculate the new centroids
sumwTmCollection = sumwTm.collect()[0][1]
cnumeratorCollection = cnumerator.collect()
#print "sumwTmCollection = " + str(sumwTmCollection)
#cnumeratorCollection =cnumerator.collectAsMap().get(0).items
print "cnumeratorCollection = " + str(cnumeratorCollection)
for i in range(len(newCentroids)):
newCentroids[i] = scalarMult(1 / sumwTmCollection[i], [cnumeratorCollection[i]])
centroids = newCentroids
# Test for convergence
convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)
#convergence = True
# Replace our old centroids with the newly found centroids and repeat if convergence not met
# Clear out space for a new set of centroids
newCentroids = [[] for k in range(K)]
这个程序在我的本地机器上运行得很好,但是当在独立集群上运行时,它的行为并不像预期的那样。它不一定抛出一个错误,但它所做的事情会给出与我在本地机器上运行时收到的不同的输出。集群和3个节点似乎运行良好。我有一种感觉,问题是我不断更新centroids
,这是一个python列表,它每次都会通过while-loop
进行更改。是否可能每个节点都没有该列表的最新副本?我想是的,所以我试着使用broadcast variable
,但这些不能更新(只读)。我也试过使用accumulator
,但那只是为了积累。我还试图将python列表保存为hdfs上的一个文件,以便每个节点都可以访问,但这没有很好地工作。你认为我对这个问题的理解正确吗?这里可能还有别的事吗?我怎样才能得到在本地机器上运行良好的代码,而不是在集群上运行的代码?在
感谢您一直以来对这个问题的关注和关注,特别是因为听起来我本可以发布更多的信息来让您的工作更轻松。问题出在这里
我没有意识到这一点,但是经过一个简短的实验,这个函数每次都返回相同的输出,尽管它是我认为的随机样本。只要您使用相同的seed(本例中为34),您将得到相同的RDD作为回报。由于某些原因,集群上的RDD与返回到本地计算机的RDD不同。无论如何,由于每次都是相同的RDD,所以我的输出从未改变过。关于“随机”质心的问题是,这些特殊的质心产生了数学中的马鞍点,在那里质心不会收敛。答案的这一部分是数学和编程问题,所以我不再赘述。在这一点上,我真正的希望是,如果你愿意的话,其他人会得到帮助
为了在每次调用时生成不同的样本,每次都将种子更改为某个随机数。在
我希望这些都有帮助。我从来没有花这么多时间来解决我的记忆问题。在
再次感谢。在
相关问题 更多 >
编程相关推荐