有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

并发Java“顺序”信号量

我有一个问题,对如何解决这个问题有一个模糊的想法,但我会尽量在上下文上过度考虑,以避免出现XY problem

我有一个非同步方法,可以立即返回一个guava ^{},我需要调用它数十万次或数百万次(未来本身可能需要一段时间才能完成)。我真的无法改变这种方法的内部结构。它内部涉及一些严重的资源争用,所以我想限制一次对该方法调用的次数。所以我试着使用Semaphore

public class ConcurrentCallsLimiter<In, Out> {
  private final Function<In, ListenableFuture<Out>> fn;
  private final Semaphore semaphore;
  private final Executor releasingExecutor;

  public ConcurrentCallsLimiter(
      int limit, Executor releasingExecutor,
      Function<In, ListenableFuture<Out>> fn) {
    this.semaphore = new Semaphore(limit);
    this.fn = fn;
    this.releasingExecutor = releasingExecutor;
  }

  public ListenableFuture<Out> apply(In in) throws InterruptedException {
    semaphore.acquire();
    ListenableFuture<Out> result = fn.apply(in);
    result.addListener(() -> semaphore.release(), releasingExecutor);
    return result;
  }
}

这样我就可以把我的电话放在这节课上,改为打电话:

ConcurrentLimiter<Foo, Bar> cl =
    new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
  ListenableFuture<Bar> bar = cl.apply(foo);
  // handle bar (no pun intended)
}

这种有点有效。问题是尾部延迟非常糟糕。有些调用会变得“不走运”,最终花很长时间试图获取该方法调用中的资源。一些内部指数退避逻辑加剧了这种情况,例如,与新呼叫相比,不走运的呼叫获得所需资源的机会越来越少,而新呼叫更急切,等待时间更短,然后重试

理想的解决方法是,如果有类似于那个信号量的东西,但它有一个顺序的概念。例如,如果限制为10,那么第11次呼叫当前必须等待前10次的任何完成。我希望第11次通话必须等待第一次通话完成。这样一来,“不走运”的电话就不会因为不断有新的来电而继续挨饿

我似乎可以为调用分配一个整数序列号,并以某种方式跟踪尚未完成的最低序列号,但我不太明白如何实现这一点,尤其是因为在AtomicInteger或其他任何地方都没有任何有用的“等待”方法


共 (2) 个答案

  1. # 1 楼答案

    我想到的一个不太理想的解决方案是使用Queue存储所有未来:

    public class ConcurrentCallsLimiter<In, Out> {
      private final Function<In, ListenableFuture<Out>> fn;
      private final int limit;
      private final Queue<ListenableFuture<Out>> queue = new ArrayDeque<>();
    
      public ConcurrentCallsLimiter(int limit, Function<In, ListenableFuture<Out>> fn) {
        this.limit = limit;
        this.fn = fn;
      }
    
      public ListenableFuture<Out> apply(In in) throws InterruptedException {
        if (queue.size() == limit) {
          queue.remove().get();
        }
        ListenableFuture<Out> result = fn.apply(in);
        queue.add(result);
        return result;
      }
    }
    

    然而,这似乎是对记忆的极大浪费。涉及的对象可能有点大,限制可能设定得相当高。所以我愿意接受没有O(n)内存使用的更好答案。似乎它在O(1)中是可行的

  2. # 2 楼答案

    您可以使用公平参数创建信号量:

    Semaphore(int permits, boolean fair)
    

    //公平-如果此信号量将保证在争用下先入先出授予许可证,则为true,否则为false