# File: /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_plus/daemon/daemon.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import sys
import time
import logging
import signal
from typing import Optional

import requests
import json
import os

from urllib.parse import urlparse
from .logsetup import setup_logging
from cl_plus.collectors.node_exporter import NodeExporterCollector
from cl_plus.collectors.lve_stats import LveStatsCollector
from cl_plus.collectors.apache import ApacheCollector
from cl_plus.collectors.mysql import MySQLCollector
from clcommon.lib.consts import DEFAULT_PUSHGATEWAY_URL, DEFAULT_PUSHGATEWAY_CHECK_URL, PUSHGATEWAY_ADDRESS
import clcommon.lib.jwt_token as jwt_token

from clcommon.lib.cmt_utils import log_request_error_debug_info
from clcommon.utils import get_file_lines


# Module-level references shared with signal_handler() and
# custom_excepthook(), which run outside the SenderDaemon instance
_log = None
_collectors_list = None

SECONDS_IN_MINUTE = 60
MAX_TIME_OF_PREVIOUS_EXCEPTION = SECONDS_IN_MINUTE * 10
DATA_AGGREGATIONS_PER_MINUTE = 60
SLEEP_TIME_BETWEEN_AGGREGATIONS = \
    SECONDS_IN_MINUTE // DATA_AGGREGATIONS_PER_MINUTE
JWT_VALIDATION_INTERVAL = 5
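
# Derived timing, for reference: with DATA_AGGREGATIONS_PER_MINUTE = 60 the
# sleep between aggregations works out to 60 // 60 = 1 second, and the JWT
# token is re-validated on every JWT_VALIDATION_INTERVAL-th aggregation,
# i.e. roughly every 5 seconds while the loop runs normally.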


class SenderDaemon:
    def __init__(self):
        setup_logging(console_level=logging.CRITICAL)
        global _log, _collectors_list
        _log = self._log = logging.getLogger('CL+_sender_daemon')
        # Collectors list (also exposed module-wide for signal_handler())
        _collectors_list = self.collectors_list = [
            NodeExporterCollector(self._log),
            LveStatsCollector(self._log),
            ApacheCollector(self._log),
            MySQLCollector(self._log),
        ]

        # Whether the last JWT token validation failed
        self.jwt_token_error = False
        self.jwt_token = None
        self.is_test_mode = False
        # Pushgateway store URL
        self._pushgateway_url = DEFAULT_PUSHGATEWAY_URL
        # Pushgateway check URL
        self._pushgateway_check_url = DEFAULT_PUSHGATEWAY_CHECK_URL
        # Whether the token is allowed to send the data
        self._token_ready = True
        # Counter for failed requests to pushgateway
        self._error_counter = 0
        # template error message
        self._template_error_message = 'Error sending data to ' \
                                       'pushgateway. HTTP response code: ' \
                                       '%s, Response text: %s'
        self._json_logging_flag_file = '/var/lve/cmt_debug_logging'
        self.component = 'es_sender'
        self.count_data_aggregation = 0

    def init(self):
        """
        Initialise daemon
        :return:
        """
        self._log.info('Initializing collectors')
        # initialize all collectors
        for collector in self.collectors_list:
            collector.init()

    def _validate_jwt_token(
            self,
            validation_period: int,
    ) -> bool:
        """
        Validate JWT token and write warning message if it's failed
        :param validation_period: period of JWT token validation
        :return: token is valid
        """

        token_is_valid, token_error_msg, es_jwt_token = \
            jwt_token.jwt_token_check()
        if not token_is_valid:
            if not self.jwt_token_error:
                self._log.warning(
                    'JWT token error: %s.\n'
                    'Waiting for another %s seconds before next '
                    'attempt to collect statistics.',
                    token_error_msg,
                    validation_period,
                )
            self.jwt_token_error = True
        else:
            self.jwt_token_error = False
        self.jwt_token = es_jwt_token
        return token_is_valid

    def _log_error_from_main_daemon_loop(
            self,
            previous_exc_time: float,
            previous_exc: Optional[Exception],
            current_exc: Exception,
    ):
        """
        Logging an exception from main daemon loop if a previous one was raised
        more than ten minutes ago or had another type than a current one.
        :param previous_exc_time: time of previous exception raising
        :param previous_exc: previous raised exception
        :param current_exc: current raised exception
        """
        if time.time() - previous_exc_time > \
                MAX_TIME_OF_PREVIOUS_EXCEPTION or \
                type(current_exc) is not type(previous_exc):
            self._log.exception(current_exc)

    def _sleep_after_main_daemon_loop(self, begin_time_of_loop: float) -> None:
        """
        Sleep for the remainder of the current aggregation interval
        :param begin_time_of_loop: the time the loop iteration started
        """
        elapsed_time = time.time() - begin_time_of_loop
        # Data collection was skipped because of an invalid JWT token,
        # so we should sleep for the whole minute
        if self.jwt_token_error:
            sleep_time = SECONDS_IN_MINUTE - elapsed_time
        else:
            sleep_time = SLEEP_TIME_BETWEEN_AGGREGATIONS - elapsed_time
        sleep_time = max(sleep_time, 0)
        time.sleep(sleep_time)
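
    # Worked example: if an aggregation pass takes 0.2 s, the daemon sleeps
    # SLEEP_TIME_BETWEEN_AGGREGATIONS - 0.2 = 0.8 s; after a JWT failure it
    # instead sleeps out the remainder of the full minute.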

    def _run_in_prod_mode(self, _unittest_mode=False) -> None:
        """
        Run daemon in production mode:
        1. Validate ES JWT token
        1.1 Log the warning if it isn't valid and wait for next minute
        2. Aggregate data from collectors
        3. Wait for next 5 seconds
        4. We calculate averages from collectors and send data to pushgateway
           if we have aggregated data for one past minute
        5. We logging an exception from main daemon loop if an previous one
           was raised more than ten minutes ago and had another type than
           a current one
        :param _unittest_mode: USE FOR ONLY UNITTESTS. We should break cycle
            after 12 reps if it's True
        """
        previous_exc = None
        previous_exc_time = 0

        while True:
            begin_time_of_loop = time.time()
            try:
                # Check and read JWT token
                if self.count_data_aggregation % JWT_VALIDATION_INTERVAL == 0 \
                        and not self._validate_jwt_token(SECONDS_IN_MINUTE):
                    continue
                self._aggregate_new_data()
                self.count_data_aggregation += 1
                if self.count_data_aggregation == DATA_AGGREGATIONS_PER_MINUTE:
                    # Retrieve data to send
                    data_to_send = self._get_averages()
                    data_to_send.update(
                        {
                            'timestamp': int(time.time() * 1000)
                        }
                    )
                    self._check_disk_metrics_present(data_to_send)
                    self._send_data_to_pushgateway(
                        data_to_send,
                        self.jwt_token
                    )
                    if os.path.isfile(self._json_logging_flag_file):
                        self._log.info(
                            'Sending to pushgateway: %s',
                            data_to_send
                        )
            except Exception as err:
                self._log_error_from_main_daemon_loop(
                    previous_exc_time,
                    previous_exc,
                    err,
                )
                previous_exc_time = time.time()
                previous_exc = err
            finally:
                self._sleep_after_main_daemon_loop(begin_time_of_loop)
                if self.count_data_aggregation == DATA_AGGREGATIONS_PER_MINUTE:
                    self.count_data_aggregation = 0
                    if _unittest_mode:
                        break

    def _run_in_test_mode(self) -> None:
        """
        Run daemon in test mode.
        We collect data one time and print it to stdout
        All of exceptions will be logged to stdout
        """
        try:
            self._aggregate_new_data()
            # Retrieve data to send
            data_to_send = self._get_averages()
            data_to_send.update(
                {
                    'timestamp': int(time.time() * 1000)
                }
            )
            print(json.dumps(data_to_send))
            self._log.info("Test mode - exit")
        except Exception as err:
            print(err)
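
    # Output sketch for test mode: a single JSON document merging collector
    # averages with a millisecond timestamp, e.g. (collector keys depend on
    # which collectors return data; values here are placeholders):
    # {"node_exporter": {...}, "timestamp": 1700000000000}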

    def run(self):
        """
        Main daemon loop
        """
        if self.is_test_mode:
            self._run_in_test_mode()
        else:
            self._run_in_prod_mode()

    def _aggregate_new_data(self):
        """
        Aggregate new data from each collector whose collection interval is due
        """
        for collector in self.collectors_list:
            if self.count_data_aggregation % collector.collection_interval == 0:
                collector.aggregate_new_data()

    def _get_averages(self):
        """
        Get averages from each collector
        :return: dict
        """
        data_to_send = {}
        for collector in self.collectors_list:
            collector_data = collector.get_averages()
            if collector_data:
                data_to_send.update(collector_data)
        return data_to_send
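
    # Note: collector dicts are merged with dict.update(), so a later
    # collector would overwrite keys from an earlier one if names collided.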

    def _check_disk_metrics_present(self, metrics):
        """
        Checks that node_disk_* metrics are in node_exporter dict
        and send those debug info to sentry if not found:
         - content of /proc/diskstats
         - content of node_exporter logs - it may contain some parse errors
        """
        node_exporter_metrics = metrics.get('node_exporter', {}).keys()
        disk_stat_metrics = ['node_disk_io_time_seconds_total',
                             'node_disk_read_bytes_total',
                             'node_disk_read_time_seconds_total',
                             'node_disk_reads_completed_total',
                             'node_disk_write_time_seconds_total',
                             'node_disk_writes_completed_total',
                             'node_disk_written_bytes_total']
        if not all(metric in node_exporter_metrics for metric in disk_stat_metrics):
            node_exporter_logs = get_file_lines('/var/log/node_exporter.log')
            self._log.error("Disk metrics were not collected\nmetrics: %s\n "
                            "/proc/diskstats content:\n %s \n"
                            "NE logs:\n %s", str(node_exporter_metrics),
                            '\n'.join(get_file_lines('/proc/diskstats')),
                            '\n'.join(node_exporter_logs[-50:]))

    def send_request(self, url, data, headers, timeout):
        """
        Make several attempts to send a POST request,
        retrying on connection errors
        """
        attempts = 5
        error = None
        for _ in range(attempts):
            try:
                return requests.post(url, json=data, headers=headers, timeout=timeout)
            except requests.ConnectionError as e:
                error = e
                self._log.warning('Failed to send POST request to %s', url)
                time.sleep(1)
        # All attempts failed: log and re-raise the last connection error
        self._log.error('Sending retries over, still no success: %s', str(error))
        raise error
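
    # Usage sketch: _send_data_to_pushgateway() below calls this as
    # self.send_request(self._pushgateway_url, data_dict,
    #                   {'X-Auth': es_jwt_token}, 25)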

    def _send_data_to_pushgateway(self, data_dict: dict, es_jwt_token: str):
        """
        Send data to pushgateway
        :param data_dict: data to send
        :param es_jwt_token: JWT token used to authenticate with the pushgateway
        :return: None
        """
        if self._token_ready:
            try:
                r = self.send_request(self._pushgateway_url, data_dict, {'X-Auth': es_jwt_token}, 25)
                if r.status_code == 200:
                    self._error_counter = 0
                elif r.status_code == 403:
                    self._token_ready = False
                    # We don't need to send 403 errors to Sentry, since
                    # they just indicate that we are not allowed to send stats
                    self._log.warning(self._template_error_message, r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                elif r.status_code == 500:
                    self._log.warning(self._template_error_message, r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                else:
                    self._error_counter += 1
                    if self._error_counter % 10 == 0:
                        self._log.error(self._template_error_message, r.status_code,
                                        r.content.decode('utf-8'),
                                        extra=dict(code=r.status_code,
                                                   data=data_dict))

            except requests.exceptions.RequestException as e:
                # 'https://cm.cloudlinux.com' -> cm.cloudlinux.com
                cm_domain = urlparse(PUSHGATEWAY_ADDRESS).hostname
                log_request_error_debug_info(self.component, self._log, e, cm_domain)
        else:
            try:
                r = requests.get(self._pushgateway_check_url, headers={'X-Auth': es_jwt_token}, timeout=25)
                if r.status_code == 200:
                    self._token_ready = True
                    self._error_counter = 0
                elif r.status_code == 403:
                    # We don't need to send 403 errors to Sentry, since
                    # they just indicate that we are not allowed to send stats
                    self._log.warning(self._template_error_message, r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                else:  # switch back to the normal workflow: the problem is not a disallowed token
                    self._token_ready = True
            except requests.exceptions.RequestException as e:
                # 'https://cm.cloudlinux.com' -> cm.cloudlinux.com
                cm_domain = urlparse(PUSHGATEWAY_ADDRESS).hostname
                log_request_error_debug_info(self.component, self._log, e, cm_domain)

    def shutdown(self):
        """
        Shutdown daemon
        :return:
        """
        self._log.info('Shutting down collectors')
        # shutdown all collectors
        for collector in self.collectors_list:
            collector.shutdown()


def daemon_main(_opts):
    daemon = SenderDaemon()
    daemon.is_test_mode = _opts.test
    # Remember current excepthook
    old_excepthook = sys.excepthook

    def custom_excepthook(_type, _ex, _trace):
        """
        Custom handler for all possible daemon unhandled exceptions
        """
        _log.exception("There is an unhandled exception: %s", _ex)
        return old_excepthook(_type, _ex, _trace)

    # Set custom handler for exceptions
    sys.excepthook = custom_excepthook
    # Run as utility
    sigs = set(signal.Signals)
    # Ignore all signals whose default action is not Term or Core, as defined
    # in https://www.man7.org/linux/man-pages/man7/signal.7.html,
    # plus SIGUSR1/SIGUSR2.
    # SIGKILL must be ignored as well, since it cannot be caught:
    # https://stackoverflow.com/questions/30732683/python-sigkill-catching-strategies/30732997#30732997
    ignore_sigs = {signal.SIGUSR1, signal.SIGUSR2, signal.SIGCHLD, signal.SIGCLD, signal.SIGURG, signal.SIGCONT,
                   signal.SIGWINCH, signal.SIGSTOP, signal.SIGTSTP, signal.SIGTTIN, signal.SIGTTOU, signal.SIGKILL}
    for sig in sigs - ignore_sigs:
        try:
            signal.signal(sig, signal_handler)
        except (ValueError, OSError, RuntimeError) as exc:
            _log.error("There is an error with signal %s. Error: %s",
                       signal.Signals(sig).name, str(exc))
            continue
    daemon.init()
    daemon.run()


def signal_handler(sig, frame):
    print('Signal {} received. Terminating'.format(signal.Signals(sig).name))
    _log.info('Shutting down collectors. Signal {} received.'.format(signal.Signals(sig).name))
    # shutdown all collectors
    for collector in _collectors_list:
        collector.shutdown()
    sys.exit(0)
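
# Hypothetical standalone launcher (sketch): daemon_main() only relies on its
# argument exposing a boolean ``test`` attribute, so an argparse namespace
# works; the '--test' flag name here is an assumption, not part of cl_plus.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='cl_plus sender daemon')
    # In test mode the daemon collects once and prints JSON to stdout
    parser.add_argument('--test', action='store_true')
    daemon_main(parser.parse_args())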
