# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import sys
import time
import logging
import signal
from typing import Optional

import requests
import json
import os
from urllib.parse import urlparse

from .logsetup import setup_logging
from cl_plus.collectors.node_exporter import NodeExporterCollector
from cl_plus.collectors.lve_stats import LveStatsCollector
from cl_plus.collectors.apache import ApacheCollector
from cl_plus.collectors.mysql import MySQLCollector
from clcommon.lib.consts import DEFAULT_PUSHGATEWAY_URL, \
    DEFAULT_PUSHGATEWAY_CHECK_URL, PUSHGATEWAY_ADDRESS
import clcommon.lib.jwt_token as jwt_token
from clcommon.lib.cmt_utils import log_request_error_debug_info
from clcommon.utils import get_file_lines

_log = None
_collectors_list = None

SECONDS_IN_MINUTE = 60
MAX_TIME_OF_PREVIOUS_EXCEPTION = SECONDS_IN_MINUTE * 10
DATA_AGGREGATIONS_PER_MINUTE = 60
SLEEP_TIME_BETWEEN_AGGREGATIONS = \
    SECONDS_IN_MINUTE // DATA_AGGREGATIONS_PER_MINUTE
JWT_VALIDATION_INTERVAL = 5


class SenderDaemon:
    def __init__(self):
        setup_logging(console_level=logging.CRITICAL)
        global _log, _collectors_list
        _log = self._log = logging.getLogger('CL+_sender_daemon')
        # Collectors list
        _collectors_list = self.collectors_list = [
            NodeExporterCollector(self._log),
            LveStatsCollector(self._log),
            ApacheCollector(self._log),
            MySQLCollector(self._log),
        ]
        # True when JWT token validation has failed
        self.jwt_token_error = False
        self.jwt_token = None
        self.is_test_mode = False
        # Pushgateway store URL
        self._pushgateway_url = DEFAULT_PUSHGATEWAY_URL
        # Pushgateway check URL
        self._pushgateway_check_url = DEFAULT_PUSHGATEWAY_CHECK_URL
        # Token is allowed to send the data
        self._token_ready = True
        # Counter for failed requests to pushgateway
        self._error_counter = 0
        # Template error message
        self._template_error_message = 'Error sending data to ' \
                                       'pushgateway. HTTP response code: ' \
                                       '%s, Response text: %s'
        self._json_logging_flag_file = '/var/lve/cmt_debug_logging'
        self.component = 'es_sender'
        self.count_data_aggregation = 0

    def init(self):
        """
        Initialise the daemon
        :return:
        """
        self._log.info('Initializing collectors')
        # initialize all collectors
        for collector in self.collectors_list:
            collector.init()

    def _validate_jwt_token(
            self,
            validation_period: int,
    ) -> bool:
        """
        Validate the JWT token and log a warning if validation fails
        :param validation_period: period of JWT token validation
        :return: True if the token is valid
        """
        token_is_valid, token_error_msg, es_jwt_token = \
            jwt_token.jwt_token_check()
        if not token_is_valid:
            if not self.jwt_token_error:
                self._log.warning(
                    'JWT token error: %s.\n'
                    'Waiting for another %s seconds before next '
                    'attempt to collect statistics.',
                    token_error_msg,
                    validation_period,
                )
            self.jwt_token_error = True
        else:
            self.jwt_token_error = False
            self.jwt_token = es_jwt_token
        return token_is_valid

    def _log_error_from_main_daemon_loop(
            self,
            previous_exc_time: float,
            previous_exc: Optional[Exception],
            current_exc: Exception,
    ):
        """
        Log an exception from the main daemon loop if the previous one
        was raised more than ten minutes ago or had a different type
        than the current one.
        :param previous_exc_time: time when the previous exception was raised
        :param previous_exc: previously raised exception
        :param current_exc: currently raised exception
        """
        if time.time() - previous_exc_time > \
                MAX_TIME_OF_PREVIOUS_EXCEPTION or \
                type(current_exc) != type(previous_exc):
            self._log.exception(current_exc)
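    # A minimal walk-through of the deduplication above (illustrative values,
    # assuming MAX_TIME_OF_PREVIOUS_EXCEPTION keeps its default of 600 s;
    # note that the caller updates previous_exc_time on every exception,
    # logged or not):
    #   t=0     ValueError -> logged (type differs from previous_exc=None)
    #   t=300   ValueError -> suppressed (same type, raised < 600 s ago)
    #   t=400   OSError    -> logged (different type than the previous one)
    #   t=1100  OSError    -> logged (> 600 s since the previous exception)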
    def _sleep_after_main_daemon_loop(self, begin_time_of_loop: float) -> None:
        """
        :param begin_time_of_loop: the time when the loop iteration started
        """
        elapsed_time = time.time() - begin_time_of_loop
        # We skipped collecting data because of a wrong JWT token
        # and should sleep for the whole minute
        if self.jwt_token_error:
            sleep_time = SECONDS_IN_MINUTE - elapsed_time
        else:
            sleep_time = SLEEP_TIME_BETWEEN_AGGREGATIONS - elapsed_time
        sleep_time = 0 if sleep_time < 0 else sleep_time
        time.sleep(sleep_time)

    def _run_in_prod_mode(self, _unittest_mode=False) -> None:
        """
        Run daemon in production mode:
        1. Validate the ES JWT token
           1.1 Log a warning if it isn't valid and wait for the next minute
        2. Aggregate data from the collectors
        3. Sleep SLEEP_TIME_BETWEEN_AGGREGATIONS seconds before the next
           aggregation
        4. Once data has been aggregated for one full minute, calculate
           averages from the collectors and send them to the pushgateway
        5. Log an exception from the main daemon loop if the previous one
           was raised more than ten minutes ago or had a different type
           than the current one
        :param _unittest_mode: FOR UNITTESTS ONLY. Break the loop after
            the first iteration if it's True
        """
        previous_exc = None
        previous_exc_time = 0
        while True:
            begin_time_of_loop = time.time()
            try:
                # Check and read JWT token
                if self.count_data_aggregation % JWT_VALIDATION_INTERVAL == 0 \
                        and not self._validate_jwt_token(SECONDS_IN_MINUTE):
                    continue
                self._aggregate_new_data()
                self.count_data_aggregation += 1
                if self.count_data_aggregation == DATA_AGGREGATIONS_PER_MINUTE:
                    # Retrieve data to send
                    data_to_send = self._get_averages()
                    data_to_send.update(
                        {'timestamp': int(time.time() * 1000)}
                    )
                    self._check_disk_metrics_present(data_to_send)
                    self._send_data_to_pushgateway(
                        data_to_send,
                        self.jwt_token,
                    )
                    if os.path.isfile(self._json_logging_flag_file):
                        self._log.info(
                            'Sending to pushgateway: %s',
                            data_to_send,
                        )
            except Exception as err:
                self._log_error_from_main_daemon_loop(
                    previous_exc_time,
                    previous_exc,
                    err,
                )
                previous_exc_time = time.time()
                previous_exc = err
            finally:
                self._sleep_after_main_daemon_loop(begin_time_of_loop)
                if self.count_data_aggregation == DATA_AGGREGATIONS_PER_MINUTE:
                    self.count_data_aggregation = 0
                if _unittest_mode:
                    break
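    # Cadence sketch, assuming the module constants keep their defaults:
    # SLEEP_TIME_BETWEEN_AGGREGATIONS = 60 // 60 = 1, so the production loop
    # samples the collectors roughly once per second, re-validates the JWT
    # token every JWT_VALIDATION_INTERVAL (5) iterations, and pushes one
    # averaged payload per minute. With an invalid token the loop instead
    # sleeps out the remainder of a full minute before retrying.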
    def _run_in_test_mode(self) -> None:
        """
        Run daemon in test mode.
        We collect the data once and print it to stdout.
        All exceptions are printed to stdout.
        """
        try:
            # Check and read JWT token
            self._aggregate_new_data()
            # Retrieve data to send
            data_to_send = self._get_averages()
            data_to_send.update(
                {'timestamp': int(time.time() * 1000)}
            )
            print(json.dumps(data_to_send))
            self._log.info("Test mode - exit")
        except Exception as err:
            print(err)

    def run(self):
        """
        Main daemon loop
        """
        if self.is_test_mode:
            self._run_in_test_mode()
        else:
            self._run_in_prod_mode()

    def _aggregate_new_data(self):
        """
        Aggregate new data from each collector
        """
        for collector in self.collectors_list:
            if self.count_data_aggregation % collector.collection_interval == 0:
                collector.aggregate_new_data()

    def _get_averages(self):
        """
        Get averages from each collector
        :return: dict
        """
        data_to_send = {}
        for collector in self.collectors_list:
            collector_data = collector.get_averages()
            if collector_data:
                data_to_send.update(collector_data)
        return data_to_send

    def _check_disk_metrics_present(self, metrics):
        """
        Checks that the node_disk_* metrics are present in the node_exporter
        dict and sends debug info to Sentry if they are not found:
        - content of /proc/diskstats
        - content of the node_exporter logs (they may contain parse errors)
        """
        node_exporter_metrics = metrics.get('node_exporter', {}).keys()
        disk_stat_metrics = ['node_disk_io_time_seconds_total',
                             'node_disk_read_bytes_total',
                             'node_disk_read_time_seconds_total',
                             'node_disk_reads_completed_total',
                             'node_disk_write_time_seconds_total',
                             'node_disk_writes_completed_total',
                             'node_disk_written_bytes_total']
        if not all(metric in node_exporter_metrics
                   for metric in disk_stat_metrics):
            node_exporter_logs = get_file_lines('/var/log/node_exporter.log')
            self._log.error("Disk metrics were not collected\nmetrics: %s\n "
                            "/proc/diskstats content:\n %s \n"
                            "NE logs:\n %s",
                            str(node_exporter_metrics),
                            '\n'.join(get_file_lines('/proc/diskstats')),
                            '\n'.join(node_exporter_logs[-50:]))

    def send_request(self, url, data, headers, timeout):
        """
        Makes several attempts to send the POST request
        """
        attempts = 5
        for _ in range(attempts):
            try:
                return requests.post(url, json=data, headers=headers,
                                     timeout=timeout)
            except requests.ConnectionError as e:
                error = e
                self._log.warning('Failed to send POST request to %s', url)
                time.sleep(1)
        else:
            # for-else: runs only when every attempt above has failed
            self._log.error('Sending retries over, still no success: %s',
                            str(error))
            raise error
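    # Retry behaviour sketch: send_request() absorbs transient outages of up
    # to roughly five seconds (five attempts with a one-second pause between
    # them). Only requests.ConnectionError is retried; other RequestException
    # subclasses propagate on the first attempt, and HTTP error statuses are
    # returned to the caller for inspection, as in:
    #
    #     r = self.send_request(self._pushgateway_url, data_dict,
    #                           {'X-Auth': es_jwt_token}, 25)
    #     if r.status_code != 200:
    #         ...  # status handling as in _send_data_to_pushgateway() below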
    def _send_data_to_pushgateway(self, data_dict: dict, es_jwt_token: str):
        """
        Send data to the pushgateway
        :param data_dict: data to send
        :param es_jwt_token: JWT token used to authenticate with the
            pushgateway
        :return: None
        """
        if self._token_ready:
            try:
                r = self.send_request(self._pushgateway_url, data_dict,
                                      {'X-Auth': es_jwt_token}, 25)
                if r.status_code == 200:
                    self._error_counter = 0
                elif r.status_code == 403:
                    self._token_ready = False
                    # We don't need to send 403 errors to Sentry, since
                    # they just show that we are not allowed to send stats
                    self._log.warning(self._template_error_message,
                                      r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                elif r.status_code == 500:
                    self._log.warning(self._template_error_message,
                                      r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                else:
                    self._error_counter += 1
                    if self._error_counter % 10 == 0:
                        self._log.error(self._template_error_message,
                                        r.status_code,
                                        r.content.decode('utf-8'),
                                        extra=dict(code=r.status_code,
                                                   data=data_dict))
            except requests.exceptions.RequestException as e:
                # 'https://cm.cloudlinux.com' -> cm.cloudlinux.com
                cm_domain = urlparse(PUSHGATEWAY_ADDRESS).hostname
                log_request_error_debug_info(self.component, self._log, e,
                                             cm_domain)
        else:
            try:
                r = requests.get(self._pushgateway_check_url,
                                 headers={'X-Auth': es_jwt_token},
                                 timeout=25)
                if r.status_code == 200:
                    self._token_ready = True
                    self._error_counter = 0
                elif r.status_code == 403:
                    # We don't need to send 403 errors to Sentry, since
                    # they just show that we are not allowed to send stats
                    self._log.warning(self._template_error_message,
                                      r.status_code,
                                      r.content.decode('utf-8'),
                                      extra=dict(code=r.status_code))
                else:
                    # Switch back to the normal workflow, since the problem
                    # is not a disallowed token
                    self._token_ready = True
            except requests.exceptions.RequestException as e:
                # 'https://cm.cloudlinux.com' -> cm.cloudlinux.com
                cm_domain = urlparse(PUSHGATEWAY_ADDRESS).hostname
                log_request_error_debug_info(self.component, self._log, e,
                                             cm_domain)

    def shutdown(self):
        """
        Shutdown the daemon
        :return:
        """
        self._log.info('Shutting down collectors')
        # shutdown all collectors
        for collector in self.collectors_list:
            collector.shutdown()


def daemon_main(_opts):
    daemon = SenderDaemon()
    daemon.is_test_mode = _opts.test

    # Remember current excepthook
    old_excepthook = sys.excepthook

    def custom_excepthook(_type, _ex, _trace):
        """
        Custom handler for all possible unhandled daemon exceptions
        """
        _log.exception("There is an unhandled exception: %s", _ex)
        return old_excepthook(_type, _ex, _trace)

    # Set custom handler for exceptions
    sys.excepthook = custom_excepthook
    # Run as utility
    sigs = set(signal.Signals)
    # Ignore all signals whose default action is not Term or Core, as defined
    # in https://www.man7.org/linux/man-pages/man7/signal.7.html
    # plus ignore SIGUSRx.
    # SIGKILL must be ignored as well:
    # https://stackoverflow.com/questions/30732683/python-sigkill-catching-strategies/30732997#30732997
    ignore_sigs = {signal.SIGUSR1, signal.SIGUSR2, signal.SIGCHLD,
                   signal.SIGCLD, signal.SIGURG, signal.SIGCONT,
                   signal.SIGWINCH, signal.SIGSTOP, signal.SIGTSTP,
                   signal.SIGTTIN, signal.SIGTTOU, signal.SIGKILL}
    for sig in sigs - ignore_sigs:
        try:
            signal.signal(sig, signal_handler)
        except (ValueError, OSError, RuntimeError) as exc:
            _log.error("There is an error with signal %s. Error: %s",
                       signal.Signals(sig).name, str(exc))
            continue
    daemon.init()
    daemon.run()


def signal_handler(sig, frame):
    print('Signal {} received. Terminating'.format(signal.Signals(sig).name))
    _log.info('Shutting down collectors. Signal {} received.'.format(
        signal.Signals(sig).name))
    # shutdown all collectors
    for collector in _collectors_list:
        collector.shutdown()
    sys.exit(0)
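# Usage sketch (illustrative; the module path and the shape of the options
# object are assumptions, since the real entry point builds them with
# argparse elsewhere in the package). Any object with a boolean `test`
# attribute should work:
#
#     from argparse import Namespace
#     from cl_plus.daemon.sender_daemon import daemon_main  # path assumed
#
#     daemon_main(Namespace(test=True))   # one-shot test mode, prints JSON
#     daemon_main(Namespace(test=False))  # long-running production loop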