有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

线程池执行器Java线程池

我正在尝试编写多线程java程序来并行获取mongo数据并存储它。下面是回调代码,使用它创建的70个线程线程池 工人。我正在使用Callable回调

问题在于提取的项目不仅仅返回到回调列表。不知道出了什么问题。有人能帮忙吗?甚至是“抓取的…”打印的数量大于“索引…”线是否相互重叠

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallBack {
    List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
    final int NO_OF_THREAD = 70;

    public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        problemsToBeIndex.addAll(result);
        System.out.println(" Data Indexed "+problemsToBeIndex.size());
    }
    public  List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
            int ctr=0;
            while(ctr <= 100000) {
                CallingBackWorker worker = new CallingBackWorker();
                worker.setCallBack(this);
                final Future future = es.submit( worker);
                ctr +=100;
            }

            while(!es.isTerminated()) {}
            es.shutdown();
            System.out.println(" finished the retrival ");
        System.out.println("try to do something while the work is being done....");
        System.out.println("&quot;End work&quot; "+ new java.util.Date());
        return problemsToBeIndex;
    }

    public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        new CallBack().andAction();
    }
}

package com.chegg.migrator.question.parallel.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallingBackWorker implements Callable<Object>{
    CallBack callBack;

    static int calls = 0;
    static int fetched =0;
    static int indexed =0;
    List<TbsProblem> problems = new ArrayList<TbsProblem>();

    public CallingBackWorker() {
        super();
    }

    @Override
    public Object call() throws Exception {
        System.out.println("  fetching the data ....."+calls++);
        List<TbsProblem> problems = new ArrayList<TbsProblem>();
        for(int i=0;i<50;i++) {
            TbsProblem problem = new TbsProblem();
            problem.setId("fetched"+fetched);
            problems.add(problem);
        }
        Thread.sleep(500);
        fetched +=problems.size();
        System.out.println(" FETCHED ^^^^^^"+fetched);

        List<String> lists = new ArrayList<String>();
        for(TbsProblem tbs : problems) {
            lists.add(tbs.getId());
        }
        Thread.sleep(500);
        indexed += lists.size();
        System.out.println("   committed, exiting.");
        System.out.println(" INDEXED $$$$"+indexed);
        callBack.returnResult(problems);
        return null;
    }

      public CallBack getCallBack() {
        return callBack;
    }

    public void setCallBack(CallBack callBack) {
        this.callBack = callBack;
    }
}

共 (1) 个答案

  1. # 1 楼答案

    是否在每个可调用的外部声明了已获取的?你在几个线程中递增它?如果是这样,那就是一个问题。递增整数不是线程安全的。如果是这种情况,请使用原子整数替换已获取的,或在同步块内递增它

    为什么在多个线程中递增一个整数是一个问题?每个线程将执行以下操作:

    STEP 1: read current value of fetched
    STEP 2: calculate current value + problems.size()
    STEP 3: assign new value to fetched
    

    图像线程(1)完成步骤1和2,将获取的的新值计算为10。然后线程(2)到(50)完成步骤1、2和3获取的现在的值为1000。最后,线程(1)完成步骤3,再次为获取的赋值10