diff --git a/NEWS b/NEWS index b6b2829d..b4e43373 100644 --- a/NEWS +++ b/NEWS @@ -8,6 +8,7 @@ Version 2.9.0 Enhancements and new features: * Add a Prometheus export module (issue #930) + * Add a Kafka export module (issue #858) * Port in the -c URI (-c hostname:port) (issue #996) Bugs corrected: diff --git a/conf/glances.conf b/conf/glances.conf index e81b34fa..c7389357 100644 --- a/conf/glances.conf +++ b/conf/glances.conf @@ -349,6 +349,14 @@ db=glances #user=root #password=root +[kafka] +# Configuration for the --export-kafka option +# http://kafka.apache.org/ +host=localhost +port=9092 +topic=glances +#compression=gzip + [zeromq] # Configuration for the --export-zeromq option # http://www.zeromq.org diff --git a/docs/gw/index.rst b/docs/gw/index.rst index 0b9fbb82..6d0c2251 100644 --- a/docs/gw/index.rst +++ b/docs/gw/index.rst @@ -14,6 +14,7 @@ to providing stats to multiple services (see list below). couchdb elastic influxdb + kafka opentsdb prometheus rabbitmq diff --git a/docs/gw/kafka.rst b/docs/gw/kafka.rst new file mode 100644 index 00000000..f678a148 --- /dev/null +++ b/docs/gw/kafka.rst @@ -0,0 +1,46 @@ +.. _kafka: + +Kafka +===== + +You can export statistics to a ``Kafka`` server. +The connection should be defined in the Glances configuration file as +follows: + +.. code-block:: ini + + [kafka] + host=localhost + port=9092 + topic=glances + #compression=gzip + +Note: you can enable the compression but it consumes CPU on your host. + +and run Glances with: + +.. code-block:: console + + $ glances --export-kafka + +Stats are sent in native ``JSON`` format to the topic: + +- ``key``: plugin name +- ``value``: JSON dict + +Example of record for the memory plugin: + +.. 
code-block:: + + ConsumerRecord(topic=u'glances', partition=0, offset=1305, timestamp=1490460592248, timestamp_type=0, key='mem', value=u'{"available": 2094710784, "used": 5777428480, "cached": 2513543168, "mem_careful": 50.0, "percent": 73.4, "free": 2094710784, "mem_critical": 90.0, "inactive": 2361626624, "shared": 475504640, "history_size": 28800.0, "mem_warning": 70.0, "total": 7872139264, "active": 4834361344, "buffers": 160112640}', checksum=214895201, serialized_key_size=3, serialized_value_size=303) + +Python code example to consume Kafka Glances plugin: + +.. code-block:: python + + from kafka import KafkaConsumer + import json + + consumer = KafkaConsumer('glances', value_deserializer=json.loads) + for s in consumer: + print s diff --git a/glances/exports/glances_kafka.py b/glances/exports/glances_kafka.py new file mode 100644 index 00000000..9b56b3f5 --- /dev/null +++ b/glances/exports/glances_kafka.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# This file is part of Glances. +# +# Copyright (C) 2017 Nicolargo +# +# Glances is free software; you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Glances is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . 
+ 

"""Kafka interface class.""" 

import sys 

from glances.logger import logger 
from glances.compat import iteritems 
from glances.exports.glances_export import GlancesExport 

from kafka import KafkaProducer 
import json 


class Export(GlancesExport): 

    """This class manages the Kafka export module.""" 

    def __init__(self, config=None, args=None): 
        """Init the Kafka export IF.""" 
        super(Export, self).__init__(config=config, args=args) 

        # Mandatories configuration keys (additional to host and port) 
        self.topic = None 

        # Optionals configuration keys 
        self.compression = None 

        # Load the Kafka configuration file section 
        self.export_enable = self.load_conf('kafka', 
                                            mandatories=['host', 'port', 'topic'], 
                                            options=['compression']) 
        if not self.export_enable: 
            sys.exit(2) 

        # Init the kafka client 
        self.client = self.init() 

    def init(self): 
        """Init the connection to the Kafka server.""" 
        if not self.export_enable: 
            return None 

        # Build the server URI with host and port 
        server_uri = '{}:{}'.format(self.host, self.port) 

        try: 
            s = KafkaProducer(bootstrap_servers=server_uri, 
                              value_serializer=lambda v: json.dumps(v).encode('utf-8'), 
                              compression_type=self.compression) 
        except Exception as e: 
            logger.critical("Cannot connect to Kafka server %s (%s)" % (server_uri, e)) 
            sys.exit(2) 
        else: 
            logger.info("Connected to the Kafka server %s" % server_uri) 

        return s 

    def export(self, name, columns, points): 
        """Write the points to the kafka server.""" 
        logger.debug("Export {} stats to Kafka".format(name)) 

        # Create DB input 
        data = dict(zip(columns, points)) 

        # Send stats to the kafka topic 
        # key=<plugin name> 
        # value=JSON dict 
        try: 
            self.client.send(self.topic, 
                             key=name, 
                             value=data) 
        except Exception as e: 
            logger.error("Cannot export {} stats to Kafka ({})".format(name, e)) 

    def exit(self): 
        """Close the Kafka export module.""" 
        # To ensure all connections are properly closed 
        self.client.flush() 
self.client.close() + # Call the father method + super(Export, self).exit() diff --git a/glances/main.py b/glances/main.py index 2b47e508..928e01c8 100644 --- a/glances/main.py +++ b/glances/main.py @@ -181,6 +181,8 @@ Examples of use: dest='export_elasticsearch', help='export stats to an ElasticSearch server (elasticsearch lib needed)') parser.add_argument('--export-influxdb', action='store_true', default=False, dest='export_influxdb', help='export stats to an InfluxDB server (influxdb lib needed)') + parser.add_argument('--export-kafka', action='store_true', default=False, + dest='export_kafka', help='export stats to a Kafka server (kafka-python lib needed)') parser.add_argument('--export-opentsdb', action='store_true', default=False, dest='export_opentsdb', help='export stats to an OpenTSDB server (potsdb lib needed)') parser.add_argument('--export-prometheus', action='store_true', default=False, diff --git a/optional-requirements.txt b/optional-requirements.txt index 5ae9990b..327dae40 100644 --- a/optional-requirements.txt +++ b/optional-requirements.txt @@ -5,6 +5,7 @@ couchdb docker elasticsearch influxdb +kafka-python matplotlib netifaces nvidia-ml-py; python_version == "2.7" diff --git a/setup.py b/setup.py index 49c2f7db..467bb8b5 100755 --- a/setup.py +++ b/setup.py @@ -85,8 +85,8 @@ setup( 'chart': ['matplotlib'], 'docker': ['docker>=2.0.0'], 'export': ['bernhard', 'cassandra-driver', 'couchdb', 'elasticsearch', - 'influxdb>=1.0.0', 'pika', 'potsdb', 'prometheus_client', - 'pyzmq', 'statsd'], + 'influxdb>=1.0.0', 'kafka-python', 'pika', 'potsdb', + 'prometheus_client', 'pyzmq', 'statsd'], 'folders:python_version<"3.5"': ['scandir'], 'gpu:python_version=="2.7"': ['nvidia-ml-py'], 'ip': ['netifaces'],