JMS with ActiveMQ: Installation on Linux and Application Examples

[Date: 2015-08-20] Source: Linux社区  Author: wuzhaohuixy

1. Download the ActiveMQ installation package and copy it to the /activeMQ directory

The package is apache-activemq-5.10.0-bin.tar.gz; download it from http://activemq.apache.org/download.html

2. Extract the archive into the run directory

[root@linuxidc softs]# tar -xzvf /server/apache-activemq-5.10.0-bin.tar.gz

3. Rename the directory for easier management

[root@linuxidc softs]# mv apache-activemq-5.10.0 activemq-5.10.0

[root@linuxidc softs]# cd activemq-5.10.0/
[root@linuxidc activemq-5.10.0]# ll
total 6304
-rwxr-xr-x 1 root root 6371237 Jun  5  2014 activemq-all-5.10.0.jar
drwxr-xr-x 5 root root    4096 Jan 11 23:31 bin
drwxr-xr-x 2 root root    4096 Jan 11 23:31 conf
drwxr-xr-x 2 root root    4096 Jan 11 23:31 data
drwxr-xr-x 2 root root    4096 Jan 11 23:31 docs
drwxr-xr-x 8 root root    4096 Jan 11 23:31 examples
drwxr-xr-x 6 root root    4096 Jan 11 23:31 lib
-rw-r--r-- 1 root root  40580 Jun  5  2014 LICENSE
-rw-r--r-- 1 root root    3334 Jun  5  2014 NOTICE
-rw-r--r-- 1 root root    2610 Jun  5  2014 README.txt
drwxr-xr-x 7 root root    4096 Jan 11 23:31 webapps
drwxr-xr-x 3 root root    4096 Jan 11 23:31 webapps-demo
[root@linuxidc activemq-5.10.0]# cd bin/
[root@linuxidc bin]# ll
total 152
-rwxr-xr-x 1 root root 22126 Jun  5  2014 activemq
-rwxr-xr-x 1 root root  5665 Jun  5  2014 activemq-admin
-rw-r--r-- 1 root root 15954 Jun  5  2014 activemq.jar
-rwxr-xr-x 1 root root  6189 Jun  5  2014 diag
drwxr-xr-x 2 root root  4096 Jan 11 23:31 linux-x86-32
drwxr-xr-x 2 root root  4096 Jan 11 23:31 linux-x86-64
drwxr-xr-x 2 root root  4096 Jan 11 23:31 macosx
-rwxr-xr-x 1 root root 83820 Jun  5  2014 wrapper.jar

4. Start the service

[root@linuxidc bin]# ./activemq start
INFO: Using default configuration
(you can configure options in one of these file: /etc/default/activemq /root/.activemqrc)

INFO: Invoke the following command to create a configuration file
./activemq setup [ /etc/default/activemq | /root/.activemqrc ]

INFO: Using Java '/softs/jdk1.6.0_30/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/softs/activemq-5.10.0/data/activemq-linuxidc.pid' (pid '28962')

5. Check that the service started successfully

[root@linuxidc bin]#
[root@linuxidc bin]# ps -ef | grep activemq
root    28962    1 32 23:32 pts/0    00:00:04 /softs/jdk1.6.0_30/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/softs/activemq-5.10.0/conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/softs/activemq-5.10.0/tmp -Dactivemq.classpath=/softs/activemq-5.10.0/conf; -Dactivemq.home=/softs/activemq-5.10.0 -Dactivemq.base=/softs/activemq-5.10.0 -Dactivemq.conf=/softs/activemq-5.10.0/conf -Dactivemq.data=/softs/activemq-5.10.0/data -jar /softs/activemq-5.10.0/bin/activemq.jar start
root    29011 28898  0 23:32 pts/0    00:00:00 grep activemq
[root@linuxidc bin]#
[root@linuxidc bin]#
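
Besides checking the process list, it can help to confirm that the broker is actually accepting TCP connections. The following is only a minimal sketch under some assumptions: the class BrokerPortCheck and its isOpen helper are hypothetical names (not part of ActiveMQ), port 61616 is taken from the brokerURL used in the demo code later in this article, and the code assumes Java 7 or later (the transcript above runs JDK 1.6, which lacks try-with-resources).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ: probes whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, unreachable, or timed out: treat all of these as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // 61616 comes from the brokerURL in the demo code; the host is an assumption.
        System.out.println("broker port open: " + isOpen("127.0.0.1", 61616, 2000));
    }
}
```

A successful probe only shows that something is listening on the port; it does not prove that the listener is ActiveMQ.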

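6. Stop the service (the process is killed by PID here; ./activemq stop does the same through the launch script)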

[root@linuxidc data]#
[root@linuxidc data]# kill 28962
[root@linuxidc data]#
[root@linuxidc data]# ps -ef | grep activemq
root    29078 28898  0 23:42 pts/0    00:00:00 grep activemq
[root@linuxidc data]#

The environment is now ready.

Demo application

package com.wzh.activemq;

import java.io.Serializable;

public class User implements Serializable{

 private static final long serialVersionUID = 1L;

 private String username ;
 
 private String password ;
 
 public User(String username,String password){
  this.username = username ;
  this.password = password ;
 }

 public String getUsername() {
  return username;
 }

 public void setUsername(String username) {
  this.username = username;
 }

 public String getPassword() {
  return password;
 }

 public void setPassword(String password) {
  this.password = password;
 }

 @Override
 public String toString() {
  return "[username="+username+",password="+password+"]" ;
 }

}
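
User implements Serializable because a JMS ObjectMessage can only carry Serializable objects, and the receiving side needs the same class on its classpath to read the object back. The sketch below is a stand-alone illustration using plain Java serialization, not a reproduction of what the broker does internally. The names SerializationRoundTrip and DemoUser are hypothetical; DemoUser is a cut-down stand-in for the User class above so that the file compiles on its own. It assumes Java 7 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-alone check: round-trips an object through Java serialization.
class SerializationRoundTrip {

    // Minimal stand-in for the article's User class so this file compiles on its own.
    static class DemoUser implements Serializable {
        private static final long serialVersionUID = 1L;
        final String username;
        final String password;

        DemoUser(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    // Serializes the object into a byte array.
    static byte[] toBytes(Serializable object) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        }
        return buffer.toByteArray();
    }

    // Rebuilds an object from bytes produced by toBytes.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoUser copy = (DemoUser) fromBytes(toBytes(new DemoUser("wzh", "q123456")));
        System.out.println(copy.username + " / " + copy.password);
    }
}
```

If a class fails this kind of round trip (for example because one of its fields is not Serializable), it will also fail when sent as an ObjectMessage.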

Point-to-point:
Producer:

package com.wzh.activemq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PMessageProducer {

 protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 protected String brokerURL = "tcp://120.24.85.167:61616";

 protected static transient ConnectionFactory factory;
 protected transient Connection connection;

 public static void main(String[] args) {
 
  try {
   new P2PMessageProducer().sendObjectMessage(new User("wzh","q123456"));
   new P2PMessageProducer().sendMapMessage();
   new P2PMessageProducer().sendTextMessage("海,你好");
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public P2PMessageProducer() {

  try {
   factory = new ActiveMQConnectionFactory(username, password,
     brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   // Report the failure instead of silently swallowing it
   jmse.printStackTrace();
   close();
  }
 }

 /**
  * Initializes the connection settings and opens the connection
  */
 public P2PMessageProducer(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

  factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   connection.close();
   throw jmse;
  }
 }

 /**
  * Closes the connection
  */
 public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 
 protected void sendObjectMessage(Serializable serializable) throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue("MessageQueue");
   MessageProducer producer = session.createProducer(destination);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   Message message = session.createObjectMessage(serializable);

   producer.send(message);

   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }


 protected void sendTextMessage(String text) throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue("MessageQueue");
   MessageProducer producer = session.createProducer(destination);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   Message message = session.createTextMessage(text);

   producer.send(message);
   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }
 
 protected void sendMapMessage() throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue("MessageQueue");
   MessageProducer producer = session.createProducer(destination);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   MapMessage message = session.createMapMessage();
   message.setString("stock", "string");
   message.setDouble("price", 11.14);
   producer.send(message);

   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }

}
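
The producer creates a transacted session, so messages passed to send() only become available to consumers once session.commit() is called, and session.rollback() discards them. The following toy model illustrates just that buffering behaviour in plain Java. It is not the JMS API: ToyTransactedSender and all of its methods are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: mimics how a transacted session holds sends until commit. Not the JMS API.
class ToyTransactedSender {

    private final List<String> pending = new ArrayList<String>();   // sent but not yet committed
    private final List<String> delivered = new ArrayList<String>(); // visible to consumers

    // Buffers the message inside the current transaction.
    void send(String message) {
        pending.add(message);
    }

    // Makes every buffered message visible and starts a new transaction.
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Discards every buffered message and starts a new transaction.
    void rollback() {
        pending.clear();
    }

    // Returns a copy of the messages that consumers could see.
    List<String> delivered() {
        return new ArrayList<String>(delivered);
    }

    public static void main(String[] args) {
        ToyTransactedSender sender = new ToyTransactedSender();
        sender.send("first");
        sender.rollback();   // "first" is discarded
        sender.send("second");
        sender.commit();     // "second" becomes visible
        System.out.println(sender.delivered()); // prints [second]
    }
}
```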

Consumer:

package com.wzh.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PMessageConsumer {

 protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 protected String brokerURL = "tcp://120.24.85.167:61616";

 protected static transient ConnectionFactory factory;
 protected transient Connection connection;

 public static void main(String[] args) {
  P2PMessageConsumer consumer = new P2PMessageConsumer();
  consumer.receiveMessage();
 }

 public P2PMessageConsumer() {

  try {
   factory = new ActiveMQConnectionFactory(username, password,
     brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   // Report the failure instead of silently swallowing it
   jmse.printStackTrace();
   close();
  }
 }

 public P2PMessageConsumer(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

  factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   connection.close();
   throw jmse;
  }
 }

 public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 protected void receiveMessage() {
  Session session = null;
  try {

   session = connection.createSession(Boolean.FALSE,
     Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue("MessageQueue");
   MessageConsumer consumer = session.createConsumer(destination);

   while (true) {
    Message message = consumer.receive();

    if (null != message) {

     if (message instanceof ObjectMessage) {
      System.out.println("deal ObjectMessage....");
      dealObjectMessage((ObjectMessage) message);
     } else if (message instanceof MapMessage) {
      System.out.println("deal MapMessage....");
      dealMapMessage((MapMessage) message);
     } else if (message instanceof TextMessage) {
      System.out.println("deal TextMessage....");
      dealTextMessage((TextMessage) message);
     }

    } else {
     break;
    }

   }

  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   // The session is not transacted, so there is nothing to commit;
   // closing the connection also closes its sessions and consumers.
   close();
  }

 }

 /**
  *
  * Handles a TextMessage
  *
  * @throws JMSException
  */
 private void dealTextMessage(TextMessage message) throws JMSException {
  String text = message.getText();
  System.out.println("text = " + text);

 }

 /**
  *
  * Handles a MapMessage
  *
  * @throws JMSException
  */
 private void dealMapMessage(MapMessage message) throws JMSException {
  String stock = message.getString("stock");
  double price = message.getDouble("price");
  System.out.println("stock = " + stock + " , price =" + price);
 }

 /**
  * Handles an ObjectMessage
  */
 private void dealObjectMessage(ObjectMessage message) throws JMSException {

  User user = (User) message.getObject();
  System.out.println(user.toString());

 }

}

Publish/subscribe:
-----------------------
Message publisher

package com.wzh.activemq;

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

 protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 protected String brokerURL = "tcp://120.24.85.167:61616";

 protected static transient ConnectionFactory factory;
 protected transient Connection connection;

 public static void main(String[] args) {
  try {
   new Publish().sendObjectMessage(new User("wzh","q123456"));
   new Publish().sendMapMessage();
   new Publish().sendTextMessage("海,你好");
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public Publish() {

  try {
   factory = new ActiveMQConnectionFactory(username, password,
     brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   // Report the failure instead of silently swallowing it
   jmse.printStackTrace();
   close();
  }
 }

 public Publish(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

  factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   connection.close();
   throw jmse;
  }
 }

 public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 protected void sendObjectMessage(Serializable serializable) throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   Message message = session.createObjectMessage(serializable);

   producer.send(message);

   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }


 protected void sendTextMessage(String text) throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   Message message = session.createTextMessage(text);

   producer.send(message);
   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }
 
 protected void sendMapMessage() throws JMSException {
  Session session = null;
  try {

   session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

   MapMessage message = session.createMapMessage();
   message.setString("stock", "string");
   message.setDouble("price", 11.14);
   producer.send(message);

   session.commit();

  } catch (JMSException e) {
   // session is still null when createSession itself failed
   if (session != null) {
    try {
     session.rollback() ;
    } catch (JMSException e1) {
     e1.printStackTrace();
    }
   }
   throw e ;
  } finally {
   close();
  }

 }
}

Message subscriber:

package com.wzh.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber {

 protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 protected String brokerURL = "tcp://120.24.85.167:61616";

 protected static transient ConnectionFactory factory;
 protected transient Connection connection;

 public static void main(String[] args) {
  Subscriber consumer = new Subscriber();
  consumer.receiveMessage();
 }

 public Subscriber() {

  try {
   factory = new ActiveMQConnectionFactory(username, password,
     brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   // Report the failure instead of silently swallowing it
   jmse.printStackTrace();
   close();
  }
 }

 public Subscriber(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

  factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   connection.close();
   throw jmse;
  }
 }

 public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 protected void receiveMessage() {
  Session session = null;
  try {

   session = connection.createSession(Boolean.FALSE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageConsumer consumer = session.createConsumer(topic);
   
   consumer.setMessageListener(new MessageListener() {
   
    @Override
    public void onMessage(Message message) {

     if (message instanceof ObjectMessage) {
      System.out.println("deal ObjectMessage....");
      dealObjectMessage((ObjectMessage) message);
     } else if (message instanceof MapMessage) {
      System.out.println("deal MapMessage....");
      dealMapMessage((MapMessage) message);
     } else if (message instanceof TextMessage) {
      System.out.println("deal TextMessage....");
      dealTextMessage((TextMessage) message);
     }
     
    }
   }) ;

  } catch (Exception e) {
   e.printStackTrace();
  }
  // The connection is left open on purpose so the listener keeps receiving messages.

 }

 /**
  *
  * Handles a TextMessage
  *
  */
 private void dealTextMessage(TextMessage message) {
  try {
   String text = message.getText();
   System.out.println("text = " + text);
  } catch (JMSException e) {
   e.printStackTrace();
  }

 }

 /**
  *
  * Handles a MapMessage
  *
  */
 private void dealMapMessage(MapMessage message){
  try {
   String stock = message.getString("stock");
   double price = message.getDouble("price");
   System.out.println("stock = " + stock + " , price =" + price);
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

 /**
  * Handles an ObjectMessage
  */
 private void dealObjectMessage(ObjectMessage message){

  try {
   User user = (User) message.getObject();
   System.out.println(user.toString());
  } catch (JMSException e) {
   e.printStackTrace();
  }

 }

}
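
The two demos differ mainly in how messages are distributed. With a queue (point-to-point), each message is consumed by exactly one consumer; with a topic (publish/subscribe), every subscriber that is active at the time gets its own copy. Because the Subscriber above is a non-durable subscriber, it should be started before Publish is run, otherwise the published messages are not delivered to it. The sketch below is a toy in-memory model of that difference, not the JMS API: DeliveryModels and its methods are hypothetical names, and the round-robin distribution for the queue is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of the two JMS messaging domains. Not the JMS API.
class DeliveryModels {

    // Queue (point-to-point): each message goes to exactly one consumer, here round-robin.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic (publish/subscribe): every active subscriber gets its own copy of each message.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    // Creates one empty inbox per receiver.
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<List<String>>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("object", "map", "text");
        System.out.println("queue: " + deliverAsQueue(messages, 2)); // [[object, text], [map]]
        System.out.println("topic: " + deliverAsTopic(messages, 2)); // [[object, map, text], [object, map, text]]
    }
}
```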

Permanent link to this article: http://www.linuxidc.com/Linux/2015-08/122002.htm