有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

executorservice Java:Threadpoolexecutor是否使用作业列表提交作业?

下面的代码是CustomerjobManager。客户有姓名、地址和账户余额。工作是从一个客户到另一个客户的资金转移。这是一个线程池执行器培训计划。下面的版本有效,我一个接一个地提交作业

顾客。爪哇

public class customer {

    private String name;
    private String adress;
    private Double accountBalance;

    public customer(String name, String adress, Double accountBalance)
    {
        this.name = name;
        this.adress = adress;
        this.accountBalance = accountBalance;
    }

    public String getName() { return name; }

    public String getAdress()
    {
        return adress;
    }

    public Double getAccountBalance(){return accountBalance;}

    public void setAccountBalance(double accountBalance){this.accountBalance=accountBalance;}

    @Override
    public String toString(){

        return "[" + name+"; " +adress+"; "+accountBalance+"]";
    }
}

顾客组织者。爪哇

import java.util.ArrayList;
import java.util.List;

public class customerOrganizer {

    private static final customerOrganizer myJobOrganizer = new customerOrganizer();

    public static customerOrganizer getJobOrganizer(){
        return myJobOrganizer;
    }

    private List<customer> customerList = new ArrayList<customer>();

    public void add_customer(customer kunde)
    {
        this.customerList.add(kunde);
    }

    public Iterable<customer> all_customers()
    {
        return this.customerList;
    }

    public static customerOrganizer getInstance()
    {
        return myJobOrganizer;
    }

}

工作。爪哇

public class job implements Runnable {
    private customer kunde1;
    private customer kunde2;
    private Double transfer;

    public job(customer kunde1, customer kunde2, Double transfer) {
        this.kunde1 = kunde1;
        this.kunde2 = kunde2;
        this.transfer = transfer;
    }

    @Override
    public String toString(){

        return "[" + kunde1+"; " +kunde2+"; "+transfer+"]";
    }

    public void run() {

        System.out.println("Starting transfer");

        Double geber = this.kunde1.getAccountBalance();
        Double nehmer = this.kunde2.getAccountBalance();

        Double geberNeu = geber - this.transfer;
        this.kunde1.setAccountBalance(geberNeu);

        Double nehmerNeu = nehmer + this.transfer;
        this.kunde2.setAccountBalance(nehmerNeu);
        System.out.println("Transfer done");
    }
}

工作组织者。爪哇

public class jobOrganizer {

    private static final jobOrganizer myJobOrganizer = new jobOrganizer();

    public static jobOrganizer getMyJobOrganizer() {
        return myJobOrganizer;
    }

    private List<job> jobList = new ArrayList<job>();

    public int getAmount(){ return jobList.size();}

    public void add_job(job newJob) {
        this.jobList.add(newJob);
    }

    public Iterable<job> all_jobs() {
        return this.jobList;
    }

    public static jobOrganizer getInstance() {
        return myJobOrganizer;
    }

}

梅因。爪哇

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        customerOrganizer myCustomerOrganizer = new customerOrganizer();
        jobOrganizer myJobOrganizer= new jobOrganizer();

        customer mueller = new customer("Tim Mueller", "Strasse 1", 1077.00);
        customer john = new customer("John Doe", "Strasse 2",503.00);
        customer meier = new customer("John Meier", "Strasse 3", 8500.50);
        customer wurst = new customer("Hans Wurst", "Strasse 4", 1000.00);

        myCustomerOrganizer.add_customer(mueller);
        myCustomerOrganizer.add_customer(john);
        myCustomerOrganizer.add_customer(meier);
        myCustomerOrganizer.add_customer(wurst);

        job transfer1= new job(meier,wurst,500.50);
        job transfer2= new job(mueller,john,77.00);

        myJobOrganizer.add_job(transfer1);
        myJobOrganizer.add_job(transfer2);

        // this works:
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(transfer1);
        executor.submit(transfer2);
        executor.shutdown();


    }}

所以,我有一份工作清单,我有一个想法,我应该使用它。我不想一个接一个地提交作业,而是想根据作业列表提交作业。我一开始就想到了这样的事情:

 int threads = myJobOrganizer.getAmount();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i <threads+1; i++){
            //submit jobs? execute?
        }

此外,myJobOrganizer还需要实现Runnable?我还看到解决方案是这样的:

for(condition){

        executor.execute(new Runnable() {

            @Override
            public void run() {
              submit the jobs?
                }
            }}

但我真的不知道该怎么做。基本上,我不知道如何以正确的方式从我的作业列表中提取作业,以便将它们提交给executor service><

关于线程安全的更新

所以,我按照Rab提供的链接,使用了CompletionService。主要内容的最后一部分。java现在看起来像这样:

int threads = myJobOrganizer.getAmount();

System.out.println(myCustomerOrganizer.all_customers().toString());
// executor service   
ExecutorService executor = Executors.newFixedThreadPool(threads);
// completion service is applied on executor
CompletionService service = new ExecutorCompletionService(executor);

for(Callable<Job> myJob : myJobOrganizer.all_jobs()){
    service.submit(myJob);
}
executor.shutdown();
// pause the main for control printout -> not nice yet, I am working on 
// joining threads
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// control print
System.out.println(myCustomerOrganizer.all_customers().toString());

请注意,这个编辑是为了完成条目,但仍然是错误的(可悲的)。提供的答案与原始问题相关,与线程安全无关

谢谢你的时间和努力


共 (2) 个答案

  1. # 1 楼答案

    ExecutorService处理任务如何在工人之间分配。你所要做的就是一个接一个地通过考试

    for (job jobObj : myJobOrganizer.all_jobs()) 
        executor.submit(jobObj);
    

    请注意,sumbit返回一个Future<?>,用于跟踪任务是否完成,或者任务是否出错(以及任务结果,但runnable没有结果)。如果你关心这些东西,你会想把它们收集到某种容器中,比如List


    如果将job更改为Callable<Void>,提交将更容易Callable是Runnable的某种扩展,允许任务在完成时产生结果。因为您的传输没有结果,所以使用java.lang.Void作为泛型参数的填充类型就可以了

    现在,只要做executor.invokeAll(myJobOrganizer.all_jobs())就足够了。这将节省一些上下文切换,加快速度。(实际上非常重要,因为你的任务都很小)


    顺便说一句,您应该知道并发访问需要适当的同步,而您没有同步。如果不同的工作涉及同一个账户,你的账户可能会以错误的状态结束。我们还通常在LargeCamelCase中命名类,在smallCamelCase中命名方法

  2. # 2 楼答案

    如果不想使用循环,可以使用Stream来实现。如果您使用的java版本大于或等于8,这里有一个例子

        myJobList
                .stream()
                .forEach(e -> executor.execute(() -> {
            //submit the jobs
        }));
    

    或者

        myJobOrganizer.all_jobs()
                .stream()
                .forEach(e -> executor.submit(e));
    

    如果你真的不想循环,你可以使用executor.invokeAll(myJobList)提交你的列表

    我觉得这个答案很有趣,你应该仔细研究一下