Stuck training many word-embedding models with PySpark


I'm very happy to finally be posting my first question, but please nudge me if I'm unclear or breaking standard etiquette. I sincerely appreciate any help I can get.

I'm trying to use PySpark (in Databricks) to train embeddings for many corpora in parallel, one corpus per author. Each author's corpus is under 1 GB.

The corpora DataFrame (df) has the form:

+----------------+------------------------------------------------------------+
|          author|                                                      corpus|
+----------------+------------------------------------------------------------+
|            john| [["hello"], ["these", "are", "john's", "thoughts"]]        |
|           steve| [["hello"], ["these", "are", "steve's", "thoughts"]]       |
|          markus| [["hello"], ["these", "are", "markus's", "thoughts"]]      |
+----------------+------------------------------------------------------------+
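
For reference, a toy version of this DataFrame can be built like so (a sketch; it assumes the corpus column has type array<array<string>>):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each corpus is a list of tokenized sentences for one author.
df = spark.createDataFrame(
    [
        ("john",   [["hello"], ["these", "are", "john's", "thoughts"]]),
        ("steve",  [["hello"], ["these", "are", "steve's", "thoughts"]]),
        ("markus", [["hello"], ["these", "are", "markus's", "thoughts"]]),
    ],
    "author string, corpus array<array<string>>",
)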

Past attempts:

  • I defined a UDF that applies gensim's Word2Vec function to each row above. This worked for small corpora, but no matter how large I made spark.executor.memory, I kept hitting out-of-memory errors that cost me executors, caused indefinite hangs, and so on (roughly like the sketch after this list).
  • Now I'm trying to explode the DataFrame above so that each row is a single sentence. Then I figured I'd do a df.groupBy('author') and define a UDF that trains Spark's Word2Vec model on each group and saves the model. Unfortunately, even with only a handful of authors, repartitioning, etc., I can't explode the DataFrame without it hanging indefinitely, so I'm stuck with the DataFrame shown above.
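
Roughly what the first attempt looked like, as a minimal sketch rather than my exact code; the hyperparameters and the pickled return value are illustrative, and it assumes gensim 4.x:

import pickle
from gensim.models import Word2Vec
from pyspark.sql import functions as F
from pyspark.sql.types import BinaryType

# Train one gensim model per row (i.e. per author) and return the learned
# word vectors as pickled bytes. vector_size/min_count are illustrative.
@F.udf(returnType=BinaryType())
def train_w2v(corpus):
    # corpus arrives as a list of tokenized sentences for one author
    model = Word2Vec(sentences=[list(s) for s in corpus],
                     vector_size=100, min_count=1, workers=1)
    return pickle.dumps(model.wv)

models = df.withColumn("vectors", train_w2v(F.col("corpus")))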

Cluster:

  • 1 driver: 56 GB memory, 16 cores
  • 1-8 workers (autoscaling): 56 GB memory, 16 cores each

Configuration (inspired by here):

yarn.nodemanager.pmem-check-enabled false
spark.databricks.delta.preview.enabled true
spark.executor.cores 5
spark.executor.memory 16gb
spark.executor.instances 26
spark.driver.memory 16gb
spark.yarn.executor.memoryOverhead 2gb
yarn.nodemanager.vmem-check-enabled false
spark.default.parallelism 260
spark.driver.cores 5

Performance:

[screenshot: poor memory usage]

Errors from the executors:

WARN HangingTaskDetector: Task 1527 is probably not making progress because its metrics (Map(internal.metrics.shuffle.read.localBlocksFetched -> 0, internal.metrics.shuffle.read.remoteBytesReadToDisk -> 0, internal.metrics.shuffle.write.bytesWritten -> 0, internal.metrics.output.recordsWritten -> 0, internal.metrics.shuffle.write.recordsWritten -> 0, internal.metrics.memoryBytesSpilled -> 0, internal.metrics.shuffle.read.remoteBytesRead -> 0, internal.metrics.diskBytesSpilled -> 0, internal.metrics.shuffle.read.localBytesRead -> 0, internal.metrics.shuffle.read.recordsRead -> 0, internal.metrics.output.bytesWritten -> 0, internal.metrics.input.bytesRead -> 119650145, internal.metrics.input.recordsRead -> 4096, internal.metrics.shuffle.read.remoteBlocksFetched -> 0))

Driver standard output logs:

2021-07-29T02:27:25.955+0000: [GC (Allocation Failure) [PSYoungGen: 5309440K->135150K(5444608K)] 5520626K->401147K(16629760K), 0.0788413 secs] [Times: user=0.22 sys=0.14, real=0.08 secs]

Logs:

21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_29_piece0 on 10.161.179.16:42153 in memory (size: 7.4 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.161.179.6:40551 in memory (size: 28.6 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.19:38301 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_37_piece0 on 10.161.179.19:36657 in memory (size: 13.4 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.19:36657 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_31_piece0 on 10.161.179.10:45589 in memory (size: 11.5 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_35_piece0 on 10.161.179.10:40843 in memory (size: 8.3 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.16:42707 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.19:38301 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.19:36657 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_16_piece0 on 10.161.179.6:37313 in memory (size: 8.0 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_7_piece0 on 10.161.179.6:37313 in memory (size: 14.2 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_8_piece0 on 10.161.179.6:37313 in memory (size: 62.5 KiB, free: 8.4 GiB)
21/07/29 02:27:27 INFO BlockManagerInfo: Removed broadcast_37_piece0 on 10.161.179.8:41599 in memory (size: 13.4 KiB, free: 8.4 GiB)
21/07/29 02:29:16 INFO DriverCorral: DBFS health check ok
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Starting...
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Start completed.
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Shutdown initiated...
21/07/29 02:29:17 INFO HikariDataSource: metastore-monitor - Shutdown completed.
21/07/29 02:29:17 INFO MetastoreMonitor: Metastore healthcheck successful (connection duration = 84 milliseconds)
21/07/29 02:29:25 INFO HiveMetaStore: 1: get_database: default
21/07/29 02:29:25 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_database: default   
21/07/29 02:29:25 INFO DriverCorral: Metastore health check ok

So my question is twofold:

  1. Is there anything wrong with simply exploding the rows and then training the Word2Vec embeddings with a UDF applied to a groupBy('author')? I know explode is very expensive because it creates an enormous number of rows, but it seems like the best option (see the sketch after these questions).
  2. Is there a more efficient or different way to approach this problem that avoids the explode hang entirely?
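
For concreteness, here is a minimal sketch of the explode-then-groupBy plan from question 1. Since Spark ML's Word2Vec can't be invoked inside a worker-side function, the sketch substitutes gensim inside applyInPandas; the DBFS path, output schema, and hyperparameters are all illustrative:

import pandas as pd
from gensim.models import Word2Vec
from pyspark.sql import functions as F

# One row per (author, sentence) after the explode.
sentences = df.select("author", F.explode("corpus").alias("sentence"))

def train_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds every sentence for a single author.
    author = pdf["author"].iloc[0]
    corpus = [list(s) for s in pdf["sentence"]]  # ensure plain lists of str
    model = Word2Vec(sentences=corpus, vector_size=100, min_count=1, workers=1)
    model.save(f"/dbfs/models/w2v_{author}.model")  # hypothetical output path
    return pd.DataFrame({"author": [author], "vocab_size": [len(model.wv)]})

result = (sentences
          .groupBy("author")
          .applyInPandas(train_group, schema="author string, vocab_size long"))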

Please let me know if there is any other information I could provide that might help, and I welcome any suggestions that would help me ask clearer questions in the future.

Update: after changing spark.sql.shuffle.partitions to a large number (set roughly as in the snippet at the end of this post), things have definitely improved, but executors keep getting created and destroyed while they are completing tasks, with "worker lost" or "Remote RPC client disassociated. Likely due to …" errors (something that has plagued me in the past despite many different driver/executor memory/core configurations). The event timeline looks like this:

[screenshot: event timeline]

So my questions above still stand, but it feels like I'm one step closer to finding a suitable configuration.
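
For reference, the shuffle-partition change mentioned above was just the following; the value 2048 is illustrative, not the exact number I used:

# Raise the shuffle partition count before running the explode/groupBy job.
spark.conf.set("spark.sql.shuffle.partitions", 2048)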

