有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Apache Camel:缓存流文件删除导致找不到文件错误

场景: 我正在尝试流式传输和处理一些大型xml文件。这些文件是从生产者异步发送的

producerTemplate.sendBodyAndHeaders(endpointUri, inStream, ImmutableMap.of(JOBID_PROPERTY, importJob.getId()));

我需要对所有文件输入流进行批处理,通过xpath探测来识别文件,并根据其内容重新排序。我有以下路线:

   from("direct:route1")
        .streamCaching()
        .choice()
        .when(xpath("//Tag1"))             .setHeader("execOrder", constant(3))    .setHeader("xmlRoute", constant( "direct:some-route"))
        .when(xpath("//Tag2"))             .setHeader("execOrder", constant(1))    .setHeader("xmlRoute", constant( "direct:some-other-route"))
        .when(xpath("//Tag3"))             .setHeader("execOrder", constant(2))    .setHeader("xmlRoute", constant( "direct:yet-another-route"))
        .otherwise()
            .to("direct:somewhereelse")
        .end()
        .resequence(header("execOrder"))
        .batch(new BatchResequencerConfig(300, 10000L))
        .allowDuplicates()
        .recipientList(header("xmlRoute"))

运行我的代码时,出现以下错误:

   2017-11-23 11:43:13.442  INFO 10267 --- [ - Batch Sender] c.w.n.s.m.DefaultImportJobService        : Updating entity ImportJob with id 5a16a61803af33281b22c716
   2017-11-23 11:43:13.451  WARN 10267 --- [ - Batch Sender] org.apache.camel.processor.Resequencer   : Error processing aggregated exchange: Exchange[ID-int-0-142-bcd-wsint-pro-59594-1511433568520-0-20]. Caused by: [org.apache.camel.RuntimeCamelException - Cannot reset stream from file /var/folders/dc/fkrgdrnx6txbg7jfdjd_58mm0000gn/T/camel/camel-tmp-39abaae8-9bdd-435a-b63d-299ad8b06415/cos1499080503439465502.tmp]

   org.apache.camel.RuntimeCamelException: Cannot reset stream from file /var/folders/dc/fkrgdrnx6txbg7jfdjd_58mm0000gn/T/camel/camel-tmp-39abaae8-9bdd-435a-b63d-299ad8b06415/cos1499080503439465502.tmp
   at org.apache.camel.converter.stream.FileInputStreamCache.reset(FileInputStreamCache.java:91)

我已经读到here当XPathBuilder启动时,FileInputStreamCache关闭。调用getDocument(),并删除临时文件,因此当XPathBuilder想要重置InputStream时,您将获得FileNotFoundException

解决方案似乎是禁用磁盘假脱机,如下所示:

camelContext.getStreamCachingStrategy().setSpoolThreshold(-1);

但是,我不想这样做,因为RAM限制,即文件可以达到600MB,我不想将它们保留在内存中。有什么办法解决这个问题吗


共 (2) 个答案

  1. # 1 楼答案

    我最终按照克劳斯和里卡多的建议做了。我做了一个单独的路径,将文件保存到磁盘。然后是另一个,它探测文件并按照固定顺序对交换进行重新排序

    String xmlUploadDirectory = "file://" + Files.createTempDir().path + "/xmls?noop=true"
    
    from("direct:route1")
        .to(xmlUploadDirectory)
    
    from(xmlUploadDirectory)
        .choice()
        .when(xpath("//Tag1")).setHeader("execOrder", constant(3)).setHeader("xmlRoute", constant( "direct:some-route"))
        .when(xpath("//Tag2")).setHeader("execOrder", constant(1)).setHeader("xmlRoute", constant( "direct:some-other-route"))
        .when(xpath("//Tag3")).setHeader("execOrder", constant(2)).setHeader("xmlRoute", constant( "direct:yet-another-route"))
        .otherwise()
        .to("direct:somewhereelse")
        .end()
        .to("direct:resequencing")
    
    from("direct:resequencing")
        .resequence(header("execOrder"))
        .batch(new BatchResequencerConfig(300, 10000L))
        .allowDuplicates()
        .recipientList(header("xmlRoute"))
    
  2. # 2 楼答案

    重排序器是一种两段模式(有状态的),它将使原始交换提前完成,因为它在内存中保留一个副本,同时重新排序,直到间隙满足并以新的顺序发送消息

    由于您的输入流来自某个HTTP服务,因此该服务将在重排序器输出交换之前关闭

    或者按照建议先存储到本地磁盘,然后让重排序器处理,或者找到一种不使用重排序器的方法