你好,游客 登录 注册 搜索
背景:
阅读新闻

Java Master-Worker模式实现

[日期:2017-03-11] 来源:Linux社区  作者:zhangfengzhe [字体: ]

Master-Worker模式简介

Master-Worker模式是非常经典的常用的一个并行计算模式,它的核心思想是2类进程协作工作:Master进程和Worker进程。Master负责接收客户端请求,分配任务;Worker负责具体处理任务。当各个Worker处理完任务后,统一将结果返回给Master,由Master进行整理和总结。其好处是能够将一个大JOB分解成若干小JOB,并行执行,从而提高系统的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在这种Master-Worker工作模式;离线分布式计算框架Hadoop的JobTracker和TaskTracker,实时流计算框架Strom的Nimbus和Supervisor都涉及到这种思想。那么下面我们来具体分析下Java Master-Worker模式的实现。

Master-Worker模式分析

我们重点分析下Master,Worker这2个角色。

Master

Master需要接受Client端提交过来的任务Task,而且还得将Task分配给Worker进行处理,因此Master需要一个存储来存放Task。那么采用哪种存储集合呢?首先来说,需要支持并发的集合类,因为多个Worker间可能存在任务竞争,因此我们需要考虑java.util.concurrent包下的集合。这里可以考虑采用非阻塞的ConcurrentLinkedQueue。

Master需要清楚的知道各个Woker的基本信息,如是否各个Worker都运行完毕,因此Master端需要保存Worker的信息,可以采用Map存储。

由于最后各个Worker都会上报运行结果,Master端需要有一个存储结果的Map,可以采用支持并发的ConcurrentHashMap。

Worker

Worker需要持有Master端的任务Task集合的引用,因为Worker需要从里面拿取Task。

同上,Worker需要持有Master端的存储结果的引用。

综上,我们可以得到如下:

我们可以进一步细化,Master/Worker应该提供什么操作?

Master:

通过构造方法以初始化workers

应该提供submit(Task)方法接受Client端提交过来的任务

start()让workers开始处理任务

提供isComplete()判断各个worker的状态,是否都处理完毕

提供getResult()给客户端返回结果

Worker:

Worker本质上就是Runnable,提供run()

负责处理业务逻辑的handle()

Java Master-Worker代码实现

Task

 public class Task {
 
    private long id;
    private String name;
 
    public Task(long id, String name) {
        this.id = id;
        this.name = name;
    }
 
    public long getId() {
        return id;
    }
 
    public void setId(long id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
 
}

Worker

 public class Worker implements Runnable {
 
    private long id;
    private String name;
 
    private ConcurrentLinkedQueue<Task> workQueue;
 
    private ConcurrentHashMap<Long,Object> results;
 
    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }
 
    public void setResults(ConcurrentHashMap<Long, Object> results) {
        this.results = results;
    }
 
    public Worker(long id, String name) {
        this.id = id;
        this.name = name;
    }
 
    @Override
    public void run() {
 
        while(true){
 
            Task task = workQueue.poll();
 
            if(task == null){
                break;
            }
 
            long start = System.currentTimeMillis();
            long result = handle(task);
 
            this.results.put(task.getId(),result);
 
            System.out.println(this.name + " handle " + task.getName() + " success . result is " + result + " cost time : " + (System.currentTimeMillis() - start));
        }
 
 
 
    }
 
    /**
    * 负责处理具体业务逻辑
    * @param task
    * @return
    */
    private long handle(Task task) {
 
        //这里只是模拟下,在真实环境也许是查询数据库,也许是查缓存等
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        return new Random().nextLong();
    }
}

Master

public class Master {
 
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
 
    private Map<Long,Thread> workers = new HashMap<Long, Thread>();
 
    private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>();
 
    public Master(int num){
 
        for(int i = 0 ; i < num ; i++){
 
            Worker worker = new Worker(i,"worker-" + i);
            worker.setResults(results);
            worker.setWorkQueue(workQueue);
 
            workers.put(Long.valueOf(i),new Thread(worker));
        }
 
    }
 
    public void submit(Task task){
        workQueue.add(task);
    }
 
    public void start(){
 
        for (Map.Entry<Long,Thread> entry : workers.entrySet()){
 
            entry.getValue().start();
        }
 
    }
 
    public boolean isComlepte(){
 
        for(Map.Entry<Long,Thread> entry : workers.entrySet()){
 
            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
 
        }
 
        return true;
    }
 
    public long getSumResult(){
 
        long value = 0;
        for(Map.Entry<Long,Object> entry : results.entrySet()){
 
            value = value + (Long)entry.getValue();
 
        }
        return value;
    }
}

Main

public class Main {
 
    public static void main(String[] args) {
 
        Master master = new Master(10);
 
        for(int i = 0 ; i < 10 ; i++){
 
            Task task = new Task(i,"task-" + i);
 
            master.submit(task);
        }
 
        long start = System.currentTimeMillis();
        master.start();
 
        while(true){
 
            if(master.isComlepte()){
 
                System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start));
                break;
            }
        }
 
 
    }
 
}

 

运行结果

总结

在单线程的时候,处理一个Task需要500ms,那么处理10个Task需要5S,如果采用Master-Worker这种并行模型,可以大大缩短计算处理时间。

本文永久更新链接地址http://www.linuxidc.com/Linux/2017-03/141680.htm

linux
相关资讯       Worker  Master  Java Master 
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款