From 7c3fe93226484ceb3d4ff4b13978593b67a1cd89 Mon Sep 17 00:00:00 2001 From: Raz Crimson <52282402+RazCrimson@users.noreply.github.com> Date: Sat, 18 Feb 2023 23:37:16 +0530 Subject: [PATCH] chg: containers Plugin - basic pod support StatsFetcher -> StatsStreamer --- glances/plugins/containers/glances_docker.py | 85 ++++----- glances/plugins/containers/glances_podman.py | 191 ++++++++++++++++--- glances/plugins/containers/stats_fetcher.py | 72 ------- glances/plugins/containers/stats_streamer.py | 76 ++++++++ glances/plugins/glances_containers.py | 26 ++- 5 files changed, 299 insertions(+), 151 deletions(-) delete mode 100644 glances/plugins/containers/stats_fetcher.py create mode 100644 glances/plugins/containers/stats_streamer.py diff --git a/glances/plugins/containers/glances_docker.py b/glances/plugins/containers/glances_docker.py index 11ea2ef9..a642db9f 100644 --- a/glances/plugins/containers/glances_docker.py +++ b/glances/plugins/containers/glances_docker.py @@ -1,10 +1,9 @@ """Docker Extension unit for Glances' Containers plugin.""" -import threading import time from glances.compat import iterkeys, itervalues, nativestr, pretty_date from glances.logger import logger -from glances.plugins.containers.stats_fetcher import StatsFetcher +from glances.plugins.containers.stats_streamer import StatsStreamer # Docker-py library (optional and Linux-only) # https://github.com/docker/docker-py @@ -19,48 +18,43 @@ else: import_docker_error_tag = False -class DockerStatsFetcher(StatsFetcher): +class DockerStatsFetcher: MANDATORY_MEMORY_FIELDS = ["usage", 'limit'] def __init__(self, container): - super().__init__(container) - # Lock to avoid the daemon thread updating stats when main thread reads the stats - self._stats_lock = threading.Lock() + self._container = container # Previous computes stats are stored in the self._old_computed_stats variable - # By storing time data we enable IoR/s and IoW/s calculations in the XML/RPC API, which would otherwise - # be overly 
difficult work for users of the API + # We store time data to enable IoR/s & IoW/s calculations to avoid complexity for consumers of the APIs exposed. self._old_computed_stats = {} # Last time when output stats (results) were computed - self._last_stats_output_time = 0 - # Last time when the raw_stats were updated by worker thread - self._last_raws_stats_update_time = 1 + self._last_stats_computed_time = 0 + + # Threaded Streamer + stats_iterable = container.stats(decode=True) + self._streamer = StatsStreamer(stats_iterable, initial_stream_value={}) + + def _log_debug(self, msg, exception=None): + logger.debug("containers (Docker) ID: {} - {} ({}) ".format(self._container.id, msg, exception)) + logger.debug(self._streamer.stats) + + def stop(self): + self._streamer.stop() @property def activity_stats(self): """Activity Stats - Each successive access of activity_stats will cause computation of activity_stats from raw_stats + Each successive access of activity_stats will cause computation of activity_stats """ computed_activity_stats = self._compute_activity_stats() self._old_computed_stats = computed_activity_stats - self._last_stats_output_time = time.time() + self._last_stats_computed_time = time.time() return computed_activity_stats - def _pre_raw_stats_update_hook(self): - self._stats_lock.acquire() - - def _post_raw_stats_update_hook(self): - self._last_raws_stats_update_time = time.time() - self._stats_lock.release() - - @property - def time_since_update(self): - return self._last_raws_stats_update_time - self._last_stats_output_time - def _compute_activity_stats(self): - with self._stats_lock: + with self._streamer.result_lock: io_stats = self._get_io_stats() cpu_stats = self._get_cpu_stats() memory_stats = self._get_memory_stats() @@ -74,6 +68,11 @@ class DockerStatsFetcher(StatsFetcher): } return computed_stats + @property + def time_since_update(self): + # In case no update, default to 1 + return max(1, self._streamer.last_update_time - 
self._last_stats_computed_time) + def _get_cpu_stats(self): """Return the container CPU usage. @@ -82,8 +81,8 @@ class DockerStatsFetcher(StatsFetcher): stats = {'total': 0.0} try: - cpu_stats = self.stats['cpu_stats'] - precpu_stats = self.stats['precpu_stats'] + cpu_stats = self._streamer.stats['cpu_stats'] + precpu_stats = self._streamer.stats['precpu_stats'] cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']} precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']} @@ -93,8 +92,7 @@ class DockerStatsFetcher(StatsFetcher): # the corresponding cpu_usage.percpu_usage array should be used. cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or []) except KeyError as e: - logger.debug("containers plugin - Can't grab CPU stat for container {} ({})".format(self._container.id, e)) - logger.debug(self.stats) + self._log_debug("Can't grab CPU stats", e) return None try: @@ -103,9 +101,7 @@ class DockerStatsFetcher(StatsFetcher): # CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0 stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0 except TypeError as e: - msg = "containers plugin - Can't compute CPU usage for container {} ({})".format(self._container.id, e) - logger.debug(msg) - logger.debug(self.stats) + self._log_debug("Can't compute CPU usage", e) return None # Return the stats @@ -116,12 +112,11 @@ class DockerStatsFetcher(StatsFetcher): Output: a dict {'rss': 1015808, 'cache': 356352, 'usage': ..., 'max_usage': ...} """ - memory_stats = self.stats.get('memory_stats') + memory_stats = self._streamer.stats.get('memory_stats') # Checks for memory_stats & mandatory fields if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS): - logger.debug("containers plugin - Missing MEM usage fields for container {}".format(self._container.id)) - logger.debug(self.stats) + 
self._log_debug("Missing MEM usage fields") return None stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS} @@ -132,9 +127,7 @@ class DockerStatsFetcher(StatsFetcher): stats['max_usage'] = detailed_stats.get('max_usage') stats['cache'] = detailed_stats.get('cache') except (KeyError, TypeError) as e: - # self.stats do not have MEM information - logger.debug("containers plugin - Can't grab MEM usage for container {} ({})".format(self._container.id, e)) - logger.debug(self.stats) + self._log_debug("Can't grab MEM usage", e) # stats do not have MEM information return None # Return the stats @@ -149,12 +142,11 @@ class DockerStatsFetcher(StatsFetcher): rx: Number of bytes received tx: Number of bytes transmitted """ - eth0_stats = self.stats.get('networks', {}).get('eth0') + eth0_stats = self._streamer.stats.get('networks', {}).get('eth0') # Checks for net_stats & mandatory fields if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']): - logger.debug("containers plugin - Missing Network usage fields for container {}".format(self._container.id)) - logger.debug(self.stats) + self._log_debug("Missing Network usage fields") return None # Read the rx/tx stats (in bytes) @@ -179,12 +171,11 @@ class DockerStatsFetcher(StatsFetcher): ior: Number of bytes read iow: Number of bytes written """ - io_service_bytes_recursive = self.stats.get('blkio_stats', {}).get('io_service_bytes_recursive') + io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive') # Checks for net_stats if not io_service_bytes_recursive: - logger.debug("containers plugin - Missing blockIO usage fields for container {}".format(self._container.id)) - logger.debug(self.stats) + self._log_debug("Missing blockIO usage fields") return None # Read the ior/iow stats (in bytes) @@ -193,11 +184,7 @@ class DockerStatsFetcher(StatsFetcher): cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() 
== 'read'][0]['value'] cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value'] except (TypeError, IndexError, KeyError, AttributeError) as e: - # self.stats do not have io information - logger.debug( - "containers plugin - Can't grab blockIO usage for container {} ({})".format(self._container.id, e) - ) - logger.debug(self.stats) + self._log_debug("Can't grab blockIO usage", e) # stats do not have io information return None stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow} diff --git a/glances/plugins/containers/glances_podman.py b/glances/plugins/containers/glances_podman.py index ff2119d4..602d6d52 100644 --- a/glances/plugins/containers/glances_podman.py +++ b/glances/plugins/containers/glances_podman.py @@ -1,9 +1,10 @@ """Podman Extension unit for Glances' Containers plugin.""" +import json from datetime import datetime -from glances.compat import iterkeys, itervalues, nativestr, pretty_date +from glances.compat import iterkeys, itervalues, nativestr, pretty_date, string_value_to_float from glances.logger import logger -from glances.plugins.containers.stats_fetcher import StatsFetcher +from glances.plugins.containers.stats_streamer import StatsStreamer # Podman library (optional and Linux-only) # https://pypi.org/project/podman/ @@ -17,37 +18,51 @@ else: import_podman_error_tag = False -class PodmanStatsFetcher(StatsFetcher): +class PodmanContainerStatsFetcher: MANDATORY_FIELDS = ["CPU", "MemUsage", "MemLimit", "NetInput", "NetOutput", "BlockInput", "BlockOutput"] + def __init__(self, container): + self._container = container + + # Threaded Streamer + stats_iterable = container.stats(decode=True) + self._streamer = StatsStreamer(stats_iterable, initial_stream_value={}) + + def _log_debug(self, msg, exception=None): + logger.debug("containers (Podman) ID: {} - {} ({})".format(self._container.id, msg, exception)) + logger.debug(self._streamer.stats) + + def stop(self): + self._streamer.stop() 
+ @property def stats(self): - if self._raw_stats["Error"]: - logger.error("containers plugin - Stats fetching failed: {}".format(self._raw_stats["Error"])) - logger.error(self._raw_stats) + stats = self._streamer.stats + if stats["Error"]: + self._log_debug("Stats fetching failed", stats["Error"]) - return self._raw_stats["Stats"][0] + return stats["Stats"][0] @property def activity_stats(self): result_stats = {"cpu": {}, "memory": {}, "io": {}, "network": {}} + api_stats = self.stats - if any(field not in self.stats for field in self.MANDATORY_FIELDS): - logger.debug("containers plugin - Missing mandatory fields for container {}".format(self._container.id)) - logger.debug(self.stats) + if any(field not in api_stats for field in self.MANDATORY_FIELDS): + self._log_debug("Missing mandatory fields") return result_stats try: - cpu_usage = float(self.stats.get("CPU", 0)) + cpu_usage = float(api_stats.get("CPU", 0)) - mem_usage = float(self.stats["MemUsage"]) - mem_limit = float(self.stats["MemLimit"]) + mem_usage = float(api_stats["MemUsage"]) + mem_limit = float(api_stats["MemLimit"]) - rx = float(self.stats["NetInput"]) - tx = float(self.stats["NetOutput"]) + rx = float(api_stats["NetInput"]) + tx = float(api_stats["NetOutput"]) - ior = float(self.stats["BlockInput"]) - iow = float(self.stats["BlockOutput"]) + ior = float(api_stats["BlockInput"]) + iow = float(api_stats["BlockOutput"]) # Hardcode `time_since_update` to 1 as podman already sends the calculated rate result_stats = { @@ -56,14 +71,136 @@ class PodmanStatsFetcher(StatsFetcher): "io": {"ior": ior, "iow": iow, "time_since_update": 1}, "network": {"rx": rx, "tx": tx, "time_since_update": 1}, } - except ValueError: - logger.debug("containers plugin - Non float stats values found for container {}".format(self._container.id)) - logger.debug(self.stats) - return result_stats + except ValueError as e: + self._log_debug("Non float stats values found", e) return result_stats +class PodmanPodStatsFetcher: + def 
__init__(self, pod_manager): + self._pod_manager = pod_manager + + # Threaded Streamer + stats_iterable = pod_manager.stats(stream=True, decode=True) + self._streamer = StatsStreamer(stats_iterable, initial_stream_value={}) + + def _log_debug(self, msg, exception=None): + logger.debug("containers (Podman): Pod Manager - {} ({})".format(msg, exception)) + logger.debug(self._streamer.stats) + + def stop(self): + self._streamer.stop() + + @property + def activity_stats(self): + result_stats = {} + container_stats = self._streamer.stats + for stat in container_stats: + io_stats = self._get_io_stats(stat) + cpu_stats = self._get_cpu_stats(stat) + memory_stats = self._get_memory_stats(stat) + network_stats = self._get_network_stats(stat) + + computed_stats = { + "name": stat["Name"], + "cid": stat["CID"], + "pod_id": stat["Pod"], + "io": io_stats or {}, + "memory": memory_stats or {}, + "network": network_stats or {}, + "cpu": cpu_stats or {"total": 0.0}, + } + result_stats[stat["CID"]] = computed_stats + + return result_stats + + def _get_cpu_stats(self, stats): + """Return the container CPU usage. + + Output: a dict {'total': 1.49} + """ + if "CPU" not in stats: + self._log_debug("Missing CPU usage fields") + return None + + cpu_usage = string_value_to_float(stats["CPU"].rstrip("%")) + return {"total": cpu_usage} + + def _get_memory_stats(self, stats): + """Return the container MEMORY. 
+ + Output: a dict {'rss': 1015808, 'cache': 356352, 'usage': ..., 'max_usage': ...} + """ + if "MemUsage" not in stats or "/" not in stats["MemUsage"]: + self._log_debug("Missing MEM usage fields") + return None + + memory_usage_str = stats["MemUsage"] + usage_str, limit_str = memory_usage_str.split("/") + + try: + usage = string_value_to_float(usage_str) + limit = string_value_to_float(limit_str) + except ValueError as e: + self._log_debug("Compute MEM usage failed", e) + return None + + return {"usage": usage, "limit": limit} + + def _get_network_stats(self, stats): + """Return the container network usage using the Docker API (v1.0 or higher). + + Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}. + with: + time_since_update: number of seconds elapsed between the latest grab + rx: Number of bytes received + tx: Number of bytes transmitted + """ + if "NetIO" not in stats or "/" not in stats["NetIO"]: + self._log_debug("Missing NetIO usage fields") + return None + + net_io_str = stats["NetIO"] + rx_str, tx_str = net_io_str.split("/") + + try: + rx = string_value_to_float(rx_str) + tx = string_value_to_float(tx_str) + except ValueError as e: + self._log_debug("Compute Network usage failed", e) + return None + + # Hardcode `time_since_update` to 1 as podman docs don't specify the rate calculated procedure + return {"rx": rx, "tx": tx, "time_since_update": 1} + + def _get_io_stats(self, stats): + """Return the container IO usage using the Docker API (v1.0 or higher). + + Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}. 
+ with: + time_since_update: number of seconds elapsed between the latest grab + ior: Number of bytes read + iow: Number of bytes written + """ + if "BlockIO" not in stats or "/" not in stats["BlockIO"]: + self._log_debug("Missing BlockIO usage fields") + return None + + block_io_str = stats["BlockIO"] + ior_str, iow_str = block_io_str.split("/") + + try: + ior = string_value_to_float(ior_str) + iow = string_value_to_float(iow_str) + except ValueError as e: + self._log_debug("Compute BlockIO usage failed", e) + return None + + # Hardcode `time_since_update` to 1 as podman docs don't specify the rate calculated procedure + return {"ior": ior, "iow": iow, "time_since_update": 1} + + class PodmanContainersExtension: """Glances' Containers Plugin's Docker Extension unit""" @@ -77,6 +214,7 @@ class PodmanContainersExtension: self.ext_name = "Podman (Containers)" self.podman_sock = podman_sock self.stats_fetchers = {} + self.pod_fetcher = None self._version = {} self.connect() @@ -118,6 +256,8 @@ class PodmanContainersExtension: # Issue #1152: Podman module doesn't export details about stopped containers # The Containers/all key of the configuration file should be set to True containers = self.client.containers.list(all=all_tag) + if not self.pod_fetcher: + self.pod_fetcher = PodmanPodStatsFetcher(self.client.pods) except Exception as e: logger.error("{} plugin - Cannot get containers list ({})".format(self.ext_name, e)) return version_stats, [] @@ -128,7 +268,7 @@ class PodmanContainersExtension: # StatsFetcher did not exist in the internal dict # Create it, add it to the internal dict logger.debug("{} plugin - Create thread for container {}".format(self.ext_name, container.id[:12])) - self.stats_fetchers[container.id] = PodmanStatsFetcher(container) + self.stats_fetchers[container.id] = PodmanContainerStatsFetcher(container) # Stop threads for non-existing containers absent_containers = set(iterkeys(self.stats_fetchers)) - set(c.id for c in containers) @@ -141,6 
+281,13 @@ class PodmanContainersExtension: # Get stats for all containers container_stats = [self.generate_stats(container) for container in containers] + + pod_stats = self.pod_fetcher.activity_stats + for stats in container_stats: + if stats["Id"][:12] in pod_stats: + stats["pod_name"] = pod_stats[stats["Id"][:12]]["name"] + stats["pod_id"] = pod_stats[stats["Id"][:12]]["pod_id"] + return version_stats, container_stats @property diff --git a/glances/plugins/containers/stats_fetcher.py b/glances/plugins/containers/stats_fetcher.py deleted file mode 100644 index ed08f4ce..00000000 --- a/glances/plugins/containers/stats_fetcher.py +++ /dev/null @@ -1,72 +0,0 @@ -import threading -import time - -from glances.logger import logger - - -class StatsFetcher: - # Should be an Abstract Base Class - # Inherit from abc.ABC by Glancesv4 (not inheriting for compatibility with py2) - """ - Streams the container stats through threading - - Use `StatsFetcher.stats` to access the streamed results - """ - - def __init__(self, container): - """Init the class. - - container: instance of Container returned by Docker or Podman client - """ - # The docker-py return stats as a stream - self._container = container - # Container stats are maintained as dicts - self._raw_stats = {} - # Use a Thread to stream stats - self._thread = threading.Thread(target=self._fetch_stats, daemon=True) - # Event needed to stop properly the thread - self._stopper = threading.Event() - - self._thread.start() - logger.debug("docker plugin - Create thread for container {}".format(self._container.name)) - - def _fetch_stats(self): - """Grab the stats. 
- - Infinite loop, should be stopped by calling the stop() method - """ - try: - for new_stats in self._container.stats(decode=True): - self._pre_raw_stats_update_hook() - self._raw_stats = new_stats - self._post_raw_stats_update_hook() - - time.sleep(0.1) - if self.stopped(): - break - - except Exception as e: - logger.debug("docker plugin - Exception thrown during run ({})".format(e)) - self.stop() - - def stopped(self): - """Return True is the thread is stopped.""" - return self._stopper.is_set() - - def stop(self, timeout=None): - """Stop the thread.""" - logger.debug("docker plugin - Close thread for container {}".format(self._container.name)) - self._stopper.set() - - @property - def stats(self): - """Raw Stats getter.""" - return self._raw_stats - - def _pre_raw_stats_update_hook(self): - """Hook that runs before worker thread updates the raw_stats""" - pass - - def _post_raw_stats_update_hook(self): - """Hook that runs after worker thread updates the raw_stats""" - pass diff --git a/glances/plugins/containers/stats_streamer.py b/glances/plugins/containers/stats_streamer.py new file mode 100644 index 00000000..0bf7d38e --- /dev/null +++ b/glances/plugins/containers/stats_streamer.py @@ -0,0 +1,76 @@ +import threading +import time + +from glances.logger import logger + + +class StatsStreamer: + """ + Utility class to stream an iterable using a background / daemon Thread + + Use `StatsStreamer.stats` to access the latest streamed results + """ + + def __init__(self, iterable, initial_stream_value=None): + """ + iterable: an Iterable instance that needs to be streamed + """ + self._iterable = iterable + # Iterable results are stored here + self._raw_result = initial_stream_value + # Use a Thread to stream iterable (daemon=True to automatically kill thread when main process dies) + self._thread = threading.Thread(target=self._stream_results, daemon=True) + # Event needed to stop the thread manually + self._stopper = threading.Event() + # Lock to avoid the daemon 
thread updating stats when main thread reads the stats + self.result_lock = threading.Lock() + # Last result streamed time (initial val 0) + self._last_update_time = 0 + + self._thread.start() + + def stop(self): + """Stop the thread.""" + self._stopper.set() + + def stopped(self): + """Return True if the thread is stopped.""" + return self._stopper.is_set() + + def _stream_results(self): + """Grab the stats. + + Infinite loop, should be stopped by calling the stop() method + """ + try: + for res in self._iterable: + self._pre_update_hook() + self._raw_result = res + self._post_update_hook() + + time.sleep(0.1) + if self.stopped(): + break + + except Exception as e: + logger.debug("docker plugin - Exception thrown during run ({})".format(e)) + self.stop() + + def _pre_update_hook(self): + """Hook that runs before worker thread updates the raw_stats""" + self.result_lock.acquire() + + def _post_update_hook(self): + """Hook that runs after worker thread updates the raw_stats""" + self._last_update_time = time.time() + self.result_lock.release() + + @property + def stats(self): + """Raw Stats getter.""" + return self._raw_result + + @property + def last_update_time(self): + """Last update time getter.""" + return self._last_update_time diff --git a/glances/plugins/glances_containers.py b/glances/plugins/glances_containers.py index de1a89fc..ddd6c837 100644 --- a/glances/plugins/glances_containers.py +++ b/glances/plugins/glances_containers.py @@ -240,6 +240,10 @@ class Plugin(GlancesPlugin): if not self.stats or 'containers' not in self.stats or len(self.stats['containers']) == 0 or self.is_disabled(): return ret + show_pod_name = False + if any(ct.get("pod_name") for ct in self.stats["containers"]): + show_pod_name = True + # Build the string message # Title msg = '{}'.format('CONTAINERS') @@ -259,6 +263,10 @@ class Plugin(GlancesPlugin): self.config.get_int_value('containers', 'max_name_size', default=20) if self.config is not None else 20, 
len(max(self.stats['containers'], key=lambda x: len(x['name']))['name']), ) + + if show_pod_name: + msg = ' {:{width}}'.format('Pod', width=12) + ret.append(self.curse_add_line(msg)) msg = ' {:{width}}'.format('Name', width=name_max_width) ret.append(self.curse_add_line(msg, 'SORT' if self.sort_key == 'name' else 'DEFAULT')) msg = '{:>10}'.format('Status') @@ -284,6 +292,8 @@ class Plugin(GlancesPlugin): # Data for container in self.stats['containers']: ret.append(self.curse_new_line()) + if show_pod_name: + ret.append(self.curse_add_line(' {:{width}}'.format(container.get("pod_id", " - "), width=12))) # Name ret.append(self.curse_add_line(self._msg_name(container=container, max_width=name_max_width))) # Status @@ -338,10 +348,10 @@ class Plugin(GlancesPlugin): unit = 'b' try: value = ( - self.auto_unit( - int(container['network']['rx'] // container['network']['time_since_update'] * to_bit) - ) - + unit + self.auto_unit( + int(container['network']['rx'] // container['network']['time_since_update'] * to_bit) + ) + + unit ) msg = '{:>7}'.format(value) except KeyError: @@ -349,10 +359,10 @@ class Plugin(GlancesPlugin): ret.append(self.curse_add_line(msg)) try: value = ( - self.auto_unit( - int(container['network']['tx'] // container['network']['time_since_update'] * to_bit) - ) - + unit + self.auto_unit( + int(container['network']['tx'] // container['network']['time_since_update'] * to_bit) + ) + + unit ) msg = ' {:<7}'.format(value) except KeyError: