你好,游客 登录 注册 搜索
背景:
阅读新闻

MetaQ用户指南

[日期:2013-11-15] 来源:taobao.github.io  作者:taobao [字体: ]

版本

  1. 0.1版本,针对Metaq 1.0版本
  2. 0.2版本,针对Metaq 1.2版本

简介

Memorphosis是一个消息中间件,它是linkedin开源MQ——kafka的Java版本,针对淘宝内部应用做了定制和优化。Metaq的设计原则

  • 消息都是持久的,保存在磁盘
  • 吞吐量第一
  • 消费状态保存在客户端
  • 分布式,生产者、服务器和消费者都可分布

Metaq的部署结构

Image:Meta部署结构.png

 

Metaq的特点

除了完整实现kafka的功能之外,我们还为meta加入了额外的功能,使得meta成为一个更为强大的通用消息中间件,包括

  • 彻底用java重写的实现,高效的协议和通讯框架
  • 发送端的负载均衡
  • Master/Slave异步复制的高可用方案
  • 专门用于广播消息的客户端实现
  • 与diamond结合使用的顺序发送消息功能
  • 支持事务,包括本地事务和分布式事务,实现JTA规范。

Getting started

我们在日常已经部署了metamorhposis环境,因此你可以直接在本地测试,如果你想部署一个自己的服务器,可以参照#.E6.9C.8D.E5.8A.A1.E5.99.A8.E9.83.A8.E7.BD.B2节。

前面提到,meta是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。Meta里同样有这两个概念,消息生产者负责创建消息并发送到meta服务器,meta服务器会将消息持久化到磁盘,消息消费者从meta服务器拉取消息并提交给应用消费。

 

消息会话工厂类

在使用消息生产者和消费者之前,我们需要创建它们,这就需要用到消息会话工厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除了这些,MessageSessionFactory还默默无闻地在后面帮你做很多事情,包括

  1. 服务的查找和发现,通过diamond和zookeeper帮你查找日常的meta服务器地址列表
  2. 连接的创建和销毁,自动创建和销毁到meta服务器的连接,并做连接复用,也就是到同一台meta的服务器在一个工厂内只维持一个连接。
  3. 消息消费者的消息存储和恢复,后续我们会谈到这一点。
  4. 协调和管理各种资源,包括创建的生产者和消费者的。

因此,我们首先需要创建一个会话工厂类,MessageSessionFactory仅是一个接口,它的实现类是MetaMessageSessionFactory

MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());

请注意,MessageSessionFactory应当全局共用一个

消息生产者

翠花,上代码

package com.taobao.Metaq.example;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.taobao.Metaq.Message;
import com.taobao.Metaq.client.MessageSessionFactory;
import com.taobao.Metaq.client.MetaClientConfig;
import com.taobao.Metaq.client.MetaMessageSessionFactory;
import com.taobao.Metaq.client.producer.MessageProducer;
import com.taobao.Metaq.client.producer.SendResult;


public class Producer {
    public static void main(String[] args) throws Exception {
        // New session factory
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
        // create producer
        MessageProducer producer = sessionFactory.createProducer();
        // publish topic
        final String topic = "meta-test";
        producer.publish(topic);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // send message
            SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
            // check result
            if (!sendResult.isSuccess()) {
                System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
            }
            else {
                System.out.println("Send message successfully,sent to " + sendResult.getPartition());
            }
        }
    }

}

消息生产者的接口是MessageProducer,你可以通过它来发送消息。创建生产者很简单,通过MessageSessionFactory的createProducer方法即可以创建一个生产者。在Meta里,每个消息对象都是Message类的实例,Message表示一个消息对象,它包含这么几个属性:

属性
id 消息的唯一id,系统自动产生,用户无法设置,在发送成功后由服务器返回,发送失败则为0。
topic 消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,必须
data 消息的有效载荷,也就是消息内容,meta永远不会修改消息内容,你发送出去是什么样子,接收到就是什么样子。消息内容限制在1M以内,我的建议是最好不要发送超过上百K的消息,必须
attribute 消息属性,一个字符串,可选。发送者可设置消息属性来让消费者过滤。


细心的朋友可能注意到,我们在sendMessage之前还调用了MessageProducer的publish(topic)方法

producer.publish(topic);

这一步在发送消息前是必须的,你必须发布你将要发送消息的topic,这是为了让会话工厂帮你去查找接收这些topic的meta服务器地址并初始化连接。这个步骤针对每个topic只需要做一次,多次调用无影响。

总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个Message对象,发送到topic为meta-test下。

请注意,MessageProducer是线程安全的,完全可重复使用,因此最好在应用中作为单例来使用,一次创建,到处使用,配置为spring里的singleton bean。MessageProducer创建的代价昂贵,每次都需要通过zk查找服务器并创建tcp长连接。

更多详情见请继续阅读下一页的精彩内容http://www.linuxidc.com/Linux/2013-11/92750p2.htm

Metamorphosis 的详细介绍请点这里
Metamorphosis 的下载地址请点这里

相关阅读

MetaQ安装部署文档 http://www.linuxidc.com/Linux/2013-11/92748.htm

linux
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款