mirror of
https://github.com/nicolargo/glances.git
synced 2024-12-25 10:12:55 +03:00
Add Kafka export plugin #858
This commit is contained in:
parent
e568bcfe4e
commit
70eb8013ea
1
NEWS
1
NEWS
@ -8,6 +8,7 @@ Version 2.9.0
|
|||||||
Enhancements and new features:
|
Enhancements and new features:
|
||||||
|
|
||||||
* Add a Prometheus export module (issue #930)
|
* Add a Prometheus export module (issue #930)
|
||||||
|
* Add a Kafka export module (issue #858)
|
||||||
* Port in the -c URI (-c hostname:port) (issue #996)
|
* Port in the -c URI (-c hostname:port) (issue #996)
|
||||||
|
|
||||||
Bugs corrected:
|
Bugs corrected:
|
||||||
|
@ -349,6 +349,14 @@ db=glances
|
|||||||
#user=root
|
#user=root
|
||||||
#password=root
|
#password=root
|
||||||
|
|
||||||
|
[kafka]
|
||||||
|
# Configuration for the --export-kafka option
|
||||||
|
# http://kafka.apache.org/
|
||||||
|
host=localhost
|
||||||
|
port=9092
|
||||||
|
topic=glances
|
||||||
|
#compression=gzip
|
||||||
|
|
||||||
[zeromq]
|
[zeromq]
|
||||||
# Configuration for the --export-zeromq option
|
# Configuration for the --export-zeromq option
|
||||||
# http://www.zeromq.org
|
# http://www.zeromq.org
|
||||||
|
@ -14,6 +14,7 @@ to providing stats to multiple services (see list below).
|
|||||||
couchdb
|
couchdb
|
||||||
elastic
|
elastic
|
||||||
influxdb
|
influxdb
|
||||||
|
kafka
|
||||||
opentsdb
|
opentsdb
|
||||||
prometheus
|
prometheus
|
||||||
rabbitmq
|
rabbitmq
|
||||||
|
46
docs/gw/kafka.rst
Normal file
46
docs/gw/kafka.rst
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
.. _kafka:
|
||||||
|
|
||||||
|
Kafka
|
||||||
|
=====
|
||||||
|
|
||||||
|
You can export statistics to a ``Kafka`` server.
|
||||||
|
The connection should be defined in the Glances configuration file as
|
||||||
|
following:
|
||||||
|
|
||||||
|
.. code-block:: ini
|
||||||
|
|
||||||
|
[kafka]
|
||||||
|
host=localhost
|
||||||
|
port=9092
|
||||||
|
topic=glances
|
||||||
|
#compression=gzip
|
||||||
|
|
||||||
|
Note: you can enable the compression but it consume CPU on your host.
|
||||||
|
|
||||||
|
and run Glances with:
|
||||||
|
|
||||||
|
.. code-block:: console
|
||||||
|
|
||||||
|
$ glances --export-kafka
|
||||||
|
|
||||||
|
Stats are sent in native ``JSON`` format to the topic:
|
||||||
|
|
||||||
|
- ``key``: plugin name
|
||||||
|
- ``value``: JSON dict
|
||||||
|
|
||||||
|
Example of record for the memory plugin:
|
||||||
|
|
||||||
|
.. code-block::
|
||||||
|
|
||||||
|
ConsumerRecord(topic=u'glances', partition=0, offset=1305, timestamp=1490460592248, timestamp_type=0, key='mem', value=u'{"available": 2094710784, "used": 5777428480, "cached": 2513543168, "mem_careful": 50.0, "percent": 73.4, "free": 2094710784, "mem_critical": 90.0, "inactive": 2361626624, "shared": 475504640, "history_size": 28800.0, "mem_warning": 70.0, "total": 7872139264, "active": 4834361344, "buffers": 160112640}', checksum=214895201, serialized_key_size=3, serialized_value_size=303)
|
||||||
|
|
||||||
|
Python code example to consume Kafka Glances plugin:
|
||||||
|
|
||||||
|
.. code-block:: python
|
||||||
|
|
||||||
|
from kafka import KafkaConsumer
|
||||||
|
import json
|
||||||
|
|
||||||
|
consumer = KafkaConsumer('glances', value_deserializer=json.loads)
|
||||||
|
for s in consumer:
|
||||||
|
print s
|
99
glances/exports/glances_kafka.py
Normal file
99
glances/exports/glances_kafka.py
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# This file is part of Glances.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2017 Nicolargo <nicolas@nicolargo.com>
|
||||||
|
#
|
||||||
|
# Glances is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# Glances is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Lesser General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Lesser General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
"""Kafka interface class."""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from glances.logger import logger
|
||||||
|
from glances.compat import iteritems
|
||||||
|
from glances.exports.glances_export import GlancesExport
|
||||||
|
|
||||||
|
from kafka import KafkaProducer
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class Export(GlancesExport):
|
||||||
|
|
||||||
|
"""This class manages the Kafka export module."""
|
||||||
|
|
||||||
|
def __init__(self, config=None, args=None):
|
||||||
|
"""Init the Kafka export IF."""
|
||||||
|
super(Export, self).__init__(config=config, args=args)
|
||||||
|
|
||||||
|
# Mandatories configuration keys (additional to host and port)
|
||||||
|
self.topic = None
|
||||||
|
|
||||||
|
# Optionals configuration keys
|
||||||
|
self.compression = None
|
||||||
|
|
||||||
|
# Load the Cassandra configuration file section
|
||||||
|
self.export_enable = self.load_conf('kafka',
|
||||||
|
mandatories=['host', 'port', 'topic'],
|
||||||
|
options=['compression'])
|
||||||
|
if not self.export_enable:
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
# Init the kafka client
|
||||||
|
self.client = self.init()
|
||||||
|
|
||||||
|
def init(self):
|
||||||
|
"""Init the connection to the Kafka server."""
|
||||||
|
if not self.export_enable:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Build the server URI with host and port
|
||||||
|
server_uri = '{}:{}'.format(self.host, self.port)
|
||||||
|
|
||||||
|
try:
|
||||||
|
s = KafkaProducer(bootstrap_servers=server_uri,
|
||||||
|
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
|
||||||
|
compression_type=self.compression)
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical("Cannot connect to Kafka server %s (%s)" % (server_uri, e))
|
||||||
|
sys.exit(2)
|
||||||
|
else:
|
||||||
|
logger.info("Connected to the Kafka server %s" % server_uri)
|
||||||
|
|
||||||
|
return s
|
||||||
|
|
||||||
|
def export(self, name, columns, points):
|
||||||
|
"""Write the points to the kafka server."""
|
||||||
|
logger.debug("Export {} stats to Kafka".format(name))
|
||||||
|
|
||||||
|
# Create DB input
|
||||||
|
data = dict(zip(columns, points))
|
||||||
|
|
||||||
|
# Send stats to the kafka topic
|
||||||
|
# key=<plugin name>
|
||||||
|
# value=JSON dict
|
||||||
|
try:
|
||||||
|
self.client.send(self.topic,
|
||||||
|
key=name,
|
||||||
|
value=data)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Cannot export {} stats to Kafka ({})".format(name, e))
|
||||||
|
|
||||||
|
def exit(self):
|
||||||
|
"""Close the Kafka export module."""
|
||||||
|
# To ensure all connections are properly closed
|
||||||
|
self.client.flush()
|
||||||
|
self.client.close()
|
||||||
|
# Call the father method
|
||||||
|
super(Export, self).exit()
|
@ -181,6 +181,8 @@ Examples of use:
|
|||||||
dest='export_elasticsearch', help='export stats to an ElasticSearch server (elasticsearch lib needed)')
|
dest='export_elasticsearch', help='export stats to an ElasticSearch server (elasticsearch lib needed)')
|
||||||
parser.add_argument('--export-influxdb', action='store_true', default=False,
|
parser.add_argument('--export-influxdb', action='store_true', default=False,
|
||||||
dest='export_influxdb', help='export stats to an InfluxDB server (influxdb lib needed)')
|
dest='export_influxdb', help='export stats to an InfluxDB server (influxdb lib needed)')
|
||||||
|
parser.add_argument('--export-kafka', action='store_true', default=False,
|
||||||
|
dest='export_kafka', help='export stats to a Kafka server (kafka-python lib needed)')
|
||||||
parser.add_argument('--export-opentsdb', action='store_true', default=False,
|
parser.add_argument('--export-opentsdb', action='store_true', default=False,
|
||||||
dest='export_opentsdb', help='export stats to an OpenTSDB server (potsdb lib needed)')
|
dest='export_opentsdb', help='export stats to an OpenTSDB server (potsdb lib needed)')
|
||||||
parser.add_argument('--export-prometheus', action='store_true', default=False,
|
parser.add_argument('--export-prometheus', action='store_true', default=False,
|
||||||
|
@ -5,6 +5,7 @@ couchdb
|
|||||||
docker
|
docker
|
||||||
elasticsearch
|
elasticsearch
|
||||||
influxdb
|
influxdb
|
||||||
|
kafka-python
|
||||||
matplotlib
|
matplotlib
|
||||||
netifaces
|
netifaces
|
||||||
nvidia-ml-py; python_version == "2.7"
|
nvidia-ml-py; python_version == "2.7"
|
||||||
|
4
setup.py
4
setup.py
@ -85,8 +85,8 @@ setup(
|
|||||||
'chart': ['matplotlib'],
|
'chart': ['matplotlib'],
|
||||||
'docker': ['docker>=2.0.0'],
|
'docker': ['docker>=2.0.0'],
|
||||||
'export': ['bernhard', 'cassandra-driver', 'couchdb', 'elasticsearch',
|
'export': ['bernhard', 'cassandra-driver', 'couchdb', 'elasticsearch',
|
||||||
'influxdb>=1.0.0', 'pika', 'potsdb', 'prometheus_client',
|
'influxdb>=1.0.0', 'kafka-python', 'pika', 'potsdb',
|
||||||
'pyzmq', 'statsd'],
|
'prometheus_client', 'pyzmq', 'statsd'],
|
||||||
'folders:python_version<"3.5"': ['scandir'],
|
'folders:python_version<"3.5"': ['scandir'],
|
||||||
'gpu:python_version=="2.7"': ['nvidia-ml-py'],
|
'gpu:python_version=="2.7"': ['nvidia-ml-py'],
|
||||||
'ip': ['netifaces'],
|
'ip': ['netifaces'],
|
||||||
|
Loading…
Reference in New Issue
Block a user