Why does PySpark hang after completing a few ShuffleMapTasks?
When I run a Python script with pyspark, it reaches "14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)" and then nothing further happens. It just hangs there with no error message.
========== Log =============
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:4 as 3689 bytes in 1 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:5 as TID 73 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:5 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:6 as TID 74 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:6 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:7 as TID 75 on executor 1: myhost-bigdata-110d12 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:7 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:8 as TID 76 on executor 0: myhost-bigdata-110d13 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:8 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Starting task 5.0:9 as TID 77 on executor 2: myhost-bigdata-110d14 (NODE_LOCAL)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Serialized task 5.0:9 as 3689 bytes in 0 ms
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 73 in 200 ms on myhost-bigdata-110d13 (progress: 0/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 5)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 76 in 218 ms on myhost-bigdata-110d13 (progress: 1/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 8)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 72 in 324 ms on myhost-bigdata-110d12 (progress: 2/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 4)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 69 in 371 ms on myhost-bigdata-110d12 (progress: 3/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 1)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 75 in 367 ms on myhost-bigdata-110d12 (progress: 4/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 7)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 77 in 423 ms on myhost-bigdata-110d14 (progress: 5/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 9)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 71 in 435 ms on myhost-bigdata-110d14 (progress: 6/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 3)
14/03/23 21:00:30 INFO scheduler.TaskSetManager: Finished TID 74 in 510 ms on myhost-bigdata-110d14 (progress: 7/10)
14/03/23 21:00:30 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(5, 6)
============== Worker 1 log ==============
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 59 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 59 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 47 is 962
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 59
14/03/23 21:32:33 INFO executor.Executor: Sending result for 47 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 47
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 44 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 44 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 44
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 82, boot = 3, init = 76, finish = 3
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 65 is 962
14/03/23 21:32:33 INFO executor.Executor: Sending result for 65 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 65
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 68
14/03/23 21:32:33 INFO executor.Executor: Running task ID 68
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 71
14/03/23 21:32:33 INFO executor.Executor: Running task ID 71
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 74
14/03/23 21:32:33 INFO executor.Executor: Running task ID 74
14/03/23 21:32:33 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 77
14/03/23 21:32:33 INFO executor.Executor: Running task ID 77
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00000.bz2:0+14685
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00009.bz2:0+15447
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00006.bz2:0+14924
14/03/23 21:32:33 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00003.bz2:0+15015
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 89, boot = 3, init = 62, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 68 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 68 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 68
14/03/23 21:32:33 INFO python.PythonRDD: Times: total = 83, boot = 2, init = 57, finish = 24
14/03/23 21:32:33 INFO executor.Executor: Serialized size of result for 77 is 851
14/03/23 21:32:33 INFO executor.Executor: Sending result for 77 directly to driver
14/03/23 21:32:33 INFO executor.Executor: Finished task ID 77
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 66, boot = 2, init = 40, finish = 24
14/03/23 21:32:34 INFO python.PythonRDD: Times: total = 95, boot = 2, init = 60, finish = 33
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 71 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 71 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 71
14/03/23 21:32:34 INFO executor.Executor: Serialized size of result for 74 is 851
14/03/23 21:32:34 INFO executor.Executor: Sending result for 74 directly to driver
14/03/23 21:32:34 INFO executor.Executor: Finished task ID 74
========= Worker 2 log ========================
14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 60 is 962
14/03/23 21:32:40 INFO executor.Executor: Sending result for 60 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 60
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 69
14/03/23 21:32:40 INFO executor.Executor: Running task ID 69
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 72
14/03/23 21:32:40 INFO executor.Executor: Running task ID 72
14/03/23 21:32:40 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 75
14/03/23 21:32:40 INFO executor.Executor: Running task ID 75
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00001.bz2:0+14114
14/03/23 21:32:40 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00007.bz2:0+15325
14/03/23 21:32:40 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00004.bz2:0+15169
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished early
14/03/23 21:32:40 INFO python.PythonRDD: stdin writer to Python finished early
14/03/23 21:32:40 INFO python.PythonRDD: Times: total = 25, boot = 2, init = 10, finish = 13
14/03/23 21:32:40 INFO executor.Executor: Serialized size of result for 72 is 851
14/03/23 21:32:40 INFO executor.Executor: Sending result for 72 directly to driver
14/03/23 21:32:40 INFO executor.Executor: Finished task ID 72
=============== Worker 3 log ===================
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 55 is 962
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 61 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 55 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 58
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 55
14/03/23 21:32:28 INFO executor.Executor: Sending result for 61 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 61
14/03/23 21:32:28 INFO python.PythonRDD: Times: total = 92, boot = 3, init = 86, finish = 3
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 67 is 968
14/03/23 21:32:28 INFO executor.Executor: Sending result for 67 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 67
14/03/23 21:32:28 INFO executor.Executor: Serialized size of result for 64 is 962
14/03/23 21:32:28 INFO executor.Executor: Sending result for 64 directly to driver
14/03/23 21:32:28 INFO executor.Executor: Finished task ID 64
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 70
14/03/23 21:32:29 INFO executor.Executor: Running task ID 70
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 73
14/03/23 21:32:29 INFO executor.Executor: Running task ID 73
14/03/23 21:32:29 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 76
14/03/23 21:32:29 INFO executor.Executor: Running task ID 76
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO storage.BlockManager: Found block broadcast_0 locally
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00002.bz2:0+14560
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00008.bz2:0+14842
14/03/23 21:32:29 INFO rdd.HadoopRDD: Input split: hdfs://myhost-bigdata-110d11:9000/tmp/logs/2014/03/21/cc/uv/part-00005.bz2:0+14961
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 3, init = 46, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 70 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 70 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 70
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 73, boot = 2, init = 47, finish = 24
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 76 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 76 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 76
14/03/23 21:32:29 INFO python.PythonRDD: Times: total = 78, boot = 3, init = 47, finish = 28
14/03/23 21:32:29 INFO executor.Executor: Serialized size of result for 73 is 851
14/03/23 21:32:29 INFO executor.Executor: Sending result for 73 directly to driver
14/03/23 21:32:29 INFO executor.Executor: Finished task ID 73
============= My code ============
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("spark://myhost-bigdata:7077")
conf.setAppName("job")
conf.set("spark.ui.port", 4040)
sc = SparkContext(conf=conf)

types = ["pv", "uv"]
for type in types:
    dm = sc.textFile("/tmp/uv/part-%s-*.gz" % type)
    # For each of three key levels: count the number of distinct values per key.
    arrs1 = dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs2 = dm.map(lambda line: (int(line.split("^")[0].split("_")[1]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
    arrs3 = dm.map(lambda line: (int(line.split("^")[0].split("_")[2]), line.split("^")[1])).distinct().map(lambda a: (a[0], 1)).countByKey().items()
===================== My new code ==================
#-*- encoding: UTF-8 -*-
import logging
import time
from pyspark import SparkConf, SparkContext

logger = logging.getLogger("endlesscode")
formatter = logging.Formatter('%(name)-12s %(asctime)s %(levelname)-8s %(message)s', '%a, %d %b %Y %H:%M:%S', )
file_handler = logging.FileHandler("y.log")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)

def getSparkContext(port):
    conf = SparkConf()
    conf.setMaster("spark://my-bigdata-01:7077")
    conf.setAppName("test")
    conf.set("spark.ui.port", port)
    return SparkContext(conf=conf)

def getPv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), int(line.split("^")[1]))).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()

def getUv(dm, level):
    return dm.map(
        lambda line: (int(line.split("^")[0].split("_")[level]), line.split("^")[1])).distinct().map(
        lambda a: (a[0], 1)).reduceByKey(
        lambda a, b: a + b, numPartitions=80).collect()

def calc(ls):
    sc = []
    try:
        port = ls[0]
        cityname = ls[1]
        calcTime = ls[2]
        regDate = calcTime[0] + "-" + calcTime[1] + "-" + calcTime[2]
        logNames = ["pcpv", "wappv", "apppv", "pcuv", "wapuv", "appuv", "hottidall"]
        sc = getSparkContext(port)
        for index, ln in enumerate(logNames):
            log = "/tmp/2014report/%s/%s/%s/%s/%s/part-*.bz2" % (calcTime[0], calcTime[1], calcTime[2], cityname, ln)
            dm = sc.textFile(log)
            logger.debug("Type: %s, total count: %s; cityname: %s; first: %s" % (ln, dm.count(), cityname, dm.take(1)))
            if index in (0, 1, 2):
                for i in range(0, 3):
                    result = getPv(dm, i)
                    logger.debug("Level: %s, regDate: %s, cityname: %s, result: %s", i, regDate, cityname, result)
            elif index in (3, 4, 5):
                for i in range(0, 3):
                    result = getUv(dm, i)
                    logger.debug("Level: %s, regDate: %s, cityname: %s, result: %s", i, regDate, cityname, result)
            elif index == 6:
                pass
    except Exception as e:
        logger.error("Error: %s", e)
    finally:
        sc.stop()

def getAllCityName():
    array = ("cc", "hangzhou", "taizhou", "jiaxing")
    return array

if __name__ == '__main__':
    port = 5100
    curTime = time.time()
    citynames = getAllCityName()
    for index1, cityname in enumerate(citynames):
        calc((port + index1, cityname, ["2014", "03", "26"]))
1 Answer
The stage has only 10 partitions (progress: 7/10). If the data is genuinely large, 10 partitions can be far too few. In my experience, tasks often get stuck when very large data is processed with too few partitions; the hang is likely caused by memory pressure or other memory-related problems. As a rule of thumb you want roughly 2 to 4 partitions per CPU, but I sometimes use far more, even thousands, to avoid out-of-memory errors and stalls from frantic memory management.
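A minimal sketch of requesting more partitions in PySpark (the path and the value 80 are illustrative only, not taken from the post):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("spark://myhost-bigdata:7077")
conf.setAppName("job")
sc = SparkContext(conf=conf)

# The optional second argument to textFile is a hint for the minimum number of
# input partitions; shuffle operations such as reduceByKey likewise accept an
# explicit numPartitions argument (as the asker's new code already does).
dm = sc.textFile("/tmp/uv/part-pv-*.gz", 80)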
Also, countByKey does not scale well with the number of keys, which can become a problem here. Try .map((_, 1)).reduceByKey(_ + _) instead (Scala code).
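In PySpark a rough equivalent would look like the sketch below (the key parsing is copied from the question's code, and numPartitions=80 is just an illustrative value); this is essentially what the asker's new code already does:

# Replace countByKey() with an explicit (key, 1) map followed by reduceByKey,
# so keys are combined in a shuffle on the executors rather than merged on the driver.
arrs1 = (dm.map(lambda line: (int(line.split("^")[0].split("_")[0]), line.split("^")[1]))
           .distinct()
           .map(lambda a: (a[0], 1))
           .reduceByKey(lambda a, b: a + b, numPartitions=80)
           .collect())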