并发编程(四)_Master-Worker代码

Master-Worker模式的实例代码:
【Master.class】

package com.jxb.thread11;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Master-Worker模式——并行计算模式
 *  Master 负责接收和分配任务
 */
public class Master {
    
    //1、应该有一个 承装所有任务的容器
    private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<Task>();
    
    //2、使用HashMap去承装所有的worker对象
    private HashMap<String, Thread> workers=new HashMap<String, Thread>();
    
    //3、使用一个容器承装每一个worker并行执行的任务的结果集
    private ConcurrentHashMap<String, Object> resultMap=new ConcurrentHashMap<String, Object>();
    
    /**
     * 4、构造函数
     * @param worker        worker对象
     * @param worlerCount   多少个worker对象(子节点)
     */
    public Master(Worker worker,int worlerCount){
        // 每一个worker对象都需要有master的引用,workQueue由于任务的领取
        worker.setWorkerQueue(this.workQueue);
        // resultMap 用于任务的提交
        worker.setResultMap(this.resultMap);
        
        //循环将worker装入workers容器
        for(int i=0;i<worlerCount;i++){
            //key表示每一个worker的名字(方便跟踪),value表示线程执行对象(worker对象,实现线程)
            workers.put("子节点"+Integer.toString(i), new Thread(worker));
        }
    }
    
    //5、提交方法(往任务队列里面装任务)
    public void submit(Task task){
        this.workQueue.add(task);
    }
    
    //6、执行方法(启动应用程序,让所有的worker工作)
    public void execute(){
        //循环Map的方式(启动workers容器 的 所有线程),让worker开始工作
        for(Map.Entry<String, Thread> me:workers.entrySet()){
            me.getValue().start();
        }
    }
    
    //7、判断任务是否执行完毕(线程是否都停止)
    public boolean isComplete() {
        for(Map.Entry<String, Thread> me:workers.entrySet()){
            //判断线程的状态,是否是停止(Thread.State.TERMINATED)
            if(me.getValue().getState()!=Thread.State.TERMINATED){
                return false;   //只要有一个线程的状态不是停止,就返回false
            }
        }
        return true;
    }

    //8、返回结果集
    public int getResult() {
        int ret=0;
        for (Map.Entry<String, Object> result : resultMap.entrySet()) {
            //汇总结果集的逻辑(不一定是++)
            ret+=(Integer)result.getValue();
        }
        return ret;
    }
}

【Worker.class】

package com.jxb.thread11;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 *Master-Worker模式——并行计算模式
 *  Worker 负责处理子任务
 */
public class Worker implements Runnable {

    //接收引用
    private ConcurrentLinkedQueue<Task> workQueue;
    private ConcurrentHashMap<String, Object> resultMap;
    
    /**
     * 引用 Master里面的workerQueue,方便领取任务(领取一个少一个)
     * @param workQueue
     */
    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue=workQueue;   
    }

    /**
     * 引用 Master里面的resultMap,方便将每个worker里卖弄处理的结果,返回给Master的结果集容器里面
     * @param resultMap
     */
    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap=resultMap;
    }
    
    /**
     * 处理一个一个的任务
     */
    @Override
    public void run() {
        while(true){
            Task input=this.workQueue.poll(); //从队列中取出任务并移除
            if(input==null){    //当没有任务时,退出循环
                break;
            }
            /**
             * 真正的去做业务处理
             */
            //Object output=handler(input);
            Object output=MyWorker.handler(input); //优化后的代码01
            //将任务的返回结果 放入结果集容器里面(任务编号/任务名称,任务处理的结果)
            this.resultMap.put(Integer.toString(input.getId()), output);
        }
    }

//------------------------------------代码优化01(将具体的任务实现封装出去)MyWorker,去重写-------------------------------------
    /**
     * 具体的任务
     * @param input 任务
     * @return  返回处理的结果
     */
    /*private Object handler(Task input) {
        Object output=null;
        try {
            //表示处理task任务的耗时,可能是数据的加工也可能是操作数据库
            Thread.sleep(500);
            output=input.getPrivce();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }*/

    public static Object handler(Task input){
        return null;
    }

}

【Main.class】

package com.jxb.thread11;

import java.util.Random;

/**
 * Master-Worker 模式的测试类
 */
public class Main {
    public static void main(String[] args) {
        /**
         * (实例化一个worker,需要几个子任务(进程))
         */
        //Master master=new Master(new Worker(), 10);
        //Master master=new Master(new MyWorker(), 10); //代码优化01
        System.out.println("当前机器可用的线程数:"+Runtime.getRuntime().availableProcessors());
        //线程数不能随便加,应该根据自己的电脑性能来
        Master master=new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());   //代码优化02
        
        // 100个任务
        Random random=new Random();
        for (int i = 1; i <= 100; i++) {
            Task task=new Task();
            task.setId(i);
            task.setName("任务"+i);
            task.setPrivce(random.nextInt(1000));
            master.submit(task); //提交任务
        }   
        
        master.execute();   //启动任务
        
        long start=System.currentTimeMillis();
        
        //判断所有任务是否执行完成(所有的线程都执行完成了)
        while(true){
            if(master.isComplete()){
                long end=System.currentTimeMillis()-start;
                int ret= master.getResult();
                System.out.println("汇总结果集最后的值:"+ret+" ,执行耗时"+end);
                break;
            }
        }
    }
}

【Task.class】

package com.jxb.thread11;
/**
 * 任务
 */
public class Task {
    private int id;         //编号
    private String name;    //任务名称
    private int privce;     //价格
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getPrivce() {
        return privce;
    }
    public void setPrivce(int privce) {
        this.privce = privce;
    }
}

【MyWorker.class】

package com.jxb.thread11;
/**
 * 具体需要完成的任务类,可以多个
 */
public class MyWorker extends Worker{
    /**
     * 重写handler()——具体的任务
     * @param input 任务
     * @return  返回处理的结果
     */
    public static Object handler(Task input) {
        Object output=null;
        try {
            //表示处理task任务的耗时,可能是数据的加工也可能是操作数据库
            Thread.sleep(500);
            output=input.getPrivce();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }
}

【效果截图】


运行效果.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,323评论 19 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 175,079评论 25 709
  • 找到自己想要的资源 在互联网快速发展的今天,网络上的资源无所不包,可是你能快速就能找到自己需要的资源吗?估计能做到...
    多做减法阅读 897评论 0 0
  • 我们总是忙忙碌碌,总是被各种琐事缠绕着,总以为生活只有眼前的苟且,岂不知,诗和远方竟近在咫尺 盛夏的早晨因为前几天...
    tuzhu002阅读 3,785评论 1 5
  • 今天跟大家分享我们的工具,我就是微商的八大兵器! 现在回想一开始老师要求我们看电影《走出亚马逊》实...
    盈盈1230阅读 1,501评论 0 0