Source code for opensearchpy.connection.http_requests
# SPDX-License-Identifier: Apache-2.0## The OpenSearch Contributors require contributions made to# this file be licensed under the Apache-2.0 license or a# compatible open source license.## Modifications Copyright OpenSearch Contributors. See# GitHub history for details.## Licensed to Elasticsearch B.V. under one or more contributor# license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright# ownership. Elasticsearch B.V. licenses this file to you under# the Apache License, Version 2.0 (the "License"); you may# not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importtimeimportwarningsfromtypingimportAny,Collection,Mapping,Optional,Uniontry:importrequestsREQUESTS_AVAILABLE=TrueexceptImportError:REQUESTS_AVAILABLE=Falsefromopensearchpy.metricsimportMetrics,MetricsNonefrom..compatimportreraise_exceptions,string_types,urlencodefrom..exceptionsimport(ConnectionError,ConnectionTimeout,ImproperlyConfigured,SSLError,)from.baseimportConnection
[docs]classRequestsHttpConnection(Connection):""" Connection using the `requests` library. :arg http_auth: optional http auth information as either ':' separated string or a tuple. Any value will be passed into requests as `auth`. :arg use_ssl: use ssl for the connection if `True` :arg verify_certs: whether to verify SSL certificates :arg ssl_show_warn: show warning when verify certs is disabled :arg ca_certs: optional path to CA bundle. Defaults to configured OpenSSL bundles from environment variables and then certifi before falling back to the standard requests bundle to improve consistency with other Connection implementations :arg client_cert: path to the file containing the private key and the certificate, or cert only if using client_key :arg client_key: path to the file containing the private key if using separate cert and key files (client_cert will contain only the cert) :arg headers: any custom http headers to be add to requests :arg http_compress: Use gzip compression :arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header For tracing all requests made by this transport. :arg pool_maxsize: Maximum connection pool size used by pool-manager For custom connection-pooling on current session :arg metrics: metrics is an instance of a subclass of the :class:`~opensearchpy.Metrics` class, used for collecting and reporting metrics related to the client's operations; """def__init__(self,host:str="localhost",port:Optional[int]=None,http_auth:Any=None,use_ssl:bool=False,verify_certs:bool=True,ssl_show_warn:bool=True,ca_certs:Any=None,client_cert:Any=None,client_key:Any=None,headers:Any=None,http_compress:Any=None,opaque_id:Any=None,pool_maxsize:Any=None,metrics:Metrics=MetricsNone(),**kwargs:Any,)->None:self.metrics=metricsifnotREQUESTS_AVAILABLE:raiseImproperlyConfigured("Please install requests to use RequestsHttpConnection.")# Initialize Session so .headers works before calling super().__init__().self.session=requests.Session()forkeyinlist(self.session.headers):self.session.headers.pop(key)# Mount http-adapter with custom connection-pool size. Default=10ifpool_maxsizeandisinstance(pool_maxsize,int):pool_adapter=requests.adapters.HTTPAdapter(pool_maxsize=pool_maxsize)self.session.mount("http://",pool_adapter)self.session.mount("https://",pool_adapter)super().__init__(host=host,port=port,use_ssl=use_ssl,headers=headers,http_compress=http_compress,opaque_id=opaque_id,**kwargs,)ifnotself.http_compress:# Need to set this to 'None' otherwise Requests adds its own.self.session.headers["accept-encoding"]=None# type: ignoreifhttp_authisnotNone:ifisinstance(http_auth,(tuple,list)):http_auth=tuple(http_auth)elifisinstance(http_auth,string_types):http_auth=tuple(http_auth.split(":",1))# type: ignoreself.session.auth=http_authself.base_url=f"{self.host}{self.url_prefix}"self.session.verify=verify_certsifnotclient_key:self.session.cert=client_certelifclient_cert:# cert is a tuple of (certfile, keyfile)self.session.cert=(client_cert,client_key)ifca_certs:ifnotverify_certs:raiseImproperlyConfigured("You cannot pass CA certificates when verify SSL is off.")self.session.verify=ca_certselifverify_certs:ca_certs=self.default_ca_certs()ifca_certs:self.session.verify=ca_certsifnotssl_show_warn:requests.packages.urllib3.disable_warnings()# type: ignoreifself.use_sslandnotverify_certsandssl_show_warn:warnings.warn("Connecting to %s using SSL with verify_certs=False is insecure."%self.host)defperform_request(# type: ignoreself,method:str,url:str,params:Optional[Mapping[str,Any]]=None,body:Optional[bytes]=None,timeout:Optional[Union[int,float]]=None,allow_redirects:Optional[bool]=True,ignore:Collection[int]=(),headers:Optional[Mapping[str,str]]=None,)->Any:url=self.base_url+urlheaders=headersor{}ifparams:url=f"{url}?{urlencode(paramsor{})}"orig_body=bodyifself.http_compressandbody:body=self._gzip_compress(body)headers["content-encoding"]="gzip"# type: ignorestart=time.time()request=requests.Request(method=method,headers=headers,url=url,data=body)prepared_request=self.session.prepare_request(request)settings=self.session.merge_environment_settings(prepared_request.url,{},None,None,None)send_kwargs:Any={"timeout":timeoutorself.timeout,"allow_redirects":allow_redirects,}send_kwargs.update(settings)try:self.metrics.request_start()response=self.session.send(prepared_request,**send_kwargs)duration=time.time()-startraw_data=response.content.decode("utf-8","surrogatepass")exceptreraise_exceptions:raiseexceptExceptionase:self.log_request_fail(method,url,prepared_request.path_url,orig_body,time.time()-start,exception=e,)ifisinstance(e,requests.exceptions.SSLError):raiseSSLError("N/A",str(e),e)ifisinstance(e,requests.Timeout):raiseConnectionTimeout("TIMEOUT",str(e),e)raiseConnectionError("N/A",str(e),e)finally:self.metrics.request_end()# raise warnings if any from the 'Warnings' header.warnings_headers=((response.headers["warning"],)if"warning"inresponse.headerselse())self._raise_warnings(warnings_headers)# raise errors based on http status codes, let the client handle those if neededif(not(200<=response.status_code<300)andresponse.status_codenotinignore):self.log_request_fail(method,url,response.request.path_url,orig_body,duration,response.status_code,raw_data,)self._raise_error(response.status_code,raw_data,response.headers.get("Content-Type"),)self.log_request_success(method,url,response.request.path_url,orig_body,response.status_code,raw_data,duration,)returnresponse.status_code,response.headers,raw_data@propertydefheaders(self)->Any:# type: ignorereturnself.session.headers