����JFIF��x�x����'
Server IP : 78.140.185.180 / Your IP : 216.73.216.82 Web Server : LiteSpeed System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : builderbox ( 1072) PHP Version : 7.3.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib/python3.11/site-packages/cl_plus/utils/ |
Upload File : |
# coding=utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # # Copyright © 2016 Docker, Inc. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Taken from # https://github.com/docker/docker-py/blob/5efe28149a2060ff64faab5d4c1566f4cb994792/docker/transport/unixconn.py # import socket from requests.adapters import HTTPAdapter from urllib3._collections import RecentlyUsedContainer from urllib3.connectionpool import HTTPConnectionPool from urllib3.connection import HTTPConnection from urllib3.response import HTTPResponse class BaseHTTPAdapter(HTTPAdapter): def close(self): super(BaseHTTPAdapter, self).close() if hasattr(self, 'pools'): self.pools.clear() class UnixHTTPResponse(HTTPResponse, object): def __init__(self, sock, *args, **kwargs): super(UnixHTTPResponse, self).__init__(sock, *args, **kwargs) class UnixHTTPConnection(HTTPConnection, object): def __init__(self, base_url, unix_socket, timeout=60): super(UnixHTTPConnection, self).__init__( 'localhost', timeout=timeout ) self.base_url = base_url self.unix_socket = unix_socket self.timeout = timeout self.disable_buffering = False def connect(self): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) sock.connect(self.unix_socket) self.sock = sock class UnixHTTPConnectionPool(HTTPConnectionPool): def __init__(self, base_url, socket_path, timeout=60, maxsize=10): super(UnixHTTPConnectionPool, self).__init__( 'localhost', timeout=timeout, maxsize=maxsize ) self.base_url = base_url self.socket_path = socket_path self.timeout = timeout def _new_conn(self): return UnixHTTPConnection( self.base_url, self.socket_path, self.timeout ) class UnixHTTPAdapter(BaseHTTPAdapter): __attrs__ = HTTPAdapter.__attrs__ + ['pools', 'socket_path', 'timeout'] def __init__(self, socket_url, timeout=60, pool_connections=25): socket_path = socket_url.replace('http+unix://', '') if not socket_path.startswith('/'): socket_path = '/' + socket_path self.socket_path = socket_path self.timeout = timeout self.pools = RecentlyUsedContainer( pool_connections, dispose_func=lambda p: p.close() ) super(UnixHTTPAdapter, self).__init__() def get_connection(self, url, proxies=None): """ overrode for requrests < 2.32.0 """ with self.pools.lock: pool = self.pools.get(url) if pool: return pool pool = UnixHTTPConnectionPool( url, self.socket_path, self.timeout ) self.pools[url] = pool return pool def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None): """ overrode for requrests >= 2.32.0 """ return self.get_connection(request.url, proxies) def request_url(self, request, proxies): return request.path_url