有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程Java并发:并发添加和清除列表项

考虑以下方法:

/**
 * Appends one message to the shared {@code messages} list and, once the list has
 * reached {@code batchSize} and no thread is waiting for a processor, snapshots
 * the list, clears it and hands the snapshot to {@code startProcessor}.
 *
 * NOTE(review): only the stop check and the batch hand-off run under
 * {@code synchronized (this)}; {@code messages.add(message)} does not. Holding the
 * monitor of {@code this} therefore does not stop another thread from appending
 * between the copy and the {@code clear()} below.
 *
 * @param message the message to enqueue
 */
public void add(final List<ReportingSTG> message) {
        // Cheap unsynchronized check first, then re-check while holding the monitor of `this`.
        if(stopRequested.get()) {
            synchronized (this) {
                if(stopRequested.get()) {
                    retryQueue.put(message);
                    // NOTE(review): there is no return here, so the message is also
                    // added to `messages` below — confirm that this is intended.
                }
            }
        }
        // Executed without holding the monitor of `this`.
        messages.add(message);
        // Same check-then-recheck shape as above: test outside the monitor, repeat inside it.
        if(messages.size() >= batchSize && waitingThreads.get() == 0) {
            synchronized (this) {
                if(messages.size() >= batchSize && waitingThreads.get() == 0) {
                    // Snapshot and clear are two separate calls on `messages`; an element
                    // appended by another thread in between is cleared without being copied.
                    final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                    messages.clear();
                    // All processor slots taken: count this thread as waiting while it calls
                    // waitForProcessor() (presumably blocks until one is free — TODO confirm).
                    if(processors.size()>=numOfProcessors) {
                        waitingThreads.incrementAndGet();
                        waitForProcessor();
                        waitingThreads.decrementAndGet();
                    }                   
                    // Hand the snapshot (not the live list) to a processor.
                    startProcessor(clone);
                }
            }

        }
    }

特别是这两行代码:

 1:   final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
 2:   messages.clear();

如果线程A进入同步块并获得了当前对象的锁,这是否意味着在线程A处于同步块期间,该对象的实例属性状态不能被同步块之外的其他线程更改?

例如:线程A执行第1行 -> 线程B进入该方法并添加了新的列表条目(messages.add(message))-> 线程A执行第2行 -> 线程B添加的条目被删除(与其他条目一起)。这种情况可能发生吗?还是说线程B会等待线程A释放锁,然后才会删除列表项?

messages 是一个非静态的同步列表(synchronized list)

UPD:更新方法,可能的解决方案:

/**
 * Variant of {@code add} that uses the {@code addLock} flag to keep producers from
 * appending while the batch is being copied and cleared.
 *
 * Appends one message to {@code messages}; once the list has reached
 * {@code batchSize} and no thread is waiting for a processor, snapshots the list,
 * clears it and hands the snapshot to {@code startProcessor}.
 *
 * NOTE(review): the {@code addLock} check and the following
 * {@code messages.add(message)} are not one atomic step. A thread can leave the
 * wait loop just before another thread sets {@code addLock} to true and still
 * append between the copy and the {@code clear()}.
 *
 * @param message the message to enqueue
 */
public void add(final List<ReportingSTG> message) {
    // Cheap unsynchronized check first, then re-check while holding the monitor of `this`.
    if(stopRequested.get()) {
        synchronized (this) {
            if(stopRequested.get()) {
                retryQueue.put(message);
                // NOTE(review): no return here, so the message is also added to
                // `messages` below — confirm that this is intended.
            }
        }
    }
    // Poll in 1 ms steps until addLock reads false.
    while (addLock.get()){
        try {
            Thread.sleep(1);
        // NOTE(review): the interrupt is swallowed and the thread's interrupt status is not restored.
        } catch (InterruptedException e) {}
    }

    // Executed without holding the monitor of `this`.
    messages.add(message);

    // Same check-then-recheck shape as above: test outside the monitor, repeat inside it.
    if(messages.size() >= batchSize && waitingThreads.get() == 0) {
        synchronized (this) {
            if(messages.size() >= batchSize && waitingThreads.get() == 0) {

                // Raise the flag only for the copy-and-clear window.
                addLock.set(true);
                final List<List<ReportingSTG>> clone = new ArrayList<List<ReportingSTG>>(messages);
                messages.clear();
                addLock.set(false);

                // All processor slots taken: count this thread as waiting while it calls
                // waitForProcessor() (presumably blocks until one is free — TODO confirm).
                if(processors.size()>=numOfProcessors) {
                    waitingThreads.incrementAndGet();
                    waitForProcessor();
                    waitingThreads.decrementAndGet();
                }                   
                // Hand the snapshot (not the live list) to a processor.
                startProcessor(clone);
            }
        }

    }
}

addLock 是一个 AtomicBoolean,默认值为 false


共 (1) 个答案

  1. # 1 楼答案

    我最近编写了一个 DoubleBufferedList 类。也许使用它可以完全避免你的问题。顾名思义,它实现了双缓冲算法,只不过是用于列表

    这个类允许你同时拥有许多生产者线程和消费者线程。每个生产者线程都可以向当前列表中添加条目。每个消费者线程都会取走整个当前列表以进行处理

    它也不使用锁,只使用原子操作(atomics),所以应该可以高效运行

    请注意,其中大部分是测试代码。您可以在// TESTING注释之后删除所有内容,但您可能会发现测试的严格性令人欣慰

    /**
     * A list that many producer threads can append to while consumer threads take
     * the whole accumulated list away in one step.
     *
     * <p>The current list is held in an {@link AtomicMarkableReference}. A producer
     * sets the mark while it appends, spinning until it can; a consumer swaps an
     * unmarked list for a fresh empty one with a single compare-and-set. No monitor
     * locks are used.
     *
     * <p>Everything after the {@code // TESTING} comment is a self-test harness and
     * can be removed.
     *
     * @param <T> the element type
     */
    public class DoubleBufferedList<T> {
      // Atomic reference so the whole list can be swapped out in one step.
      // Mark == true means a producer is appending to it, so it must not be handed to a consumer.
      // newList() is overridable and runs during construction, so an override must not rely on
      // subclass fields.
      private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

      /**
       * Factory for the backing lists; override to use a different list implementation.
       *
       * @return a new, empty, mutable list
       */
      protected List<T> newList() {
        return new ArrayList<>();
      }

      /**
       * Takes the current list and replaces it with an empty one.
       *
       * <p>Never blocks. If the swap cannot be made right now (a producer holds the
       * mark, or another thread replaced the list first) an immutable empty list is
       * returned and the caller should simply try again later.
       *
       * @return the list that was current, or an immutable empty list if the swap failed
       */
      public List<T> get() {
        List<T> empty = newList();
        List<T> it = list.getReference();
        // Only an unmarked list may be replaced.
        if (!list.compareAndSet(it, empty, false, false)) {
          // Typed equivalent of Collections.EMPTY_LIST (same shared immutable instance).
          return Collections.emptyList();
        }
        return it;
      }

      // Marks the current list as "being appended to" and returns it.
      // Spins until the mark is obtained; every grab() must be paired with release().
      private List<T> grab() {
        List<T> it;
        do {
          it = list.getReference();
        } while (!list.compareAndSet(it, it, false, true));
        return it;
      }

      // Clears the mark set by grab().
      private void release(List<T> it) {
        // compareAndSet is used instead of attemptMark because attemptMark is documented as
        // allowed to fail spuriously, which would raise a false alarm here. While the mark is
        // set neither get() nor grab() can change the pair, so a failure is a genuine invariant
        // violation.
        if (!list.compareAndSet(it, it, true, false)) {
          throw new IllegalMonitorStateException("it changed while we were adding to it!");
        }
      }

      /**
       * Appends one entry to the current list.
       *
       * @param entry the entry to append
       */
      public void add(T entry) {
        List<T> it = grab();
        try {
          it.add(entry);
        } finally {
          // Always release after a grab.
          release(it);
        }
      }

      /**
       * Appends all the given entries to the current list in one step.
       *
       * @param entries the entries to append, in order
       */
      public void add(List<T> entries) {
        List<T> it = grab();
        try {
          it.addAll(entries);
        } finally {
          // Always release after a grab.
          release(it);
        }
      }

      /**
       * Appends a number of entries to the current list in one step.
       *
       * @param entries the entries to append, in order
       */
      public void add(T... entries) {
        add(Arrays.asList(entries));
      }

      // TESTING.
      // How many producers (and consumers) to run.
      static final int N = 10;
      // Per producer: the next sequence number expected to arrive.
      static final AtomicInteger[] seen = new AtomicInteger[N];
      // Per producer: the widgets that arrived out of order.
      // Generic arrays cannot be created directly, so a raw array is created and narrowed here.
      @SuppressWarnings({"unchecked", "rawtypes"})
      static final Set<Widget>[] queued = new ConcurrentSkipListSet[N];

      static {
        // Populate the arrays.
        for (int i = 0; i < N; i++) {
          seen[i] = new AtomicInteger();
          queued[i] = new ConcurrentSkipListSet<>();
        }
      }

      // Thing that is produced and consumed.
      private static class Widget implements Comparable<Widget> {
        // Who produced it.
        public final int producer;
        // Its sequence number.
        public final int sequence;

        public Widget(int producer, int sequence) {
          this.producer = producer;
          this.sequence = sequence;
        }

        @Override
        public String toString() {
          return producer + "\t" + sequence;
        }

        @Override
        public int compareTo(Widget o) {
          // Sort on producer first, then on sequence.
          int diff = Integer.compare(producer, o.producer);
          if (diff == 0) {
            diff = Integer.compare(sequence, o.sequence);
          }
          return diff;
        }
      }

      // Produces Widgets and feeds them to the supplied DoubleBufferedList.
      private static class TestProducer implements Runnable {
        // The list to feed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // The sequence we're at
        int sequence = 0;
        // Set this to true to stop me.
        public volatile boolean stop = false;

        public TestProducer(DoubleBufferedList<Widget> list, int id) {
          this.list = list;
          this.id = id;
        }

        @Override
        public void run() {
          // Just pump the list.
          while (!stop) {
            list.add(new Widget(id, sequence++));
          }
        }
      }

      // Consumes Widgets from the supplied DoubleBufferedList.
      private static class TestConsumer implements Runnable {
        // The list to bleed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // Set this to true to stop me.
        public volatile boolean stop = false;

        public TestConsumer(DoubleBufferedList<Widget> list, int id) {
          this.list = list;
          this.id = id;
        }

        @Override
        public void run() {
          // The list I am working on.
          List<Widget> l = list.get();
          // Stop when stop == true && list is empty
          while (!(stop && l.isEmpty())) {
            // Record all items in list as arrived.
            arrived(l);
            // Grab another list.
            l = list.get();
          }
        }

        private void arrived(List<Widget> l) {
          for (Widget w : l) {
            // Mark each one as arrived.
            arrived(w);
          }
        }

        // A Widget has arrived.
        private static void arrived(Widget w) {
          // Which producer's counter is it?
          AtomicInteger n = seen[w.producer];
          // Don't allow multi-access to the same producer data or we'll end up confused.
          synchronized (n) {
            // Is it the next to be seen?
            if (n.compareAndSet(w.sequence, w.sequence + 1)) {
              // It was the one we were waiting for! See if any queued ones can now be consumed.
              for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
                Widget it = i.next();
                // Is it in sequence?
                if (n.compareAndSet(it.sequence, it.sequence + 1)) {
                  // Done with that one too now!
                  i.remove();
                } else {
                  // Found a gap! Stop now.
                  break;
                }
              }
            } else {
              // Out of sequence - queue it.
              queued[w.producer].add(w);
            }
          }
        }
      }

      // Waits for every thread in the list to finish, announcing each one first.
      private static void joinAll(List<Thread> threads) throws InterruptedException {
        for (Thread t : threads) {
          System.out.println("Joining " + t.getName());
          t.join();
        }
      }

      // Main tester
      public static void main(String[] args) {
        try {
          System.out.println("DoubleBufferedList:Test");
          // Create my test buffer.
          DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
          // Start some producer tests.
          List<Thread> producerThreads = new ArrayList<>();
          List<TestProducer> producers = new ArrayList<>();
          for (int i = 0; i < N; i++) {
            TestProducer producer = new TestProducer(list, i);
            Thread t = new Thread(producer);
            t.setName("Producer " + i);
            t.start();
            producers.add(producer);
            producerThreads.add(t);
          }

          // Start the same number of consumers.
          List<Thread> consumerThreads = new ArrayList<>();
          List<TestConsumer> consumers = new ArrayList<>();
          for (int i = 0; i < N; i++) {
            TestConsumer consumer = new TestConsumer(list, i);
            Thread t = new Thread(consumer);
            t.setName("Consumer " + i);
            t.start();
            consumers.add(consumer);
            consumerThreads.add(t);
          }
          // Wait for a while.
          Thread.sleep(5000);
          // Stop the producers and wait until they have really finished. Otherwise a consumer
          // could see stop == true plus an empty list and exit while a producer is still
          // completing its last add(), leaving that widget unconsumed and the check below failing.
          for (TestProducer p : producers) {
            p.stop = true;
          }
          joinAll(producerThreads);
          // Nothing more will be added now, so the consumers can drain and stop.
          for (TestConsumer c : consumers) {
            c.stop = true;
          }
          joinAll(consumerThreads);
          // What results did we get?
          for (int i = 0; i < N; i++) {
            // How far did the producer get?
            int gotTo = producers.get(i).sequence;
            // The consumer's state
            int seenTo = seen[i].get();
            Set<Widget> queue = queued[i];
            if (seenTo == gotTo && queue.isEmpty()) {
              System.out.println("Producer " + i + " ok.");
            } else {
              // Different set consumed as produced!
              System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
            }
          }
        } catch (InterruptedException ex) {
          // Restore the interrupt status for whoever runs this, then report it.
          Thread.currentThread().interrupt();
          ex.printStackTrace();
        }
      }
    }