有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java是否正确地完成了锁定

我正在读取UDP提要,然后对其进行解码并写入MSMQ(消息队列)

我创建了一个名为UDPReader的新线程。然后,UDPReader创建一个线程池并调用类IPAddressConnection。Run inside ipaddrConnection包含一个while循环,该循环连续从多播socket读取数据包并将其推送到类parseUDP。从parseUDP对其进行解码,最后推送到写入MSMQ的类。我认为在ipaddrConnection中的while循环中,我没有正确锁定线程,因为线程试图写入MSMQ中的同一内存位置。我认为通过将锁放在while循环中,池中的每个线程在“Critical Section”1中都有自己的时间。接收一个数据包,然后2。解码并写入MSMQ。我仍在学习并发性并寻求一些帮助。我提供了一个崩溃转储文件,我不知道如何正确读取它以及我的UDPREDER和ipaddrConnection类。parseUDP调用一个类来解码数据包,该类调用一个MSMQ类来写入内存。所有这些都在我的关键部分

class UDPReader implements Runnable
{
    private final String ip, socket, queue, threadName;
    private final JTextArea screen;

    UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen) 
    {
        this.ip = ip;
        this.socket = socket;
        this.queue = queue;
        this.threadName = threadName;
        this.screen = screen;
    }

    public void run()
    {
        screen.append("Thread " + threadName + " running\n\n");
        ExecutorService executor = Executors.newFixedThreadPool(5);
        Runnable reader = new ipaddrConnection(ip, socket, queue);
        executor.execute(reader);
    }

}

public final class ipaddrConnection implements Runnable
{
    private final ReentrantLock lock = new ReentrantLock();
    byte[] bytes = new byte[(int)100000];
    InetAddress group; 
    MulticastSocket s;
    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
    private String queue;

    public ipaddrConnection(String ip, String socket, String queue) {
        try {
            this.s = new MulticastSocket(Integer.parseInt(socket));
            this.group = InetAddress.getByName(ip);
            this.queue = queue;
        } catch (IOException ex) {
            Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

        @Override
        public void run() {
            try {
                parseUDP p = new parseUDP(queue);
                s.joinGroup(group);
                s.setSoTimeout(95000);

                try{
                    while(true){
                        lock.lock();
                        s.receive(packet);
                        p.parseUDP(packet.getData());
                    } 
                }finally {
                    lock.unlock();
                }


             } catch (SocketException ex) {
                Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(ipaddrConnection.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

}

事故报告 https://drive.google.com/file/d/0B4GWNCU6_CBlM2tJNGJqNzRVazg/view?usp=sharing


共 (3) 个答案

  1. # 1 楼答案

    在代码中,锁没有任何用处

    每个线程都有自己的锁,因此一次可以有多个线程使用队列(因为线程1锁定了锁1,线程2锁定了锁2,没有任何东西可以阻止它们同时使用队列)

    如果在代码中使用lock字段static,那么所有线程都将使用相同的锁

    您可能仍然有问题,因为线程永远不会释放锁(除非遇到异常),所以只允许一个线程执行工作:

    try{
        while(true){
            lock.lock();
            s.receive(packet);
            p.parseUDP(packet.getData());
        } 
    }finally {
        lock.unlock();
    }
    

    请注意,线程解锁锁的唯一方法是出现异常

    你可能想要更像这样的东西:

    while(true) {
        s.receive(packet);
        try {
            lock.lock();
            s.parseUDP(packet.getData());
        } finally {
            lock.unlock();
        }
    }
    

    -使用这种结构,线程只在解析数据包时持有锁,而不是在接收数据包时持有锁。(我不知道这是否是你真正想要的)

  2. # 2 楼答案

    ExecutorService executor = Executors.newFixedThreadPool(5);
    Runnable reader = new ipaddrConnection(ip, socket, queue);
    executor.execute(reader);
    

    这段代码实际上是单线程的,因为虽然池有五个线程,但只使用了一个

  3. # 3 楼答案

    1. 让UDPReader实现Runnable和它的run()实现至少不是惯用的
    2. 正如immibis所提到的,您的锁对象不会在线程之间共享,也不会提供您想要的保护
    3. 只有在退出while (true) { ... }时才解锁,也就是说永不解锁。考虑到这一点,你可能想考虑一些类似的事情:

          public class UDPReader {
              ...
      
              UDPReader(String ip, String socket, String queue, String threadName, JTextArea screen, numberOfThreads) {
                  ...
                  this.numberOfThreads = numberOfThreads;
                  this.lock = new ReentrantLock();
              }
      
              public void run() {
                  ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
                  for (int i = 0; i < numberOfThreads; i++){
                      executor.execute(new ipaddrConnection(ip, socket, queue, lock));
                  }
              }
           }
      
      
           public final class ipaddrConnection implements Runnable {
              private lock ;
              ...
      
              public ipaddrConnection(String ip, String socket, String queue, ReentrantLock lock) {
                  ...
                  this.lock = lock;
              }
      
              @Override
              public void run() {
                      ...
                      while (true) {
                          try {
                              lock.lock();
                              s.receive(packet);
                              p.parseUDP(packet.getData());
                          } finally {
                              lock.unlock();
                          }
                      }
                   ....
              }
          }
      }