你好,游客 登录 注册 搜索
背景:
阅读新闻

MetaQ用户指南

[日期:2013-11-15] 来源:taobao.github.io  作者:taobao [字体: ]

消息消费者

发送消息后,消费者可以接收消息了,下面的代码创建消费者并订阅meta-test这个主题,等待消息送达并打印消息内容

package com.taobao.Metaq.example;

import java.util.concurrent.Executor;

import com.taobao.Metaq.Message;
import com.taobao.Metaq.client.MessageSessionFactory;
import com.taobao.Metaq.client.MetaClientConfig;
import com.taobao.Metaq.client.MetaMessageSessionFactory;
import com.taobao.Metaq.client.consumer.ConsumerConfig;
import com.taobao.Metaq.client.consumer.MessageConsumer;
import com.taobao.Metaq.client.consumer.MessageListener;

public class AsyncConsumer {
    public static void main(String[] args) throws Exception {
        // New session factory
        MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
        // subscribed topic
        final String topic = "meta-test";
        // consumer group
        final String group = "meta-example";
        // create consumer
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        // subscribe topic
        consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();

    }

}

通过createConsumer方法来创建MessageConsumer,注意到我们传入一个ConsumerConfig参数,这是消费者的配置对象。每个消息者都必须有一个ConsumerConfig配置对象,我们这里只设置了group属性,这是消费者的分组名称。Meta的Producer、Consumer和Broker都可以为集群。消费者可以组成一个集群共同消费同一个topic,发往这个topic的消息将按照一定的负载均衡规则发送给集群里的一台机器。同一个消费者集群必须拥有同一个分组名称,也就是同一个group,这个概念跟notify里的订阅者组名是一样的。我们这里将分组名称设置为meta-example。

订阅消息通过subscribe方法,这个方法接受三个参数

  • topic,订阅的主题
  • maxSize,因为meta是一个消费者主动拉取的模型,这个参数规定每次拉取的最大数据量,单位为字节,这里设置为1M,最大为1M。
  • MessageListener,消息监听器,负责消费消息。

MessageListener的接口方法如下

public interface MessageListener {
    /**
     * 接收到消息列表,只有messages不为空并且不为null的情况下会触发此方法
     *
     * @param messages
     */
    public void recieveMessages(Message message);


    /**
     * 处理消息的线程池
     *
     * @return
     */
    public Executor getExecutor();
}

消息的消费过程可以是一个并发处理的过程,getExecutor返回你想设置的线程池,每次消费都会在这个线程池里进行。recieveMessage方法用于实际的消息消费处理,message参数即为消费者收到的消息,它必不为null。

我们这里简单地打印收到的消息内容就完成消费。如果在消费过程中抛出任何异常,该条消息将会在一定间隔后重新尝试提交给MessageListener消费。在多次消费失败的情况下,该消息将会存储到消费者应用的本次磁盘,并在后台自动恢复重试消费。

细心的你一定还注意到,在调用subscribe之后,我们还调用了completeSubscribe方法来完成订阅过程。请注意,subscribe仅是将订阅信息保存在本地,并没有实际跟meta服务器交互,要使得订阅关系生效必须调用一次completeSubscribe,completeSubscribe仅能被调用一次,多次调用将抛出异常。 为什么需要completeSubscribe方法呢,原因有二:

  • 首先,subscribe方法可以被调用多次,也就是一个消费者可以消费多种topic
  • 其次,如果每次调用subscribe都跟zk和meta服务器交互一次,代价太高

因此completeSubscribe一次性将所有订阅的topic生效,并处理跟zk和meta服务器交互的所有过程。

同样,MessageConsumer也是线程安全的,创建的代价不低,因此也应该尽量复用。

例子小结

上面的例子可以直接在您的机器上跑起来,因为我们在日常已经部署了几台meta机器。不过我们建议您测试的时候使用自己的topic和消费者组名group,防止跟其他测试的开发者产生冲突,如有疑问,可以联系伯岩(boyan@taobao.com),无花(wuhua@taobao.com),

此例子的代码可以在Metaq-example工程下找到,Metaq-example源码的svn地址

http://svn.app.taobao.net/repos/metaq/trunk/metaq/metaq-example

你可以在这里找到所有meta的例子源码。

事务

Metaq 1.2开始支持事务,包括发送端和消费端事务。发送端同时支持本地事务和分布式事务,可以在一个事务内发送多条消息,要么同时成功,要么同时失败;可以使用XA事务,在事务内操作其他XA资源,例如操作数据库,与此同时发送meta消息,可以保证这些操作和发送消息要么一起成功,要么一起失败。

在消费消息的时候,可以批量消费一批消息,要么一起消费成功,要么失败重试。

发送消息的本地事务

事务跟线程关联,启动一个事务将会关联该事务到当前线程,在此线程和此事务内发送的消息,将作为一个整体发送,同时成功,或者同时失败,对外界看来是一个原子操作。发起一个本地事务很简单,参见代码:

       try {
                // 开始事务
                producer.beginTransaction();
                // 在事务内发送两条消息
                if (!producer.sendMessage(new Message(topic, line.getBytes())).isSuccess()) {
                    // 发送失败,立即回滚
                    producer.rollback();
                    continue;
                }
                if (!producer.sendMessage(new Message(topic, line.getBytes())).isSuccess()) {
                    producer.rollback();
                    continue;
                }
                // 提交
                producer.commit();

            }
            catch (final Exception e) {
                producer.rollback();
            }

beginTransaction方法启动一个事务并关联到当前线程,commit方法提交事务,而rollback则回滚当前事务。

发送消息的分布式事务

如果你要在发送消息的同时操作数据库,比如同时将消息插入某张表,例如下订单的时候同时发送消息通知卖家并将订单插入数据库,这时候因为涉及到两个Resource(数据库和meta),就需要使用分布式事务来保证ACID。分布式事务一般采用两阶段提交协议,在java里就是使用JTA规范API的XA部分。

在这种情形下,你需要使用数据库的XADatasource和meta的XAMessageProducer类,并使用一个开源JTA实现来支持事务管理器做协调者。例如我们在Metaq-example里的XATransactionProducer例子使用了atomikos这个开源JTA实现,具体不在这里讲解,请直接参考源码并尝试运行。

linux
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款