����JFIF��x�x����'403WebShell
403Webshell
Server IP : 78.140.185.180  /  Your IP : 216.73.216.82
Web Server : LiteSpeed
System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64
User : builderbox ( 1072)
PHP Version : 7.3.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_plus/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/cl_plus/utils//unix_socket.py
# coding=utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
# Copyright © 2016 Docker, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#    http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Taken from
# https://github.com/docker/docker-py/blob/5efe28149a2060ff64faab5d4c1566f4cb994792/docker/transport/unixconn.py
#

import socket

from requests.adapters import HTTPAdapter
from urllib3._collections import RecentlyUsedContainer
from urllib3.connectionpool import HTTPConnectionPool
from urllib3.connection import HTTPConnection
from urllib3.response import HTTPResponse


class BaseHTTPAdapter(HTTPAdapter):
    def close(self):
        super(BaseHTTPAdapter, self).close()
        if hasattr(self, 'pools'):
            self.pools.clear()


class UnixHTTPResponse(HTTPResponse, object):
    def __init__(self, sock, *args, **kwargs):
        super(UnixHTTPResponse, self).__init__(sock, *args, **kwargs)


class UnixHTTPConnection(HTTPConnection, object):

    def __init__(self, base_url, unix_socket, timeout=60):
        super(UnixHTTPConnection, self).__init__(
            'localhost', timeout=timeout
        )
        self.base_url = base_url
        self.unix_socket = unix_socket
        self.timeout = timeout
        self.disable_buffering = False

    def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        sock.connect(self.unix_socket)
        self.sock = sock


class UnixHTTPConnectionPool(HTTPConnectionPool):
    def __init__(self, base_url, socket_path, timeout=60, maxsize=10):
        super(UnixHTTPConnectionPool, self).__init__(
            'localhost', timeout=timeout, maxsize=maxsize
        )
        self.base_url = base_url
        self.socket_path = socket_path
        self.timeout = timeout

    def _new_conn(self):
        return UnixHTTPConnection(
            self.base_url, self.socket_path, self.timeout
        )


class UnixHTTPAdapter(BaseHTTPAdapter):
    __attrs__ = HTTPAdapter.__attrs__ + ['pools',
                                         'socket_path',
                                         'timeout']

    def __init__(self, socket_url, timeout=60,
                 pool_connections=25):
        socket_path = socket_url.replace('http+unix://', '')
        if not socket_path.startswith('/'):
            socket_path = '/' + socket_path
        self.socket_path = socket_path
        self.timeout = timeout
        self.pools = RecentlyUsedContainer(
            pool_connections, dispose_func=lambda p: p.close()
        )
        super(UnixHTTPAdapter, self).__init__()

    def get_connection(self, url, proxies=None):
        """
        overrode for requrests < 2.32.0
        """
        with self.pools.lock:
            pool = self.pools.get(url)
            if pool:
                return pool

            pool = UnixHTTPConnectionPool(
                url, self.socket_path, self.timeout
            )
            self.pools[url] = pool

        return pool
    def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
        """
        overrode for requrests >= 2.32.0
        """
        return self.get_connection(request.url, proxies)

    def request_url(self, request, proxies):
        return request.path_url

Youez - 2016 - github.com/yon3zu
LinuXploit