Python中的大型网络数据集-如何处理非常大的数组

2024-05-19 14:13:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我们目前使用Cassandra(http://cassandra.apache.org/)作为时间序列数据。Cassandra的读取速度非常快,但是在呈现数据之前,我们必须对数据执行一系列计算(实际上,我们是在模仿SQL的SUM和GROUP-BY功能—Cassandra不支持现成的功能)

我们熟悉Python(在一定程度上),并决定构建一个脚本来查询Cassandra集群,执行数学运算,并以JSON格式显示结果:

query = (
    "SELECT query here...")

startTimeQuery = time.time()

# Executes cassandra query
rslt = cassession.execute(query)

print("--- %s seconds to query ---" % (time.time() - startTimeQuery))

tally = {}

startTimeCalcs = time.time()
for row in rslt:
    userid = row.site_user_id

    revenue = (int(row.revenue) - int(row.reversals_revenue or 0))
    accepted = int(row.accepted or 0)
    reversals_revenue = int(row.reversals_revenue or 0)
    error = int(row.error or 0)
    impressions_negative = int(row.impressions_negative or 0)
    impressions_positive = int(row.impressions_positive or 0)
    rejected = int(row.rejected or 0)
    reversals_rejected = int(row.reversals_rejected or 0)

    if tally.has_key(userid):
        tally[userid]["revenue"] += revenue
        tally[userid]["accepted"] += accepted
        tally[userid]["reversals_revenue"] += reversals_revenue
        tally[userid]["error"] += error
        tally[userid]["impressions_negative"] += impressions_negative
        tally[userid]["impressions_positive"] += impressions_positive
        tally[userid]["rejected"] += rejected
        tally[userid]["reversals_rejected"] += reversals_rejected
    else:
        tally[userid] = {
            "accepted": accepted,
            "error": error,
            "impressions_negative": impressions_negative,
            "impressions_positive": impressions_positive,
            "rejected": rejected,
            "revenue": revenue,
            "reversals_rejected": reversals_rejected,
            "reversals_revenue": reversals_revenue
        }


print("--- %s seconds to calculate results ---" % (time.time() - startTimeCalcs))

startTimeJson = time.time()
jsonOutput =json.dumps(tally)
print("--- %s seconds for json dump ---" % (time.time() - startTimeJson))

print("--- %s seconds total ---" % (time.time() - startTimeQuery))

print "Array Size: " + str(len(tally)) 

我们得到的输出是:

^{pr2}$

我们在计算上花费了大量的时间,我们知道问题不在于求和和和分组本身:问题在于数组的大小。在

我们听到了一些关于numpy的好消息,但是我们的数据的性质使得矩阵的大小是未知的。在

我们正在寻找有关如何处理这一问题的任何提示。完全不同的编程方法。在


Tags: ortimeerrorqueryintrowuseridnegative
2条回答

我做了一个非常相似的处理,我也担心处理时间。我认为您没有考虑到一些重要的事情:作为函数execute()的返回而从cassandra接收的result对象并没有包含您想要的所有行。相反,它包含一个分页的结果,当您扫描for列表中的对象时,它将获得行。这是基于个人的观察,但我不知道更多的技术细节来提供。在

我建议您通过在execute命令后面添加一个简单的rslt = list(rslt)来隔离结果的查询和处理,这将迫使python在进行处理之前遍历结果中的所有行,同时还强制cassandra驱动程序在进行处理之前获取所需的所有行。在

我想你会发现,你的很多处理时间实际上是查询,但它被驱动程序通过分页结果掩盖了。在

Cassandra2.2及更高版本允许用户定义聚合函数。你可以用它在卡桑德拉侧进行柱计算。 有关用户定义聚合的数据,请参见DataStax article

相关问题 更多 >