并发Java“顺序”信号量
我有一个问题,对如何解决这个问题有一个模糊的想法,但我会尽量在上下文上过度考虑,以避免出现XY problem
我有一个非同步方法,可以立即返回一个guava ^{Semaphore
:
public class ConcurrentCallsLimiter<In, Out> {
private final Function<In, ListenableFuture<Out>> fn;
private final Semaphore semaphore;
private final Executor releasingExecutor;
public ConcurrentCallsLimiter(
int limit, Executor releasingExecutor,
Function<In, ListenableFuture<Out>> fn) {
this.semaphore = new Semaphore(limit);
this.fn = fn;
this.releasingExecutor = releasingExecutor;
}
public ListenableFuture<Out> apply(In in) throws InterruptedException {
semaphore.acquire();
ListenableFuture<Out> result = fn.apply(in);
result.addListener(() -> semaphore.release(), releasingExecutor);
return result;
}
}
这样我就可以把我的电话放在这节课上,改为打电话:
ConcurrentLimiter<Foo, Bar> cl =
new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
ListenableFuture<Bar> bar = cl.apply(foo);
// handle bar (no pun intended)
}
这种有点有效。问题是尾部延迟非常糟糕。有些调用会变得“不走运”,最终花很长时间试图获取该方法调用中的资源。一些内部指数退避逻辑加剧了这种情况,例如,与新呼叫相比,不走运的呼叫获得所需资源的机会越来越少,而新呼叫更急切,等待时间更短,然后重试
理想的解决方法是,如果有类似于那个信号量的东西,但它有一个顺序的概念。例如,如果限制为10,那么第11次呼叫当前必须等待前10次的任何完成。我希望第11次通话必须等待第一次通话完成。这样一来,“不走运”的电话就不会因为不断有新的来电而继续挨饿
我似乎可以为调用分配一个整数序列号,并以某种方式跟踪尚未完成的最低序列号,但我不太明白如何实现这一点,尤其是因为在AtomicInteger
或其他任何地方都没有任何有用的“等待”方法
# 1 楼答案
我想到的一个不太理想的解决方案是使用
Queue
存储所有未来:然而,这似乎是对记忆的极大浪费。涉及的对象可能有点大,限制可能设定得相当高。所以我愿意接受没有O(n)内存使用的更好答案。似乎它在O(1)中是可行的
# 2 楼答案
您可以使用公平参数创建信号量:
//公平-如果此信号量将保证在争用下先入先出授予许可证,则为true,否则为false