手机版
你好,游客 登录 注册 搜索
背景:
阅读新闻

Kafka代码API

[日期:2014-09-29] 来源:Linux社区  作者:xiaobianjava [字体: ]

1.建立工程,导入相应的jar包

Procuder类

package cn.itcast.kafka;

import Java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 //要读取的数据主题
 private static final String topic = "kfc";
 //消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //消费组的编号
  props.put("group.id", "1111");
  //偏移量,从哪个位置读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //根据map获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

consumer--消费者类

package cn.itcast.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerDemo {
 
 //要读取的数据主题
 private static final String topic = "kfc";
 //消费者的数量
 private static final Integer threads = 2;
 
 public static void main(String[] args) {
 
  Properties props = new Properties();
  //指定zookeeper的地址
  props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");
  //消费组的编号
  props.put("group.id", "1111");
  //偏移量,从哪个位置读
  props.put("auto.offset.reset", "smallest");
 
  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
  HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();
  topicCountmap.put(topic, threads);
 
  //根据map获取所有的主题对应的消息流
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);
  //获取某个主题的消息流
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
  //开启两个消费者进程,读取主题下的流
  for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
   new Thread(new Runnable() {
   
    @Override
    public void run() {
     for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
      System.err.println(new String(messageAndMetadata.message()));
     }
     
    }
   }).start();
  }
 
 }
}

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

Kafka 的详细介绍请点这里
Kafka 的下载地址请点这里

本文永久更新链接地址http://www.linuxidc.com/Linux/2014-09/107383.htm

linux
相关资讯       kafka 
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款