����JFIF��x�x����'
Server IP : 78.140.185.180 / Your IP : 216.73.216.82 Web Server : LiteSpeed System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : builderbox ( 1072) PHP Version : 7.3.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_plus/collectors/ |
Upload File : |
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # import os import platform import re from requests import PreparedRequest, RequestException from prometheus_client.parser import text_string_to_metric_families from typing import Dict, AnyStr, List from itertools import groupby from .collector_base import CollectorBase from cl_plus.utils.unix_socket import UnixHTTPAdapter from clcommon.utils import get_virt_type from cl_plus.consts import METRIC_NAME, METRIC_LABELS, METRIC_VALUE class NodeExporterCollector(CollectorBase): def __init__(self, _logger): super(NodeExporterCollector, self).__init__(_logger) self._required_metrics = ['node_filesystem_avail_bytes', 'node_filesystem_size_bytes', 'node_cpu_seconds_total', 'node_filesystem_free_bytes', 'node_memory_MemTotal_bytes', 'node_memory_MemAvailable_bytes', 'node_boot_time_seconds', 'node_context_switches_total', 'node_disk_io_time_seconds_total', 'node_disk_read_bytes_total', 'node_disk_read_time_seconds_total', 'node_disk_reads_completed_total', 'node_disk_write_time_seconds_total', 'node_disk_writes_completed_total', 'node_disk_written_bytes_total', 'node_network_receive_bytes_total', 'node_network_transmit_bytes_total', 'node_filefd_allocated', 'node_load1', 'node_load15', 'node_load5', 'node_hwmon_temp_celsius'] self._is_ne_error = None self._socket_file = '/var/run/cl_node_exporter.socket' self._metrics_url = 'http://localhost/metrics' self.virt_type = get_virt_type() def init(self): """ Initialize Node Exporter collector :return: None """ self._aggregated_data = {} self._logger.info("Node Exporter collector init") def _get_data_from_socket(self): request = PreparedRequest() request.prepare( method='GET', url=self._metrics_url, ) unix_http_adapter = UnixHTTPAdapter(self._socket_file) response = unix_http_adapter.send(request) try: response.raise_for_status() finally: unix_http_adapter.close() return response.text @staticmethod def _is_metric_value_exclude(labels): """ Determines is metric value with supposed labels need exclude :param labels: labels for mount dictionary to check. Example: {'device': '/dev/sda1', 'fstype': 'xfs', 'mountpoint': '/usr/share/cagefs-skeleton'} :return: True/False - Exclude/not exclude metric value from collector output """ if 'mountpoint' not in labels: return False # Check for special case if CageFs skeleton path (or it part) is symlink to some another dir path_to_check = os.path.realpath('/usr/share/cagefs-skeleton') if labels['mountpoint'].startswith(path_to_check) or '/virtfs/' in labels['mountpoint']: return True return False @staticmethod def _calculate_available_mem(metrics): """ Calculates available mem metric according to article: https://access.redhat.com/solutions/406773 """ return metrics['node_memory_MemFree_bytes'] + \ metrics['node_memory_SReclaimable_bytes'] + \ metrics['node_memory_Buffers_bytes'] + \ metrics['node_memory_Cached_bytes'] def _prepare_node_disk_entries(self, raw_ne_output: AnyStr) -> AnyStr: """ Different virtualization types display all devices in the /proc/diskstats with different titles Node exporter provides default filtering, but it doesn't work with non-kvm virtual machines (at least, XEN) Example on kvm /proc/diskstats output: vda ... vda1 ... sr0 ... loop0 ... And default NE behaviour will leave only vda and sr0 devices Example on XEN: xvda1 ... And default NE behaviour will ignore this entry So this method checks virt type and leaves only suitable devices (depending on vm type) :param raw_ne_output: node exporter output including all devices for node_disk metrics :return: output with skipped lines """ # Set default regexp value (just like NE) # All lines with node_disk metrics and devices to filter # Example of raw output with kvm virtualization: # # HELP node_disk_io_now The number of I/Os currently in progress. # # TYPE node_disk_io_now gauge # node_disk_io_now{device="loop0"} 0 # node_disk_io_now{device="sr0"} 0 # node_disk_io_now{device="vda"} 0 # node_disk_io_now{device="vda1"} 0 # After the regexp substring: # # HELP node_disk_io_now The number of I/Os currently in progress. # # TYPE node_disk_io_now gauge # node_disk_io_now{device="sr0"} 0 # node_disk_io_now{device="vda"} 0 reg = "node_disk.*(ram|loop|fd|(h|s|v|xv)d[a-z]|nvme\\d+n\\d+p)\\d.*\n" if self.virt_type == "xen": # Regex, that will include `xvd*{num}` and `vd*{num}` devices reg = "node_disk.*(ram|loop|fd|(h|s)d[a-z]|nvme\\d+n\\d+p)\\d.*\n" return re.sub(reg, "", raw_ne_output) @staticmethod def filter_samples_for_xen(family) -> List: """ Checks for presence of devices without numbers in a title and ignores all devices with numbers (if any) If there are only numbered devices - doesn't filter anything Example of received node_disk metric family: Name: node_disk_* Labels: {'device': 'xvda'} Value: 811395.84 Name: node_disk_* Labels: {'device': 'xvda1'} Value: 1114.75 Name: node_disk_* Labels: {'device': 'xvda2'} Value: 0.0 Name: node_disk_* Labels: {'device': 'xvdb1'} Value: 813760.16 Name: node_disk_* Labels: {'device': 'xvdb2'} Value: 872.06 Name: node_disk_* Labels: {'device': 'xvdb3'} Value: 0.0 Expected result of filtering: `xvda` group (since we have sample with device without num): Name: node_disk_* Labels: {'device': 'xvda'} Value: 811395.84 `xvdb` group (since all samples devices are with num): Name: node_disk_* Labels: {'device': 'xvdb1'} Value: 813760.16 Name: node_disk_* Labels: {'device': 'xvdb2'} Value: 872.06 Name: node_disk_* Labels: {'device': 'xvdb3'} Value: 0.0 :param family: parsed Metric object with all samples :return: list of needed samples """ filtered_family_samples = [] # Grouping samples of metric by device base # (device value without numbers) for device_base, group in groupby( family.samples, lambda x: re.sub("\\d+", "", x[1]["device"])): group = list(group) for sample in group: # If devices have xvd* title without numbers (`xvda`) if sample[METRIC_LABELS]["device"] == device_base: # Leave only this sample from device group filtered_family_samples.append(sample) break else: # There is no device without a number in title # Add info about all devices in the group filtered_family_samples.extend(group) return filtered_family_samples def _get_data_from_ne(self): """ Retrieve new data from Node Exporter :return: Dict with NE metrics """ # Get data from NE try: node_exporter_output = self._get_data_from_socket() except RequestException as err: if not self._is_ne_error: self._logger.warn("[Node exporter collector] socket get_data error: %s", err) self._is_ne_error = True return None self._is_ne_error = False try: node_exporter_metrics = {} # metrics to calculate availableMEM for cl6 memory_metrics = ['node_memory_MemFree_bytes', 'node_memory_SReclaimable_bytes', 'node_memory_Cached_bytes', 'node_memory_Buffers_bytes'] memory_metrics_values = {} # Filter diskstats entries node_exporter_output = self._prepare_node_disk_entries( node_exporter_output) for family in text_string_to_metric_families(node_exporter_output): # Exclude all unneeded samples for node_disk and XEN VMs if family.name.startswith("node_disk") and self.virt_type == "xen": family.samples = self.filter_samples_for_xen(family) for sample in family.samples: # sample example # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'idle'} Value: 811395.84 # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'iowait'} Value: 1114.75 # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'irq'} Value: 0.0 # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'idle'} Value: 813760.16 # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'iowait'} Value: 872.06 # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'irq'} Value: 0.0 name = sample[METRIC_NAME] # metric name labels = sample[METRIC_LABELS] # labels dict value = sample[METRIC_VALUE] # metric value if name in memory_metrics: memory_metrics_values[name] = value if name not in self._required_metrics: continue # Skip unneeded mounts if self._is_metric_value_exclude(labels): continue if name not in node_exporter_metrics: node_exporter_metrics[name] = [] metric_data = {"value": value} if len(labels) != 0: metric_data.update({"labels": labels}) node_exporter_metrics[name].append(metric_data) self._is_ne_error = False if 'el6' in platform.release() and \ node_exporter_metrics.get('node_memory_MemAvailable_bytes') is None: available_mem = self._calculate_available_mem(memory_metrics_values) node_exporter_metrics['node_memory_MemAvailable_bytes'] = [{'value': available_mem}] return node_exporter_metrics except Exception: if not self._is_ne_error: import traceback var = traceback.format_exc() self._logger.warn("[Node exporter collector] generic get_data error: %s" % var) self._is_ne_error = True return None def _collect_new_data(self, new_ne_data_dict: Dict): """ Add new NE data :param new_ne_data_dict: New data from NE income node exporter dict: { 'metric_name': [ {'value': 1} ], 'metric_name2': [ {'value': 1, 'labels': {...}}, {'value': 1, 'labels': {...}} ] } aggregated data: { 'metric_name': [ {'value': [1, 2, 3]} ], 'metric_name2': [ {'value': [1, 2, 3], 'labels': {...}}, {'value': [1, 3, 5], 'labels': {...}} ] } """ for metric_name, metric_values_list in new_ne_data_dict.items(): for metric_values_dict in metric_values_list: if metric_name not in self._aggregated_data: self._aggregated_data[metric_name] = [] if 'labels' not in metric_values_dict: if self._aggregated_data[metric_name]: self._aggregated_data[metric_name][0]['value'].append(metric_values_dict['value']) else: self._aggregated_data[metric_name].append({ 'value': [metric_values_dict['value']] }) else: for aggregated_item_dict in self._aggregated_data[metric_name]: if metric_values_dict['labels'] == aggregated_item_dict['labels']: aggregated_item_dict['value'].append(metric_values_dict['value']) break else: self._aggregated_data[metric_name].append({ 'value': [metric_values_dict['value']], 'labels': metric_values_dict['labels'] }) def aggregate_new_data(self): """ Retrieve and aggregate new data :return None """ json_dict = self._get_data_from_ne() # Exit if no data if not json_dict: return # New data present - aggregate self._collect_new_data(json_dict) def get_averages(self): """ Get collector's averages data :return: dict { "node_exporter": { "go_gc_duration_seconds": [ { "labels": {"quantile": "0"} , "value": "0" }, { "labels": {"quantile": "0.25"} , "value": "0" }, { "labels": {"quantile": "0.5"} , "value": "0" }, { "labels": {"quantile": "0.75"} , "value": "0" }, { "labels": {"quantile": "1"} , "value": "0" } ], "go_gc_duration_seconds_sum": [ { "value": "0" } ] } } or None if can't get data """ if not self._aggregated_data: return None # calculate average for each metric for _, metric_values_list in self._aggregated_data.items(): for metric_data in metric_values_list: # {'metric_name': [{'value': [1,2,3,...], 'labels': {}}, ...], ...} -> # {'metric_name': [{'value': average([1,2,3,...]), 'labels': {}}, ...], ...} metric_data['value'] = sum(metric_data['value']) // len(metric_data['value']) ret_dict = self._aggregated_data.copy() # Prepare to next collecting cycle - reset all needed variables self._aggregated_data = {} return {"node_exporter": ret_dict}