Monitoring Kafka with JMX

[Date: 2015-04-14] Source: Linux社区 (Linux Community)  Author: eric_sunah

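Kafka can be configured to expose its runtime state through JMX. The metrics can be inspected with JConsole, which ships with the JDK, or read programmatically through the Java API.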

For a description of the monitoring metrics, see: http://kafka.apache.org/documentation.html#monitoring
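Each metric described there is exposed as an MBean with a handful of attributes. The following is a minimal sketch, not part of the original article, of reading several attributes of one MBean in a single call. The class name MetricAttributeReader and the method readAttributes are hypothetical names chosen for illustration, and the demo in main runs against the local JVM's own MBean server so that it works without a broker.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper for illustration; not part of Kafka or of the article.
public class MetricAttributeReader {

    /** Reads the requested attributes of one MBean in a single round trip. */
    static Map<String, Object> readAttributes(MBeanServerConnection conn,
                                              String objectName,
                                              String... attributeNames) throws Exception {
        Map<String, Object> values = new LinkedHashMap<>();
        // Attributes that cannot be retrieved are simply absent from the result.
        for (Attribute attribute
                : conn.getAttributes(new ObjectName(objectName), attributeNames).asList()) {
            values.put(attribute.getName(), attribute.getValue());
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        // Demo against this JVM's own MBean server, so no Kafka broker is needed.
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        System.out.println(readAttributes(local, "java.lang:type=Runtime", "VmName", "Uptime"));
    }
}
```

Against a broker, the same method could be called with a remote connection and a Kafka object name such as "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec". The attribute names to request (for example "Count" or "OneMinuteRate") are an assumption about what that Kafka version exposes and are worth confirming in JConsole first.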

Enabling the JMX port

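Edit bin/kafka-server-start.sh and add the JMX_PORT parameter. After the change, the relevant part looks like this (note that, placed here, JMX_PORT is exported only when KAFKA_HEAP_OPTS is not already set):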

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

Use JConsole to test whether you can connect to the broker on this port.
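The same connectivity check can be done from code. The sketch below is an illustration rather than something from the original article: it connects to the JMX port and lists every MBean whose domain starts with "kafka.". The class name JmxConnectionCheck and the method listMBeans are hypothetical, and the default host and port are placeholders to replace with your broker's address.

```java
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper for illustration; not part of Kafka or of the article.
public class JmxConnectionCheck {

    /** Returns the sorted canonical names of all MBeans matching the pattern. */
    static Set<String> listMBeans(MBeanServerConnection conn, String pattern)
            throws IOException, MalformedObjectNameException {
        Set<String> names = new TreeSet<>();
        for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder defaults; pass the broker host and JMX port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        String port = args.length > 1 ? args[1] : "9999";
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        // try-with-resources closes the connector when the check is done.
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // "kafka.*:*" matches every MBean in a domain that starts with "kafka."
            for (String name : listMBeans(conn, "kafka.*:*")) {
                System.out.println(name);
            }
        }
    }
}
```

If the connection fails, JMXConnectorFactory.connect throws an IOException, which is usually the quickest sign that the port is not open or not reachable.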


Accessing the metrics through the Java API

The target values can be fetched with the following code:

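import java.io.IOException;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// KafkaRoleInfo is the author's own data class and is not shown in this article.
public class KafkaDataProvider{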
    protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
    private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
    private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
    private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";
    private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";
    private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";
    private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";
    private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";
    public String extractMonitorData() {
        // TODO: obtain the IP and parameters through an API call
        KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();
        String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";
        try {
            // Calls the helper defined at the bottom of this class; the article's MetricDataUtils wrapper is not shown.
            MBeanServerConnection jmxConnection = getMBeanServerConnection(jmxURL);
            ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
            ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
            ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
            ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);
            ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);
            ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);
            ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);
            ObjectName partCountObj = new ObjectName(PART_COUNT);
            // "Count" on these meter MBeans is a running total, not a per-second rate.
            Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");
            Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");
            Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");
            Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");
            Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");
            Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");
            Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");
            Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");
            monitorDataPoint.setMessagesInPerSec(messagesInPerSec);
            monitorDataPoint.setBytesInPerSec(bytesInPerSec);
            monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);
            monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);
            monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);
            monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);
            monitorDataPoint.setActiveControllerCount(activeControllerCount);
            monitorDataPoint.setPartCount(partCount);
        } catch (IOException | MalformedObjectNameException | AttributeNotFoundException
                | MBeanException | ReflectionException | InstanceNotFoundException e) {
            // All failures are handled the same way, so one multi-catch is enough.
            LOGGER.error("Failed to read Kafka JMX metrics from " + jmxURL, e);
        }
        return monitorDataPoint.toString();
    }
    public static void main(String[] args) {
        System.out.println(new KafkaDataProvider().extractMonitorData());
    }
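    /**
    * Obtains a connection to the MBeanServer.
    * The underlying JMXConnector is not closed by this method.
    *
    * @param jmxUrl the JMX service URL of the broker
    * @return a connection to the remote MBean server
    * @throws IOException if the connection cannot be established
    */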
    public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
        JMXServiceURL url = new JMXServiceURL(jmxUrl);
        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
        return mbsc;
    }
}
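The listing above depends on a KafkaRoleInfo class that the article does not include. The following is a minimal sketch of what such a class might look like, assuming it is a plain data holder with exactly the setters called above and a readable toString. The field types are inferred from the casts in the listing, and the spelling "Flower" (for follower) is kept only so that the setter names match.

```java
// Hypothetical reconstruction for illustration; the author's real class is not shown.
public class KafkaRoleInfo {
    private Long messagesInPerSec;
    private Long bytesInPerSec;
    private Long bytesOutPerSec;
    private Long produceRequestCountPerSec;
    private Long consumerRequestCountPerSec;
    private Long flowerRequestCountPerSec;
    private Integer activeControllerCount;
    private Integer partCount;

    public void setMessagesInPerSec(Long value) { this.messagesInPerSec = value; }
    public void setBytesInPerSec(Long value) { this.bytesInPerSec = value; }
    public void setBytesOutPerSec(Long value) { this.bytesOutPerSec = value; }
    public void setProduceRequestCountPerSec(Long value) { this.produceRequestCountPerSec = value; }
    public void setConsumerRequestCountPerSec(Long value) { this.consumerRequestCountPerSec = value; }
    public void setFlowerRequestCountPerSec(Long value) { this.flowerRequestCountPerSec = value; }
    public void setActiveControllerCount(Integer value) { this.activeControllerCount = value; }
    public void setPartCount(Integer value) { this.partCount = value; }

    /** Renders every field so that extractMonitorData() returns something readable. */
    @Override
    public String toString() {
        return "KafkaRoleInfo{"
                + "messagesInPerSec=" + messagesInPerSec
                + ", bytesInPerSec=" + bytesInPerSec
                + ", bytesOutPerSec=" + bytesOutPerSec
                + ", produceRequestCountPerSec=" + produceRequestCountPerSec
                + ", consumerRequestCountPerSec=" + consumerRequestCountPerSec
                + ", flowerRequestCountPerSec=" + flowerRequestCountPerSec
                + ", activeControllerCount=" + activeControllerCount
                + ", partCount=" + partCount
                + '}';
    }
}
```

Fields that were never set print as null, which makes it easy to see which metrics could not be read when the JMX call fails partway through.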

Other tools
Besides writing your own custom monitoring program, the following tools can also be used:

kafka-web-console
https://github.com/claudemamo/kafka-web-console
Installing sbt:
http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor
https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader


Permanent link to this article: http://www.linuxidc.com/Linux/2015-04/116177.htm

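Comment #1
* Lucas_zh (member), posted 2016/11/15 23:35:50
Hello, could you tell me which packages KafkaRoleInfo (in KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo()) and MetricDataUtils.getMBeanServerConnection come from? Would you mind leaving contact details so that we can discuss the problem?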