����JFIF��x�x����'
Server IP : 78.140.185.180 / Your IP : 216.73.216.82 Web Server : LiteSpeed System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : builderbox ( 1072) PHP Version : 7.3.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/plugins/generic/ |
Upload File : |
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import itertools
import logging
import os
import re

from sqlalchemy.exc import SQLAlchemyError

from clcommon.clpwd import ClPwd
from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons.func import reboot_lock
from lvestats.lib.commons.sizeutil import dbgov_io_bytes_value
from lvestats.orm.history_gov import history_gov

# Upper bound on the number of statistics files handled by one write_to_db() call.
MAX_FILES_PER_TRANSACTION = 1000


class DBGovSaver(LveStatsPlugin):
    """
    Plugin that reads per-period statistics files from DBSTAT_DIR, stores the
    parsed rows in the ``history_gov`` table and removes the processed files.
    """

    DBSTAT_DIR = "/var/lve/dbgovernor/"
    # Statistics files are named "governor.<digits>"; the numeric suffix is
    # used as the row timestamp in write_to_db().
    FILE_PATTERN = re.compile(r"governor\.[0-9]+$", re.IGNORECASE)
    _history_gov_col = list(history_gov.__table__.columns.keys())

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.server_id = "localhost"
        self.engine = None
        # Positional description of one ';'-separated line of a statistics file:
        # (column name, converter) for fields that are stored, None for fields
        # that are skipped.
        self.headers = (
            ("username", str),
            None,  # max_simultaneous_requests not support
            ("sum_cpu", float),
            ("sum_write", float),
            ("sum_read", float),
            None,  # max_cpu not support
            None,  # max_write not support
            None,  # max_read not support
            ("number_of_restricts", int),
            ("limit_cpu_on_period_end", int),
            ("limit_read_on_period_end", int),
            ("limit_write_on_period_end", int),
            ("cause_of_restrict", int),
            ("uid", int),
        )
        self._headers_len = len(self.headers)
        self.cl_pwd = ClPwd()
        self.min_uid = self.cl_pwd.get_sys_min_uid(500)

    def set_config(self, config):
        """
        Take ``server_id`` from the plugin config, keeping the current value
        when the key is absent.

        :type config: dict
        """
        self.server_id = config.get("server_id", self.server_id)

    def get_user_id(self, username):
        """
        Resolve a user name to a uid.

        :type username: str
        :return: uid of the user, or -1 when the user can not be found
        :rtype: int
        """
        try:
            return self.cl_pwd.get_uid(username)
        except self.cl_pwd.NoSuchUserException as e:
            self.logger.debug('Can not obtain user id for "%s"; %s', username, e)
            return -1

    def scan_dir(self):
        """
        Scans directory generated by db governor and prepares statistics for
        insertion into database.

        Files that can not be opened are skipped with a warning. Files that can
        not be decoded as UTF-8 are reported and yielded with an empty list of
        lines, so the consumer still sees (and can remove) them.

        :return: generator of tuples (file name, [lines])
        """
        if not os.path.exists(self.DBSTAT_DIR):
            return
        for f in filter(self.FILE_PATTERN.search, os.listdir(self.DBSTAT_DIR)):
            file_name = os.path.join(self.DBSTAT_DIR, f)
            try:
                with open(file_name, "r", encoding="utf-8") as f_stats:
                    f_stats_lines = f_stats.readlines()
            except IOError:
                self.logger.warning("No file statistic")
            except UnicodeDecodeError:
                # Re-read with surrogateescape so the undecodable content can be
                # attached to the log record.
                with open(file_name, "r", errors="surrogateescape", encoding="utf-8") as file:
                    f_source = file.read()
                self.logger.error(
                    "Error while decoding the file %s",
                    f,
                    exc_info=True,
                    extra={f: f_source},
                )
                yield file_name, []
            else:
                yield file_name, f_stats_lines

    def write_to_db(self, conn, scanned):
        """
        Parse scanned statistics, insert them into ``history_gov`` in a single
        statement and remove the processed files.

        At most MAX_FILES_PER_TRANSACTION files are consumed from ``scanned``
        per call; the rest stay on disk for a later call.

        :type scanned: generator
        :type conn: sqlalchemy.engine.base.Connection
        :return: per-uid data for the CM plugin:
                 {uid: {"cpu_limit", "io_limit", "cpu_usage", "io_usage"}}
        :rtype: dict(int, dict(str, int|float))
        """
        values_list = []
        unlink_list = []
        # islice stops after exactly MAX_FILES_PER_TRANSACTION files and does not
        # pull (i.e. read from disk) a file that would not be processed anyway.
        for file_name, lines in itertools.islice(scanned, MAX_FILES_PER_TRANSACTION):
            if lines:
                try:
                    file_timestamp = int(file_name.split(".")[-1])
                except ValueError:
                    self.logger.warning(
                        "Can not parse file %s; data from file not be writen to database", file_name
                    )
                    lines = []
            for line in lines:
                try:
                    self.logger.debug("write: %s", line)
                    line_splited = line.strip().split(";")
                    values = {"server_id": self.server_id, "ts": file_timestamp}
                    values.update({h_[0]: h_[1](v_) for h_, v_ in zip(self.headers, line_splited) if h_})
                    uid = values.get("uid")
                    # 'uid' might be:
                    # - missing (in governor-mysql < 1.2-1 and under some Gov configurations)
                    # - negative (e.g. if Gov. account name does not coincide with any Unix user name)
                    # and it's planned for removal in the future (CLOS-3317)
                    if not uid or uid < 0:
                        values["uid"] = self.get_user_id(values["username"])  # extend dict by user id
                    if values["uid"] >= self.min_uid:
                        # ignoring system users and when we can't extract user id
                        values_list.append(values)
                except (IndexError, ValueError):
                    self.logger.warning(
                        "Can not parse file %s; data from file not be writen to database", file_name
                    )
            unlink_list.append(file_name)

        # Data to transfer to CM plugin
        data_for_cm = {}
        # insert all data per one commit
        with reboot_lock():
            if values_list:
                try:
                    # filter for insert only supported columns
                    values_list_filtered = [
                        {k: v for k, v in d.items() if k in self._history_gov_col} for d in values_list
                    ]
                    # form data for CM plugin
                    for dbgov_data in values_list_filtered:
                        uid = dbgov_data["uid"]
                        data_for_cm[uid] = {
                            "cpu_limit": dbgov_data["limit_cpu_on_period_end"],
                            "io_limit": dbgov_io_bytes_value(
                                dbgov_data["limit_read_on_period_end"], dbgov_data["limit_write_on_period_end"]
                            ),
                            "cpu_usage": round(dbgov_data["sum_cpu"], 1),
                            "io_usage": dbgov_io_bytes_value(dbgov_data["sum_read"], dbgov_data["sum_write"]),
                        }
                    conn.execute(history_gov.__table__.insert(), values_list_filtered)
                except (SQLAlchemyError, KeyError) as e:
                    self.logger.warning(str(e))
            # Remove every processed file independently: one failing unlink must
            # not leave the remaining files behind to be inserted again later.
            for processed_file in unlink_list:
                try:
                    os.unlink(processed_file)
                except OSError as e:
                    self.logger.debug("Can not remove file %s; %s", processed_file, e)
        return data_for_cm

    def execute(self, lve_data):
        """
        Plugin entry point: scan the statistics directory, save the data and
        publish the per-uid summary under ``lve_data["dbgov_data_for_cm"]``
        when it is not empty.

        :type lve_data: dict
        """
        if "dbgov_data" not in lve_data:
            lve_data["dbgov_data"] = []
        conn = self.engine.connect()
        try:
            scanned = self.scan_dir()
            dbgov_data_for_cm = self.write_to_db(conn, scanned)
            if dbgov_data_for_cm:
                lve_data["dbgov_data_for_cm"] = dbgov_data_for_cm
        finally:
            conn.close()