线程池执行器Java线程池
我正在尝试编写多线程java程序来并行获取mongo数据并存储它。下面是回调代码,使用它创建的70个线程线程池 工人。我正在使用Callable回调
问题在于提取的项目不仅仅返回到回调列表。不知道出了什么问题。有人能帮忙吗?甚至是“抓取的…”打印的数量大于“索引…”线是否相互重叠
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;
import com.chegg.migrator.question.entity.TbsProblem;
public class CallBack {
List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
final int NO_OF_THREAD = 70;
public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
problemsToBeIndex.addAll(result);
System.out.println(" Data Indexed "+problemsToBeIndex.size());
}
public List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
int ctr=0;
while(ctr <= 100000) {
CallingBackWorker worker = new CallingBackWorker();
worker.setCallBack(this);
final Future future = es.submit( worker);
ctr +=100;
}
while(!es.isTerminated()) {}
es.shutdown();
System.out.println(" finished the retrival ");
System.out.println("try to do something while the work is being done....");
System.out.println(""End work" "+ new java.util.Date());
return problemsToBeIndex;
}
public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
new CallBack().andAction();
}
}
package com.chegg.migrator.question.parallel.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import com.chegg.migrator.question.entity.TbsProblem;
public class CallingBackWorker implements Callable<Object>{
CallBack callBack;
static int calls = 0;
static int fetched =0;
static int indexed =0;
List<TbsProblem> problems = new ArrayList<TbsProblem>();
public CallingBackWorker() {
super();
}
@Override
public Object call() throws Exception {
System.out.println(" fetching the data ....."+calls++);
List<TbsProblem> problems = new ArrayList<TbsProblem>();
for(int i=0;i<50;i++) {
TbsProblem problem = new TbsProblem();
problem.setId("fetched"+fetched);
problems.add(problem);
}
Thread.sleep(500);
fetched +=problems.size();
System.out.println(" FETCHED ^^^^^^"+fetched);
List<String> lists = new ArrayList<String>();
for(TbsProblem tbs : problems) {
lists.add(tbs.getId());
}
Thread.sleep(500);
indexed += lists.size();
System.out.println(" committed, exiting.");
System.out.println(" INDEXED $$$$"+indexed);
callBack.returnResult(problems);
return null;
}
public CallBack getCallBack() {
return callBack;
}
public void setCallBack(CallBack callBack) {
this.callBack = callBack;
}
}
# 1 楼答案
是否在每个可调用的外部声明了已获取的?你在几个线程中递增它?如果是这样,那就是一个问题。递增整数不是线程安全的。如果是这种情况,请使用原子整数替换已获取的,或在同步块内递增它
为什么在多个线程中递增一个整数是一个问题?每个线程将执行以下操作:
图像线程(1)完成步骤1和2,将获取的的新值计算为10。然后线程(2)到(50)完成步骤1、2和3获取的现在的值为1000。最后,线程(1)完成步骤3,再次为获取的赋值10