Spark Streaming with a Keras pretrained model


I want to use a Keras pretrained model in a Spark Streaming application that receives data from a Kafka topic, turns that data into plot images, and then classifies those images with the pretrained model. Here is what I have tried so far. These are the imports:

from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
import pickle
import os
import numpy as np
import time
import cv2
import seaborn as sns; sns.set()
import matplotlib.pyplot as plt
from keras_applications.imagenet_utils import decode_predictions, preprocess_input
from keras_preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import load_model
from tensorflow.keras.models import save_model
from tensorflow.keras.models import Model
import types
import tempfile
from tensorflow.python.keras.layers import deserialize, serialize
from tensorflow.python.keras.saving import saving_utils

Here is the function that converts the data into a plot figure, renders that figure as an image, and then classifies the image:

def predict(x):
    # x is one group produced by groupBy: (kafka key, iterable of the (key, value) records for that key)
    key = x[0].decode("utf-8")
    records = list(x[1])
    # CSV columns 4 and 5 of each value are plotted as x/y; column 8 drives the point colour
    data_x = list(map(lambda y: y[1].decode("utf-8").split(",")[4], records))
    data_y = list(map(lambda y: y[1].decode("utf-8").split(",")[5], records))
    color_data = list(map(lambda y: float(y[1].decode("utf-8").split(",")[8]) / 150.0, records))
    fig = plt.figure()
    ax = fig.add_subplot(111)
    data1 = np.array(data_x)
    data2 = np.array(data_y)
    colors = np.array(color_data)
    ax.plot(data1, data2)
    ax.scatter(data1, data2, c=colors)
    ax.axis('off')
    # render the figure and grab its pixels as a numpy array
    fig.canvas.draw()
    img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    plt.close(fig)
    # img is rgb, convert to opencv's default bgr
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    # resize/normalise to the model's expected 224x224 input and classify
    image = cv2.resize(img, (224, 224))
    image = image.astype("float") / 255.0
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    # `model` is the module-level Keras model loaded further below
    (class1, class2, class3) = model.predict(image)[0]
    return f'{key},{class1},{class2},{class3}'
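
As a sanity check outside Spark, predict can be exercised with a hand-built group like the sketch below. The byte strings are made up and only assume that the CSV fields at indices 4, 5 and 8 are numeric and that vgg16model.model returns three class scores; the model must already be loaded as shown further down:

# Hypothetical local test of predict(); the record payloads and field layout are assumptions.
fake_records = [
    (b"sensor1", b"f0,f1,f2,f3,1.0,2.0,f6,f7,75.0"),
    (b"sensor1", b"f0,f1,f2,f3,2.0,3.0,f6,f7,120.0"),
]
print(predict((b"sensor1", fake_records)))  # e.g. "sensor1,0.1,0.7,0.2"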

Here is the part that makes the Keras model picklable (I followed this example: https://github.com/tensorflow/tensorflow/issues/34697) and then loads it:

def unpack(model, training_config, weights):
    restored_model = deserialize(model)
    if training_config is not None:
        restored_model.compile(
            **saving_utils.compile_args_from_training_config(
                training_config
            )
        )
    restored_model.set_weights(weights)
    return restored_model

def make_keras_picklable():

    def __reduce__(self):
        model_metadata = saving_utils.model_metadata(self)
        training_config = model_metadata.get("training_config", None)
        model = serialize(self)
        weights = self.get_weights()
        return (unpack, (model, training_config, weights))

    cls = Model
    cls.__reduce__ = __reduce__

make_keras_picklable()
model = load_model("vgg16model.model")
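
Since the whole point of patching Model.__reduce__ is to let the model survive pickling (which Spark relies on when shipping the predict closure to its worker processes), a quick round-trip check can be done right after loading; this snippet is only a sanity check, not part of the streaming job:

# Sanity check: the patched __reduce__ should let the model survive a pickle round trip.
restored = pickle.loads(pickle.dumps(model))
print(type(restored))
print(len(restored.get_weights()) == len(model.get_weights()))  # expected: True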

Here is the part where the Spark Streaming application reads data from the Kafka topic:

SparkContext.setSystemProperty('spark.executor.memory', '12g')
SparkContext.setSystemProperty('spark.driver.memory', '12g')
sc = SparkContext("local[4]", "MyApp")
ssc = StreamingContext(sc, 5)  # 5-second batches
stream = KafkaUtils.createDirectStream(
    ssc, ['my_test_topic'], {"metadata.broker.list": "localhost:9092"},
    keyDecoder=lambda x: x, valueDecoder=lambda x: x)
stream.foreachRDD(lambda x: process_rdd(x))
ssc.start()
ssc.awaitTermination()

Here is the function that processes the RDD of each Spark window and saves the results to a file:

def process_rdd(rdd):
    rdd.groupBy(lambda x: x[0]).map(lambda x: predict(x)).saveAsTextFile(f'C:\\spark_results_{time.time()}')
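
For reference, after groupBy(lambda x: x[0]) each element handed to predict is a pair of the Kafka key and an iterable of the original (key, value) records. The same grouping can be sketched in plain Python for debugging; the payloads below are placeholders, not real messages:

# Plain-Python equivalent of the groupBy step, for debugging outside Spark.
from itertools import groupby

sample = [(b"k1", b"...csv..."), (b"k1", b"...csv..."), (b"k2", b"...csv...")]
grouped = [(k, list(v)) for k, v in
           groupby(sorted(sample, key=lambda r: r[0]), key=lambda r: r[0])]
# each element of `grouped` has the same shape as the x passed to predict()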

When I run the application, I get the following error:

2020-06-23 06:25:34.991349: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library cudart64_101.dll
2020-06-23 06:25:38.112924: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library nvcuda.dll
2020-06-23 06:25:38.134107: E tensorflow/stream_executor/cuda/cuda_driver.cc:351] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-06-23 06:25:38.138318: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: DESKTOP-RK0TQ70
2020-06-23 06:25:38.138967: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: DESKTOP-RK0TQ70
2020-06-23 06:25:38.139499: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
20/06/23 06:25:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 1:>                                                          (0 + 4) / 4]20/06/23 06:26:18 ERROR Utils: Aborting task
java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.readInt(Unknown Source)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:141)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
20/06/23 06:26:18 ERROR SparkHadoopWriter: Task attempt_20200623062548_0008_m_000003_0 aborted.
20/06/23 06:26:18 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 7)
org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:157)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.readInt(Unknown Source)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:141)
        ... 10 more
20/06/23 06:26:19 WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:157)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.readInt(Unknown Source)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:141)
        ... 10 more

20/06/23 06:26:19 ERROR TaskSetManager: Task 3 in stage 1.0 failed 1 times; aborting job
20/06/23 06:26:19 ERROR SparkHadoopWriter: Aborting job job_20200623062548_0008.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:157)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.readInt(Unknown Source)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:141)
        ... 10 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1544)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1523)
        at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1523)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1523)
        at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
        at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:157)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.readInt(Unknown Source)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:141)
        ... 10 more
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000000_0\.part-00000.crc]: it still exists.
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000000_0\part-00000]: it still exists.
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000001_0\.part-00001.crc]: it still exists.
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000001_0\part-00001]: it still exists.
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000002_0\.part-00002.crc]: it still exists.
20/06/23 06:26:19 WARN FileUtil: Failed to delete file or dir [C:\spark_results_1592882745.8662503\_temporary\0\_temporary\attempt_20200623062548_0008_m_000002_0\part-00002]: it still exists.
20/06/23 06:26:19 ERROR JobScheduler: Error running job streaming job 1592882745000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\streaming\kafka.py", line 403, in <lambda>
    func = lambda r, rdd: old_func(rdd)
  File "streaming.py", line 192, in <lambda>
    stream.foreachRDD(lambda x: process_rdd(x))
  File "streaming.py", line 162, in process_rdd
    rdd.groupBy(lambda x: x[0]).map(lambda x: predict(x)).saveAsTextFile(f'C:\\spark_results_{time.time()}')
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 1570, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o78.saveAsTextFile.

Traceback (most recent call last):
  File "streaming.py", line 195, in <module>
    ssc.awaitTermination()
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 192, in awaitTermination
    self._jssc.awaitTermination()
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o27.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\streaming\kafka.py", line 403, in <lambda>
    func = lambda r, rdd: old_func(rdd)
  File "streaming.py", line 192, in <lambda>
    stream.foreachRDD(lambda x: process_rdd(x))
  File "streaming.py", line 162, in process_rdd
    rdd.groupBy(lambda x: x[0]).map(lambda x: predict(x)).saveAsTextFile(f'C:\\spark_results_{time.time()}')
  File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 1570, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o78.saveAsTextFile.

When I check the Spark Streaming UI, I see the following:

[Screenshot of the Spark Streaming UI showing batch processing times and the batch queue]

It shows that the first window/batch has still not finished processing, while the remaining windows/batches keep queuing up. In addition, the logic of the predict function works when I run the pretrained model in a plain (non-Spark) Python application, where it makes a prediction in a few milliseconds. I am using tensorflow 2.1.0 and pyspark 2.4.5. What could be causing this problem?
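
For what it's worth, the "few milliseconds per prediction" observation comes from a standalone script; a rough timing sketch like the one below (using a random 224x224 RGB input instead of a real plot image, so only an approximation) is how the standalone latency can be checked:

# Rough standalone timing of the model, independent of Kafka/Spark.
dummy = np.random.rand(1, 224, 224, 3).astype("float32")
model.predict(dummy)  # warm-up call
start = time.time()
for _ in range(100):
    model.predict(dummy)
print((time.time() - start) / 100, "seconds per prediction")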

