# SPDX-License-Identifier: Apache-2.0## The OpenSearch Contributors require contributions made to# this file be licensed under the Apache-2.0 license or a# compatible open source license.## Modifications Copyright OpenSearch Contributors. See# GitHub history for details.## Licensed to Elasticsearch B.V. under one or more contributor# license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright# ownership. Elasticsearch B.V. licenses this file to you under# the Apache License, Version 2.0 (the "License"); you may# not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importgzipimportioimportloggingimportosimportreimportwarningsfromplatformimportpython_versionfromtypingimportAny,Collection,Dict,Mapping,Optional,Uniontry:importsimplejsonasjsonexceptImportError:importjson# type: ignorefrom.._versionimport__versionstr__from..exceptionsimportHTTP_EXCEPTIONS,OpenSearchWarning,TransportErrorlogger=logging.getLogger("opensearch")# create the opensearchpy.trace logger, but only set propagate to False if the# logger hasn't already been configuredTRACER_ALREADY_CONFIGURED="opensearchpy.trace"inlogging.Logger.manager.loggerDicttracer=logging.getLogger("opensearchpy.trace")ifnotTRACER_ALREADY_CONFIGURED:tracer.propagate=False_WARNING_RE=re.compile(r"\"([^\"]*)\"")
[docs]classConnection:""" Class responsible for maintaining a connection to an OpenSearch node. It holds persistent connection pool to it and its main interface (`perform_request`) is thread-safe. Also responsible for logging. :arg host: hostname of the node (default: localhost) :arg port: port to use (integer, default: 9200) :arg use_ssl: use ssl for the connection if `True` :arg url_prefix: optional url prefix for opensearch :arg timeout: default timeout in seconds (float, default: 10) :arg http_compress: Use gzip compression :arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header For tracing all requests made by this transport. """def__init__(self,host:str="localhost",port:Optional[int]=None,use_ssl:bool=False,url_prefix:str="",timeout:int=10,headers:Optional[Dict[str,str]]=None,http_compress:Optional[bool]=None,opaque_id:Optional[str]=None,**kwargs:Any,)->None:ifportisNone:port=9200# Work-around if the implementing class doesn't# define the headers property before calling super().__init__()ifnothasattr(self,"headers"):self.headers={}headers=headersor{}forkeyinheaders:self.headers[key.lower()]=headers[key]ifopaque_id:self.headers["x-opaque-id"]=opaque_idifos.getenv("ELASTIC_CLIENT_APIVERSIONING")=="1":self.headers.setdefault("accept","application/vnd.elasticsearch+json;compatible-with=7")self.headers.setdefault("content-type","application/json")self.headers.setdefault("user-agent",self._get_default_user_agent())ifhttp_compress:self.headers["accept-encoding"]="gzip,deflate"scheme=kwargs.get("scheme","http")ifuse_sslorscheme=="https":scheme="https"use_ssl=Trueself.use_ssl=use_sslself.http_compress=http_compressorFalseself.scheme=schemeself.hostname=hostself.port=portif":"inhost:# IPv6self.host=f"{scheme}://[{host}]"else:self.host=f"{scheme}://{host}"ifself.portisnotNone:self.host+=f":{self.port}"ifurl_prefix:url_prefix="/"+url_prefix.strip("/")self.url_prefix=url_prefixself.timeout=timeout
[docs]def__eq__(self,other:object)->bool:ifnotisinstance(other,Connection):raiseTypeError(f"Unsupported equality check for {self} and {other}")returnself.__hash__()==other.__hash__()
[docs]def__lt__(self,other:object)->bool:ifnotisinstance(other,Connection):raiseTypeError(f"Unsupported lt check for {self} and {other}")returnself.__hash__()<other.__hash__()
[docs]def_raise_warnings(self,warning_headers:Any)->None:"""If 'headers' contains a 'Warning' header raise the warnings to be seen by the user. Takes an iterable of string values from any number of 'Warning' headers. """ifnotwarning_headers:return# Grab only the message from each header, the rest is discarded.# Format is: '(number) OpenSearch-(version)-(instance) "(message)"'warning_messages=[]forheaderinwarning_headers:# Because 'Requests' does its own folding of multiple HTTP headers# into one header delimited by commas (totally standard compliant, just# annoying for cases like this) we need to expect there may be# more than one message per 'Warning' header.matches=_WARNING_RE.findall(header)ifmatches:warning_messages.extend(matches)else:# Don't want to throw away any warnings, even if they# don't follow the format we have now. Use the whole header.warning_messages.append(header)formessageinwarning_messages:warnings.warn(message,category=OpenSearchWarning)
def_pretty_json(self,data:Union[str,bytes])->str:# pretty JSON in tracer curl logstry:returnjson.dumps(json.loads(data),sort_keys=True,indent=2,separators=(",",": ")).replace("'",r"\u0027")except(ValueError,TypeError):# non-json data or a bulk requestreturndata# type: ignoredef_log_request_response(self,body:Optional[Union[str,bytes]],response:Optional[str])->None:iflogger.isEnabledFor(logging.DEBUG):ifbodyandisinstance(body,bytes):body=body.decode("utf-8","ignore")logger.debug("> %s",body)ifresponseisnotNone:logger.debug("< %s",response)def_log_trace(self,method:str,path:str,body:Optional[Union[str,bytes]],status_code:Optional[int],response:Optional[str],duration:Optional[float],)->None:ifnottracer.isEnabledFor(logging.INFO)ornottracer.handlers:return# include pretty in trace curlspath=path.replace("?","?pretty&",1)if"?"inpathelsepath+"?pretty"ifself.url_prefix:path=path.replace(self.url_prefix,"",1)tracer.info("curl %s-X%s 'http://localhost:9200%s' -d '%s'","-H 'Content-Type: application/json' "ifbodyelse"",method,path,self._pretty_json(body)ifbodyelse"",)iftracer.isEnabledFor(logging.DEBUG):tracer.debug("#[%s] (%.3fs)\n#%s",status_code,duration,self._pretty_json(response).replace("\n","\n#")ifresponseelse"",)defperform_request(self,method:str,url:str,params:Optional[Mapping[str,Any]]=None,body:Optional[bytes]=None,timeout:Optional[Union[int,float]]=None,ignore:Collection[int]=(),headers:Optional[Mapping[str,str]]=None,)->Any:raiseNotImplementedError()
[docs]deflog_request_success(self,method:str,full_url:str,path:str,body:Any,status_code:int,response:str,duration:float,)->None:"""Log a successful API call."""# TODO: optionally pass in params instead of full_url and do urlencode only when neededlogger.info("%s%s [status:%s request:%.3fs]",method,full_url,status_code,duration)self._log_request_response(body,response)self._log_trace(method,path,body,status_code,response,duration)
[docs]deflog_request_fail(self,method:str,full_url:str,path:str,body:Any,duration:float,status_code:Optional[int]=None,response:Optional[str]=None,exception:Optional[Exception]=None,)->None:"""Log an unsuccessful API call."""# do not log 404s on HEAD requestsifmethod=="HEAD"andstatus_code==404:returnlogger.warning("%s%s [status:%s request:%.3fs]",method,full_url,status_codeor"N/A",duration,exc_info=exceptionisnotNone,)self._log_request_response(body,response)self._log_trace(method,path,body,status_code,response,duration)
[docs]def_raise_error(self,status_code:int,raw_data:Union[str,bytes],content_type:Optional[str]=None,)->None:"""Locate appropriate exception and raise it."""error_message=raw_dataadditional_info=Nonetry:content_type=("text/plain"ifcontent_typeisNoneelsecontent_type.split(";")[0].strip())ifraw_dataandcontent_type=="application/json":additional_info=json.loads(raw_data)error_message=additional_info.get("error",error_message)ifisinstance(error_message,dict)and"type"inerror_message:error_message=error_message["type"]except(ValueError,TypeError)aserr:logger.warning("Undecodable raw error response from server: %s",err)raiseHTTP_EXCEPTIONS.get(status_code,TransportError)(status_code,error_message,additional_info)
[docs]@staticmethoddefdefault_ca_certs()->Union[str,None]:""" Get the default CA certificate bundle, preferring those configured in the standard OpenSSL environment variables before those provided by certifi (if available) """ca_certs=os.environ.get("SSL_CERT_FILE")oros.environ.get("SSL_CERT_DIR")ifnotca_certs:try:importcertifica_certs=certifi.where()exceptImportError:passreturnca_certs