����JFIF��x�x����'403WebShell
403Webshell
Server IP : 78.140.185.180  /  Your IP : 216.73.216.82
Web Server : LiteSpeed
System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64
User : builderbox ( 1072)
PHP Version : 7.3.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/cl_plus/collectors/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/cl_plus/collectors/node_exporter.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import os
import platform
import re
from requests import PreparedRequest, RequestException
from prometheus_client.parser import text_string_to_metric_families
from typing import Dict, AnyStr, List
from itertools import groupby

from .collector_base import CollectorBase
from cl_plus.utils.unix_socket import UnixHTTPAdapter
from clcommon.utils import get_virt_type
from cl_plus.consts import METRIC_NAME, METRIC_LABELS, METRIC_VALUE


class NodeExporterCollector(CollectorBase):
    def __init__(self, _logger):
        super(NodeExporterCollector, self).__init__(_logger)
        self._required_metrics = ['node_filesystem_avail_bytes',
                                  'node_filesystem_size_bytes',
                                  'node_cpu_seconds_total',
                                  'node_filesystem_free_bytes',
                                  'node_memory_MemTotal_bytes',
                                  'node_memory_MemAvailable_bytes',
                                  'node_boot_time_seconds',
                                  'node_context_switches_total',
                                  'node_disk_io_time_seconds_total',
                                  'node_disk_read_bytes_total',
                                  'node_disk_read_time_seconds_total',
                                  'node_disk_reads_completed_total',
                                  'node_disk_write_time_seconds_total',
                                  'node_disk_writes_completed_total',
                                  'node_disk_written_bytes_total',
                                  'node_network_receive_bytes_total',
                                  'node_network_transmit_bytes_total',
                                  'node_filefd_allocated',
                                  'node_load1',
                                  'node_load15',
                                  'node_load5',
                                  'node_hwmon_temp_celsius']
        self._is_ne_error = None
        self._socket_file = '/var/run/cl_node_exporter.socket'
        self._metrics_url = 'http://localhost/metrics'
        self.virt_type = get_virt_type()

    def init(self):
        """
        Initialize Node Exporter collector
        :return: None
        """
        self._aggregated_data = {}
        self._logger.info("Node Exporter collector init")

    def _get_data_from_socket(self):

        request = PreparedRequest()
        request.prepare(
            method='GET',
            url=self._metrics_url,
        )
        unix_http_adapter = UnixHTTPAdapter(self._socket_file)
        response = unix_http_adapter.send(request)
        try:
            response.raise_for_status()
        finally:
            unix_http_adapter.close()

        return response.text

    @staticmethod
    def _is_metric_value_exclude(labels):
        """
        Determines is metric value with supposed labels need exclude
        :param labels: labels for mount dictionary to check. Example:
            {'device': '/dev/sda1', 'fstype': 'xfs', 'mountpoint': '/usr/share/cagefs-skeleton'}
        :return: True/False - Exclude/not exclude metric value from collector output
        """
        if 'mountpoint' not in labels:
            return False
        # Check for special case if CageFs skeleton path (or it part) is symlink to some another dir
        path_to_check = os.path.realpath('/usr/share/cagefs-skeleton')
        if labels['mountpoint'].startswith(path_to_check) or '/virtfs/' in labels['mountpoint']:
            return True
        return False

    @staticmethod
    def _calculate_available_mem(metrics):
        """
        Calculates available mem metric
        according to article:
        https://access.redhat.com/solutions/406773
        """
        return metrics['node_memory_MemFree_bytes'] + \
            metrics['node_memory_SReclaimable_bytes'] + \
            metrics['node_memory_Buffers_bytes'] + \
            metrics['node_memory_Cached_bytes']

    def _prepare_node_disk_entries(self, raw_ne_output: AnyStr) -> AnyStr:
        """
        Different virtualization types display all devices
        in the /proc/diskstats with different titles
        Node exporter provides default filtering, but it doesn't work
        with non-kvm virtual machines (at least, XEN)

        Example on kvm /proc/diskstats output:
            vda ...
            vda1 ...
            sr0 ...
            loop0 ...

        And default NE behaviour will leave only vda and sr0 devices

        Example on XEN:
            xvda1 ...

        And default NE behaviour will ignore this entry

        So this method checks virt type and leaves only suitable
        devices (depending on vm type)

        :param raw_ne_output: node exporter output including all devices
        for node_disk metrics
        :return: output with skipped lines
        """
        # Set default regexp value (just like NE)
        # All lines with node_disk metrics and devices to filter
        # Example of raw output with kvm virtualization:
        # # HELP node_disk_io_now The number of I/Os currently in progress.
        # # TYPE node_disk_io_now gauge
        # node_disk_io_now{device="loop0"} 0
        # node_disk_io_now{device="sr0"} 0
        # node_disk_io_now{device="vda"} 0
        # node_disk_io_now{device="vda1"} 0
        # After the regexp substring:
        # # HELP node_disk_io_now The number of I/Os currently in progress.
        # # TYPE node_disk_io_now gauge
        # node_disk_io_now{device="sr0"} 0
        # node_disk_io_now{device="vda"} 0
        reg = "node_disk.*(ram|loop|fd|(h|s|v|xv)d[a-z]|nvme\\d+n\\d+p)\\d.*\n"
        if self.virt_type == "xen":
            # Regex, that will include `xvd*{num}` and `vd*{num}` devices
            reg = "node_disk.*(ram|loop|fd|(h|s)d[a-z]|nvme\\d+n\\d+p)\\d.*\n"
        return re.sub(reg, "", raw_ne_output)

    @staticmethod
    def filter_samples_for_xen(family) -> List:
        """
        Checks for presence of devices without numbers in a title and
        ignores all devices with numbers (if any)
        If there are only numbered devices - doesn't filter anything

        Example of received node_disk metric family:
        Name: node_disk_* Labels: {'device': 'xvda'} Value: 811395.84
        Name: node_disk_* Labels: {'device': 'xvda1'} Value: 1114.75
        Name: node_disk_* Labels: {'device': 'xvda2'} Value: 0.0
        Name: node_disk_* Labels: {'device': 'xvdb1'} Value: 813760.16
        Name: node_disk_* Labels: {'device': 'xvdb2'} Value: 872.06
        Name: node_disk_* Labels: {'device': 'xvdb3'} Value: 0.0
        Expected result of filtering:
        `xvda` group (since we have sample with device without num):
        Name: node_disk_* Labels: {'device': 'xvda'} Value: 811395.84
        `xvdb` group (since all samples devices are with num):
        Name: node_disk_* Labels: {'device': 'xvdb1'} Value: 813760.16
        Name: node_disk_* Labels: {'device': 'xvdb2'} Value: 872.06
        Name: node_disk_* Labels: {'device': 'xvdb3'} Value: 0.0

        :param family: parsed Metric object with all samples
        :return: list of needed samples
        """
        filtered_family_samples = []
        # Grouping samples of metric by device base
        # (device value without numbers)
        for device_base, group in groupby(
                family.samples, lambda x: re.sub("\\d+", "", x[1]["device"])):
            group = list(group)
            for sample in group:
                # If devices have xvd* title without numbers (`xvda`)
                if sample[METRIC_LABELS]["device"] == device_base:
                    # Leave only this sample from device group
                    filtered_family_samples.append(sample)
                    break
            else:
                # There is no device without a number in title
                # Add info about all devices in the group
                filtered_family_samples.extend(group)
        return filtered_family_samples

    def _get_data_from_ne(self):
        """
        Retrieve new data from Node Exporter
        :return: Dict with NE metrics
        """
        # Get data from NE
        try:
            node_exporter_output = self._get_data_from_socket()
        except RequestException as err:
            if not self._is_ne_error:
                self._logger.warn("[Node exporter collector] socket get_data error: %s", err)
            self._is_ne_error = True
            return None
        self._is_ne_error = False
        try:
            node_exporter_metrics = {}

            # metrics to calculate availableMEM for cl6
            memory_metrics = ['node_memory_MemFree_bytes', 'node_memory_SReclaimable_bytes',
                              'node_memory_Cached_bytes', 'node_memory_Buffers_bytes']
            memory_metrics_values = {}
            # Filter diskstats entries
            node_exporter_output = self._prepare_node_disk_entries(
                node_exporter_output)
            for family in text_string_to_metric_families(node_exporter_output):
                # Exclude all unneeded samples for node_disk and XEN VMs
                if family.name.startswith("node_disk") and self.virt_type == "xen":
                    family.samples = self.filter_samples_for_xen(family)
                for sample in family.samples:
                    # sample example
                    # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'idle'} Value: 811395.84
                    # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'iowait'} Value: 1114.75
                    # Name: node_cpu_seconds_total Labels: {'cpu': '0', 'mode': 'irq'} Value: 0.0
                    # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'idle'} Value: 813760.16
                    # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'iowait'} Value: 872.06
                    # Name: node_cpu_seconds_total Labels: {'cpu': '1', 'mode': 'irq'} Value: 0.0
                    name = sample[METRIC_NAME]            # metric name
                    labels = sample[METRIC_LABELS]        # labels dict
                    value = sample[METRIC_VALUE]          # metric value

                    if name in memory_metrics:
                        memory_metrics_values[name] = value

                    if name not in self._required_metrics:
                        continue
                    # Skip unneeded mounts
                    if self._is_metric_value_exclude(labels):
                        continue
                    if name not in node_exporter_metrics:
                        node_exporter_metrics[name] = []
                    metric_data = {"value": value}
                    if len(labels) != 0:
                        metric_data.update({"labels": labels})
                    node_exporter_metrics[name].append(metric_data)
            self._is_ne_error = False

            if 'el6' in platform.release() and \
                    node_exporter_metrics.get('node_memory_MemAvailable_bytes') is None:
                available_mem = self._calculate_available_mem(memory_metrics_values)
                node_exporter_metrics['node_memory_MemAvailable_bytes'] = [{'value': available_mem}]

            return node_exporter_metrics
        except Exception:
            if not self._is_ne_error:
                import traceback
                var = traceback.format_exc()
                self._logger.warn("[Node exporter collector] generic get_data error: %s" % var)
                self._is_ne_error = True
        return None

    def _collect_new_data(self, new_ne_data_dict: Dict):
        """
        Add new NE data
        :param new_ne_data_dict: New data from NE

        income node exporter dict:
        {
            'metric_name': [
                {'value': 1}
            ],
            'metric_name2': [
                {'value': 1, 'labels': {...}},
                {'value': 1, 'labels': {...}}
            ]
        }

        aggregated data:
        {
            'metric_name': [
                {'value': [1, 2, 3]}
            ],
            'metric_name2': [
                {'value': [1, 2, 3], 'labels': {...}},
                {'value': [1, 3, 5], 'labels': {...}}
            ]
        }
        """
        for metric_name, metric_values_list in new_ne_data_dict.items():
            for metric_values_dict in metric_values_list:
                if metric_name not in self._aggregated_data:
                    self._aggregated_data[metric_name] = []

                if 'labels' not in metric_values_dict:
                    if self._aggregated_data[metric_name]:
                        self._aggregated_data[metric_name][0]['value'].append(metric_values_dict['value'])
                    else:
                        self._aggregated_data[metric_name].append({
                            'value': [metric_values_dict['value']]
                        })
                else:
                    for aggregated_item_dict in self._aggregated_data[metric_name]:
                        if metric_values_dict['labels'] == aggregated_item_dict['labels']:
                            aggregated_item_dict['value'].append(metric_values_dict['value'])
                            break
                    else:
                        self._aggregated_data[metric_name].append({
                            'value': [metric_values_dict['value']],
                            'labels': metric_values_dict['labels']
                        })

    def aggregate_new_data(self):
        """
        Retrieve and aggregate new data
        :return None
        """
        json_dict = self._get_data_from_ne()
        # Exit if no data
        if not json_dict:
            return
        # New data present - aggregate
        self._collect_new_data(json_dict)

    def get_averages(self):
        """
        Get collector's averages data
        :return: dict
            {
             "node_exporter":
              {
                "go_gc_duration_seconds": [ { "labels":  {"quantile": "0"} , "value": "0" },
                                            { "labels":  {"quantile": "0.25"} , "value": "0" },
                                            { "labels":  {"quantile": "0.5"} , "value": "0" },
                                            { "labels":  {"quantile": "0.75"} , "value": "0" },
                                            { "labels":  {"quantile": "1"} , "value": "0" }
                                          ],
                "go_gc_duration_seconds_sum": [ { "value": "0" } ]
              }
            }
            or None if can't get data
        """
        if not self._aggregated_data:
            return None

        # calculate average for each metric
        for _, metric_values_list in self._aggregated_data.items():
            for metric_data in metric_values_list:
                # {'metric_name': [{'value': [1,2,3,...], 'labels': {}}, ...], ...} ->
                # {'metric_name': [{'value': average([1,2,3,...]), 'labels': {}}, ...], ...}
                metric_data['value'] = sum(metric_data['value']) // len(metric_data['value'])
        ret_dict = self._aggregated_data.copy()
        # Prepare to next collecting cycle - reset all needed variables
        self._aggregated_data = {}
        return {"node_exporter": ret_dict}

Youez - 2016 - github.com/yon3zu
LinuXploit