Pyspark:线程heartbeat receivereventloopthread中的未捕获异常

2024-05-14 07:41:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个pythonspark代码,如下所示。它基本上是从self.user_RDD获取{},为此{}结合了{}和{}的产物。然后保存到Redis中。在

    for user_id in self.user_RDD.collect():
        product_CF = self.getpreferredProducts(user_id)
        try:
            product_list = json.loads(redis_client.hget('user_products',user_id))
            # combine 2 list
            for product_id in product_list:
                if product_id in product_CF:
                    product_CF.remove(product_id)
            product_list.extend(product_CF)
            r.hset('score',user_id,str(json.dumps(product_list)))
        except Exception as e:
            print e

当有一个巨大的数据集时,它会在执行过程中停止并引发以下异常

^{pr2}$

Tags: 代码inselfredisidjsonforproduct

热门问题