java Apache Spark Streaming custom receiver (socket server)
I am trying to build a simple Spark Streaming application that receives multiple requests concurrently over a socket, and the Spark application replies to each request on the same socket. So the pipeline looks like this (assuming the Spark job is already up and running and waiting for new requests):
PHP sends "hello spark" over the socket ==> Spark processes the request and replies with "hello php"
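(For illustration only, the client side of that exchange might look like the minimal Scala sketch below; the real client is PHP, and ClientSketch, localhost and port 9898 are just placeholder names/values matching the server code further down.)

import java.io.{BufferedReader, InputStreamReader, PrintStream}
import java.net.Socket

// Minimal stand-in for the PHP client: connect, send one request, read the
// reply, close. ClientSketch, localhost and 9898 are placeholders.
object ClientSketch {
  def main(args: Array[String]): Unit = {
    val socket = new Socket("localhost", 9898)
    val out = new PrintStream(socket.getOutputStream())
    val in = new BufferedReader(new InputStreamReader(socket.getInputStream()))
    out.print("hello spark")
    out.flush()
    // The server writes its reply and closes the socket, so readLine()
    // returns once the stream ends even without a trailing newline.
    val reply = in.readLine()
    println("reply from spark: " + reply)
    socket.close()
  }
}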
So here I am trying to combine Spark with Java multithreading. The code is as follows:
import java.io.{BufferedInputStream, BufferedOutputStream, DataOutputStream, PrintStream}
import java.net.{ServerSocket, Socket, SocketException}
import java.util.Random

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object socket_code {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("CustomReceiver")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    try {
      println("Waiting for connection")
      val listener = new ServerSocket(9898)
      // Accept one connection and hand the connected Socket to the receiver
      val lines = ssc.receiverStream(new CustomReceiver(listener.accept()))
      lines.print()
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case e: SocketException =>
        println("Could not listen on port: 9898.")
    }
  }
}
class CustomReceiver(socket: Socket)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() {
        val rand = new Random(System.currentTimeMillis())
        try {
          val is = new BufferedInputStream(socket.getInputStream())
          val os = new PrintStream(new BufferedOutputStream(socket.getOutputStream()))
          val out = new DataOutputStream(socket.getOutputStream())
          // Busy-wait until at least one byte is available on the socket
          while (is.available() < 1) {
            Thread.sleep(30)
          }
          // Read whatever is currently available, print it, and reply
          val buf = new Array[Byte](is.available)
          is.read(buf)
          println(new String(buf))
          out.writeBytes("Hello PHP")
          out.close()
          is.close()
          socket.close()
        } catch {
          case e: SocketException =>
            e.printStackTrace()
        }
      }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread started in onStart()
    // is designed to stop by itself once isStopped() returns true
  }
}
The Spark job starts fine, but when I send data to the socket I get the following error:
15/10/07 11:45:32 INFO DAGScheduler: Job 1 failed: start at main.scala:46, took 0.114462 s
Exception in thread "Thread-26" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: java.net.Socket
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:879)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
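From the trace, the failing piece seems to be the java.net.Socket held by CustomReceiver: the receiver object is serialized when Spark ships it to an executor, and Socket is not serializable. For comparison, the custom receiver example in the Spark Streaming programming guide only captures a host and a port and opens the Socket inside the receiver thread, so nothing non-serializable is captured. A minimal sketch of that shape is below (it connects out to a source instead of accepting connections and replying on the same socket, so it is not a drop-in replacement for what I want; DocsStyleReceiver is just an illustrative name):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Receiver shape from the Spark Streaming programming guide's custom receiver
// example: only host (String) and port (Int) are captured, and the Socket is
// created inside the receiver thread, so serializing the receiver does not
// drag a java.net.Socket along with it.
class DocsStyleReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // The receive() loop checks isStopped() and exits on its own.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port) // opened on the executor, never serialized
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), "UTF-8"))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each received line to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
    } catch {
      case t: Throwable =>
        restart("Error receiving data from " + host + ":" + port, t)
    }
  }
}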
0 Answers