Java Apache Spark Streaming custom receiver (socket server)

I am trying to build a simple Spark Streaming application that receives multiple requests concurrently over a socket, and the Spark application replies to each request on the same socket. Assuming the Spark job is already up and running and waiting for new requests, the pipeline looks like this:

PHP sends "hello spark" over the socket ==> Spark processes the request and replies with "hello php"
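
For context, a minimal client exercising this protocol might look like the sketch below. It is written in Scala rather than PHP only for consistency with the rest of this post; the host name, the port 9898, and the two messages are taken from the setup described here.

import java.io.{BufferedReader, InputStreamReader, PrintStream}
import java.net.Socket

// Illustrative stand-in for the PHP client: send one request,
// read the reply, then close the connection.
object ClientSketch {
  def main(args: Array[String]) {
    val socket = new Socket("localhost", 9898) // port the Spark job listens on
    try {
      val out = new PrintStream(socket.getOutputStream())
      out.print("hello spark") // the request
      out.flush()
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
      println(in.readLine()) // expect "Hello PHP" back
    } finally {
      socket.close()
    }
  }
}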

So here I am trying to combine Spark with Java multithreading. The code is as follows:

import java.io.{BufferedInputStream, DataOutputStream}
import java.net.{ServerSocket, Socket, SocketException}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object socket_code {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    try {
      println("Waiting for connection")
      // Block until a client connects, then hand the accepted socket to the receiver
      val listener = new ServerSocket(9898)
      val lines = ssc.receiverStream(new CustomReceiver(listener.accept()))
      lines.print()
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case e: SocketException =>
        println("Could not listen on port: 9898.")
    }
  }
}


class CustomReceiver(socket: Socket)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over the connection
    new Thread("Socket Receiver") {
      override def run() {
        try {
          val is = new BufferedInputStream(socket.getInputStream())
          val out = new DataOutputStream(socket.getOutputStream())

          // Poll until the client has sent at least one byte
          while (is.available() < 1) {
            Thread.sleep(30)
          }

          // Read everything currently available, log it, and send the reply
          val buf = new Array[Byte](is.available())
          is.read(buf)
          println(new String(buf))
          out.writeBytes("Hello PHP")
          out.close()
          is.close()
          socket.close()
        } catch {
          case e: SocketException =>
            e.printStackTrace()
        }
      }
    }.start()
  }

  def onStop() {
    // There is nothing to do here: the receiving thread stops
    // on its own once it has replied and closed the socket.
  }
}

The Spark job starts up fine, but when I send data to the socket I get the following error:

15/10/07 11:45:32 INFO DAGScheduler: Job 1 failed: start at main.scala:46, took 0.114462 s
Exception in thread "Thread-26" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.net.Socket
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:879)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
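
For reference, the NotSerializableException arises because Spark serializes the Receiver object, including the Socket captured in its constructor, in order to ship it to an executor, and java.net.Socket is not serializable. Spark's custom receiver guide sidesteps this by passing only serializable fields (a host and a port) and opening the connection inside the receiver on the executor. A minimal sketch of that pattern follows; note that, as in the docs example, this receiver connects out to a socket server rather than accepting a connection, and the class name LineReceiver and the line-oriented framing are illustrative, not part of the original code:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Only serializable fields (host, port) cross the wire;
// the Socket itself is created on the executor in receive().
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    new Thread("Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() { }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each line to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException =>
        restart("Error receiving data", e)
    }
  }
}

It would be wired up as ssc.receiverStream(new LineReceiver("localhost", 9898)). Note also that a receiver only feeds its stream through store(), which the original CustomReceiver never calls, so lines.print() would show nothing even once the serialization failure is fixed.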

0 Answers