java将嵌套for循环转换为反应通量
我有一个代码逻辑,需要在一系列嵌套循环中。 以下是示例:
String q = null;
for(C c in getC()){
if(Valid(c)){
String Id = c.Id;
List<M> mList = getM(Id);
for(M m in mList){
q = Message.Format("query:{}",m);
List<D> dList = getD(Id, q);
sendDToKafka(dList);
}
}
}
我试图使用project reactor将上述逻辑转换为反应性逻辑。 到目前为止,我掌握的代码是:
Flux.fromIterable(getC())
.map(c -> c.getId)
.doOnNext(cId -> getM(cId))
.map(m -> m.trim())
.doOnNext(m -> getD()) // need to pass in query and Id to getD()
.subscribe();
我面临的问题很少:
- 如何将IsValid()方法合并到查询中李>
- 我需要重用我在第一张地图上得到的cId值-。地图(c->;c.getId),分为两处。如果在下一步中不立即使用,我如何跟踪该值李>
- 有没有办法在反应式查询中形成q变量,作为getD()中的参数传递
- 如果代码是一种有效的方法,我将非常感谢任何反馈李>
# 1 楼答案
首先,
doOnNext
方法用于副作用,而不是改变事件的流程。此外,如果要将某个内容转换为反应式,则整个管道需要是非阻塞的,并且应该避免调用任何阻塞的代码。如果您有无法更改的阻塞代码,you can follow the advice here:对于筛选,您可以使用
filter
,对于在多个位置使用cId
,您可以将其作为元组传递到链下(有很多库),或者您可以为此创建自己的类# 2 楼答案
有一个接线员:
在特定的简单情况下,您可以简单地使用嵌套的
flatmap
,如下所示:见What is a good idiom for nested flatMaps in Java Reactor?
不能在流中使用非final
q
变量。看看AtomicReference<String>
正如在另一个答案中已经提到的,
doOnNext
是指副作用,map
是指从一种类型映射到另一种类型