有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

面向批处理的java非阻塞队列

我们有一个专家,多生产者(用户)和单一消费者(引擎),队列。用户线程的运行频率更高,并且总是将单个元素添加到队列中。引擎线程操作运行的频率较低,并且批处理堆栈元素。如果堆栈为空,它将停止,直到用户线程添加了一个条目。这样,只有当队列从空变为1时,才需要发出通知

在这个实现中,引擎线程不是一次迭代并删除一个项目,而是删除所有项目-一个drainAll,而不是drainTo。没有其他操作可以改变堆栈-只有用户线程add和引擎线程drainAll

目前,我们通过一个同步的链表来实现这一点,我们想知道是否有一种非阻塞的方式来实现这一点。JDK类上的drainTo操作将迭代堆栈,我们只希望在一个操作中获取堆栈中的所有内容,而不进行迭代-因为每次迭代都会命中volatile/cas相关逻辑,所以理想情况下,我们只希望每个drainAll命中一次。引擎线程可以对每个单独的元素进行迭代和操作,而无需接触sync/volatile/cas操作

当前的实现类似于:

public class SynchronizedPropagationQueue implements PropagatioQueue {
    protected volatile PropagationEntry head;
    protected volatile PropagationEntry tail;

    protected synchronized void addEntry( PropagationEntry entry ) {
        if ( head == null ) {
            head = entry;
            notifyWaitOnRest();
        } else {
            tail.setNext( entry );
        }
        tail = entry;
    }

    @Override
    public synchronized PropagationEntry drainAll() {
        PropagationEntry currentHead = head;
        head = null;
        tail = null;
        return currentHead;
    }

    public synchronized void waitOnRest() {
        try {
            log.debug("Engine wait");
            wait();
        } catch (InterruptedException e) {
            // do nothing
        }
        log.debug("Engine resumed");
    }


    @Override
    public synchronized void notifyWaitOnRest() {
        notifyAll();
    }
}

asdf


共 (2) 个答案

  1. # 1 楼答案

    Currently we do this via a synchronised linked list, we are wondering if there is a non-blocking way to do this. The drainTo operation on JDK classes will iterate the stack, we just want to take everything in the stack in one operation, without iterating

    也许我不明白,但使用BlockingQueue.drainTo(...)方法似乎比您的实现更好。例如LinkedBlockingQueue.drainTo(...)方法在该方法周围只有一个锁,我看不到迭代开销

    如果这不是一个学术讨论,那么我怀疑您的性能问题是否与队列本身有关,是否会将精力集中在其他领域。如果是学术性的,那么@Matt的答案可能会更好,尽管肯定还有很多代码需要编写来支持完整的Collection方法列表

  2. # 2 楼答案

    堆栈有一个非常简单的非阻塞实现,可以轻松地支持并发“pop all”操作,并且可以轻松地检测空->;非空转换。您可以让所有制作人将项目推到堆栈上,然后让引擎立即清空整个项目。看起来是这样的:

    public class EngineQueue<T>
    {
        private final AtomicReference<Node<T>> m_lastItem = new AtomicReference<>();
    
        public void add(T item)
        {
            Node<T> newNode = new Node<T>(item);
            do {
                newNode.m_next = m_lastItem.get();
            } while(!m_lastItem.compareAndSet(newNode.m_next, newNode)); 
    
            if (newNode.m_next == null)
            {
                // ... just went non-empty signal any waiting consumer
            }
        }
    
        public List<T> removeAll()
        {
            Node<T> stack = m_lastItem.getAndSet(null);
            // ... wait for non-empty if necessary 
            List<T> ret = new ArrayList<>();
            for (;stack != null; stack=stack.m_next)
            {
                ret.add(stack.m_data);
            }
            Collections.reverse(ret);
            return ret;
        }
        private static class Node<U>
        {
            Node<U> m_next;
            final U m_data;
            Node(U data)
            {
                super();
                m_data = data;
            }
        }
    }
    

    用于在空车周围发送信号->;非空转换,可以使用正常同步。如果您只在检测到空状态时执行此操作,则不会太昂贵。。。因为你只有在失业的时候才能进入空状态