如何使用scala或python在apachespark中运行多线程作业?

2024-10-01 15:41:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我在spark中遇到了一个与并发性相关的问题,它阻止了我在生产中使用它,但我知道有一种方法可以解决它。我正在尝试使用订单历史记录在7百万用户上运行10亿产品的Spark ALS。首先,我获取一个不同用户的列表,然后对这些用户运行一个循环以获得推荐,这是一个相当慢的过程,需要几天才能为所有用户获得推荐。我试着做笛卡尔用户和产品,一次得到所有人的推荐,但又一次,为了把这个输入到elasticsearch,我必须为每个用户过滤和排序记录,只有这样我才能把它输入到elasticsearch中,供其他api使用。在

所以请给我推荐一个解决方案,它在这种情况下是相当可伸缩的,并且可以在生产中使用,并提供实时建议。在

下面是我在scala中的代码片段,它将告诉您我当前如何解决问题:

  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))

      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)

Tags: 用户idfor产品withproductelasticsearchvalues
2条回答

1.4已推荐所有建议,以便通过kv仓库提供服务。在

很明显,程序中的瓶颈是搜索candidates。考虑到Spark架构,它严重限制了并行化的能力,并通过为每个用户启动Spark job来增加大量开销。在

假设典型的情况是,在700万用户10亿个产品的情况下,您将预测整个产品范围减去用户已经购买的少数产品。至少在我看来,重要的问题是为什么还要费心过滤。即使你推荐以前买过的产品,它真的有害吗?在

除非您有非常严格的要求,否则我会忽略这个问题并使用^{}来完成所有工作,不包括数据导出。在那之后,你可以进行批量出口,你就可以走了。在

现在假设您有一个明确的无重复策略。假设一个典型用户只购买了相对较少的产品,您可以从为每个用户获得一套产品开始:

val userProdSet = buy_values
    .map{case (user, product, _) => (user, product)} 
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)

接下来,您可以简单地映射userProdSet来获得预测:

^{pr2}$

您可以通过使用可变集合进行聚合和广播模型来进一步改进,但这是一个一般的想法。在

如果user_ids中的用户数小于整个集合中的用户数(buy_values),您可以简单地过滤userProdSet来只保留一部分用户。在

相关问题 更多 >

    热门问题