# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import collections.abc as collections_abc
import copy
from typing import Any

from opensearchpy.connection.connections import get_connection
from opensearchpy.exceptions import TransportError
from opensearchpy.helpers import scan

from ..exceptions import IllegalOperation
from ..helpers.query import Bool, Q
from .aggs import A, AggBase
from .response import Hit, Response
from .utils import AttrDict, DslBase, recursive_to_dict


class QueryProxy:
    """
    Simple proxy around DSL objects (queries) that can be called (to add
    query/post_filter) and also allows attribute access which is proxied to
    the wrapped query.
    """

    def __init__(self, search: Any, attr_name: Any) -> None:
        self._search = search
        # the wrapped query object; created lazily on first __call__/__set__
        self._proxied: Any = None
        self._attr_name = attr_name

    def __nonzero__(self) -> bool:
        # truthy once a query has actually been assigned
        return self._proxied is not None

    __bool__ = __nonzero__

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Add ``Q(*args, **kwargs)`` to a clone of the owning search."""
        s = self._search._clone()

        # we cannot use self._proxied since we just cloned self._search and
        # need to access the new self on the clone
        proxied = getattr(s, self._attr_name)
        if proxied._proxied is None:
            proxied._proxied = Q(*args, **kwargs)
        else:
            # combine with any existing query (AND semantics)
            proxied._proxied &= Q(*args, **kwargs)

        # always return search to be chainable
        return s

    def __getattr__(self, attr_name: Any) -> Any:
        # attribute reads fall through to the wrapped query
        return getattr(self._proxied, attr_name)

    def __setattr__(self, attr_name: Any, value: Any) -> None:
        # public attribute writes are applied to a fresh copy of the wrapped
        # query; underscore-prefixed attributes live on the proxy itself
        if not attr_name.startswith("_"):
            self._proxied = Q(self._proxied.to_dict())
            setattr(self._proxied, attr_name, value)
        super().__setattr__(attr_name, value)

    def __getstate__(self) -> Any:
        # support pickling of the proxy
        return self._search, self._proxied, self._attr_name

    def __setstate__(self, state: Any) -> None:
        self._search, self._proxied, self._attr_name = state


class ProxyDescriptor:
    """
    Simple descriptor to enable setting of queries and filters as:

        s = Search()
        s.query = Q(...)
    """

    def __init__(self, name: str) -> None:
        # name of the QueryProxy attribute on the owning instance
        self._attr_name = f"_{name}_proxy"

    def __get__(self, instance: Any, owner: Any) -> Any:
        return getattr(instance, self._attr_name)

    def __set__(self, instance: Any, value: Any) -> None:
        # assignment replaces the proxied query wholesale
        proxy = getattr(instance, self._attr_name)
        proxy._proxied = Q(value)


class AggsProxy(AggBase, DslBase):
    """Container for the aggregation definitions of a ``Search``."""

    name = "aggs"

    def __init__(self, search: Any) -> None:
        self._base = self
        self._search = search
        self._params = {"aggs": {}}

    def to_dict(self) -> Any:
        # unwrap the top-level "aggs" key added by DslBase serialization
        return super().to_dict().get("aggs", {})


class Request:
    """
    Common base for search requests, holding the connection alias, target
    index, doc_type information and request parameters.
    """

    _doc_type: Any
    _doc_type_map: Any

    def __init__(
        self,
        using: str = "default",
        index: Any = None,
        doc_type: Any = None,
        extra: Any = None,
    ) -> None:
        self._using = using

        # normalize index to either None or a list of names
        self._index = None
        if isinstance(index, (tuple, list)):
            self._index = list(index)
        elif index:
            self._index = [index]

        self._doc_type = []
        self._doc_type_map = {}
        if isinstance(doc_type, (tuple, list)):
            self._doc_type.extend(doc_type)
        elif isinstance(doc_type, collections_abc.Mapping):
            # mapping of doc_type -> callback used instead of the Hit class
            self._doc_type.extend(doc_type.keys())
            self._doc_type_map.update(doc_type)
        elif doc_type:
            self._doc_type.append(doc_type)

        self._params: Any = {}
        self._extra: Any = extra or {}

    def __eq__(self: Any, other: Any) -> bool:
        # two requests are equal when they would produce the same API call
        return (
            isinstance(other, Request)
            and other._params == self._params
            and other._index == self._index
            and other._doc_type == self._doc_type
            and other.to_dict() == self.to_dict()  # type: ignore
        )

    def __copy__(self) -> Any:
        return self._clone()

    def params(self, **kwargs: Any) -> Any:
        """
        Specify query params to be used when executing the search. All the
        keyword arguments will override the current values. Example::

            s = Search()
            s = s.params(routing='user-1', preference='local')
        """
        s = self._clone()
        s._params.update(kwargs)
        return s

    def index(self, *index: Any) -> Any:
        """
        Set the index for the search. If called empty it will remove all
        information.

        Example:

            s = Search()
            s = s.index('twitter-2015.01.01', 'twitter-2015.01.02')
            s = s.index(['twitter-2015.01.01', 'twitter-2015.01.02'])
        """
        # .index() resets
        s = self._clone()
        if not index:
            s._index = None
        else:
            # flatten strings, lists and tuples into a single list of names
            indexes = []
            for i in index:
                if isinstance(i, str):
                    indexes.append(i)
                elif isinstance(i, list):
                    indexes += i
                elif isinstance(i, tuple):
                    indexes += list(i)
            s._index = (self._index or []) + indexes
        return s

    def _resolve_field(self, path: Any) -> Any:
        # look the field path up against every registered document class;
        # returns None implicitly when no doc type can resolve it
        for dt in self._doc_type:
            if not hasattr(dt, "_index"):
                continue
            field = dt._index.resolve_field(path)
            if field is not None:
                return field

    def _resolve_nested(self, hit: Any, parent_class: Any = None) -> Any:
        # determine the document class for a nested inner hit by walking the
        # "_nested" chain to reconstruct the full dotted field path
        doc_class = Hit

        nested_path: Any = []
        nesting = hit["_nested"]
        while nesting and "field" in nesting:
            nested_path.append(nesting["field"])
            nesting = nesting.get("_nested")
        nested_path = ".".join(nested_path)

        if hasattr(parent_class, "_index"):
            nested_field = parent_class._index.resolve_field(nested_path)
        else:
            nested_field = self._resolve_field(nested_path)

        if nested_field is not None:
            return nested_field._doc_class

        return doc_class

    def _get_result(self, hit: Any, parent_class: Any = None) -> Any:
        # wrap a raw hit dict in the appropriate document class (or Hit)
        doc_class = Hit
        dt = hit.get("_type")

        if "_nested" in hit:
            doc_class = self._resolve_nested(hit, parent_class)
        elif dt in self._doc_type_map:
            doc_class = self._doc_type_map[dt]
        else:
            for doc_type in self._doc_type:
                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
                    doc_class = doc_type
                    break

        # wrap inner hits in Response objects as well
        for t in hit.get("inner_hits", ()):
            hit["inner_hits"][t] = Response(
                self, hit["inner_hits"][t], doc_class=doc_class
            )

        callback = getattr(doc_class, "from_opensearch", doc_class)
        return callback(hit)

    def doc_type(self, *doc_type: Any, **kwargs: Any) -> Any:
        """
        Set the type to search through. You can supply a single value or
        multiple. Values can be strings or subclasses of ``Document``.

        You can also pass in any keyword arguments, mapping a doc_type to a
        callback that should be used instead of the Hit class.

        If no doc_type is supplied any information stored on the instance
        will be erased.

        Example:

            s = Search().doc_type('product', 'store', User, custom=my_callback)
        """
        # .doc_type() resets
        s = self._clone()
        if not doc_type and not kwargs:
            s._doc_type = []
            s._doc_type_map = {}
        else:
            s._doc_type.extend(doc_type)
            s._doc_type.extend(kwargs.keys())
            s._doc_type_map.update(kwargs)
        return s

    def using(self, client: Any) -> Any:
        """
        Associate the search request with an opensearch client. A fresh copy
        will be returned with current instance remaining unchanged.

        :arg client: an instance of ``opensearchpy.OpenSearch`` to use or an
            alias to look up in ``opensearchpy.connections``
        """
        s = self._clone()
        s._using = client
        return s

    def extra(self, **kwargs: Any) -> Any:
        """
        Add extra keys to the request body. Mostly here for backwards
        compatibility.
        """
        s = self._clone()
        # "from" is a Python keyword, so callers pass "from_" instead
        if "from_" in kwargs:
            kwargs["from"] = kwargs.pop("from_")
        s._extra.update(kwargs)
        return s

    def _clone(self) -> Any:
        # shallow copy of the request-level state; subclasses extend this
        s = self.__class__(
            using=self._using, index=self._index, doc_type=self._doc_type
        )
        s._doc_type_map = self._doc_type_map.copy()
        s._extra = self._extra.copy()
        s._params = self._params.copy()
        return s
class Search(Request):
    """A chainable search request; every modifying method returns a clone."""

    # descriptor access so ``s.query = Q(...)`` works alongside ``s.query(...)``
    query = ProxyDescriptor("query")
    post_filter = ProxyDescriptor("post_filter")

    def __init__(self, **kwargs: Any) -> None:
        """
        Search request to opensearch.

        :arg using: `OpenSearch` instance to use
        :arg index: limit the search to index
        :arg doc_type: only query this type.

        All the parameters supplied (or omitted) at creation type can be
        later overridden by methods (`using`, `index` and `doc_type`
        respectively).
        """
        super().__init__(**kwargs)

        self.aggs = AggsProxy(self)
        self._response_class = Response

        # per-request body sections, each copied on clone
        self._sort: Any = []
        self._collapse: Any = {}
        self._source: Any = None
        self._highlight: Any = {}
        self._highlight_opts: Any = {}
        self._suggest: Any = {}
        self._script_fields: Any = {}

        # backing proxies for the query/post_filter descriptors above
        self._query_proxy = QueryProxy(self, "query")
        self._post_filter_proxy = QueryProxy(self, "post_filter")

    def filter(self, *args: Any, **kwargs: Any) -> Any:
        """Add a query in a filter (non-scoring) context."""
        return self.query(Bool(filter=[Q(*args, **kwargs)]))

    def exclude(self, *args: Any, **kwargs: Any) -> Any:
        """Add a negated query in a filter (non-scoring) context."""
        return self.query(Bool(filter=[~Q(*args, **kwargs)]))
[docs]def__iter__(self)->Any:""" Iterate over the hits. """returniter(self.execute())
[docs]def__getitem__(self,n:Any)->Any:""" Support slicing the `Search` instance for pagination. Slicing equates to the from/size parameters. E.g.:: s = Search().query(...)[0:25] is equivalent to:: s = Search().query(...).extra(from_=0, size=25) """s=self._clone()ifisinstance(n,slice):# If negative slicing, abort.ifn.startandn.start<0orn.stopandn.stop<0:raiseValueError("Search does not support negative slicing.")# OpenSearch won't get all results so we default to size: 10 if# stop not given.s._extra["from"]=n.startor0s._extra["size"]=max(0,n.stop-(n.startor0)ifn.stopisnotNoneelse10)returnselse:# This is an index lookup, equivalent to slicing by [n:n+1].# If negative index, abort.ifn<0:raiseValueError("Search does not support negative indexing.")s._extra["from"]=ns._extra["size"]=1returns
[docs]@classmethoddeffrom_dict(cls,d:Any)->Any:""" Construct a new `Search` instance from a raw dict containing the search body. Useful when migrating from raw dictionaries. Example:: s = Search.from_dict({ "query": { "bool": { "must": [...] } }, "aggs": {...} }) s = s.filter('term', published=True) """s=cls()s.update_from_dict(d)returns
[docs]def_clone(self)->Any:""" Return a clone of the current search request. Performs a shallow copy of all the underlying objects. Used internally by most state modifying APIs. """s=super()._clone()s._response_class=self._response_classs._sort=self._sort[:]s._source=copy.copy(self._source)ifself._sourceisnotNoneelseNones._highlight=self._highlight.copy()s._highlight_opts=self._highlight_opts.copy()s._suggest=self._suggest.copy()s._script_fields=self._script_fields.copy()s._collapse=self._collapse.copy()forxin("query","post_filter"):getattr(s,x)._proxied=getattr(self,x)._proxied# copy top-level bucket definitionsifself.aggs._params.get("aggs"):s.aggs._params={"aggs":self.aggs._params["aggs"].copy()}returns
[docs]defresponse_class(self,cls:Any)->Any:""" Override the default wrapper used for the response. """s=self._clone()s._response_class=clsreturns
[docs]defupdate_from_dict(self,d:Any)->"Search":""" Apply options from a serialized body to the current instance. Modifies the object in-place. Used mostly by ``from_dict``. """d=d.copy()if"query"ind:self.query._proxied=Q(d.pop("query"))if"post_filter"ind:self.post_filter._proxied=Q(d.pop("post_filter"))aggs=d.pop("aggs",d.pop("aggregations",{}))ifaggs:self.aggs._params={"aggs":{name:A(value)for(name,value)inaggs.items()}}if"sort"ind:self._sort=d.pop("sort")if"_source"ind:self._source=d.pop("_source")if"highlight"ind:high=d.pop("highlight").copy()self._highlight=high.pop("fields")self._highlight_opts=highif"suggest"ind:self._suggest=d.pop("suggest")if"text"inself._suggest:text=self._suggest.pop("text")forsinself._suggest.values():s.setdefault("text",text)if"script_fields"ind:self._script_fields=d.pop("script_fields")self._extra.update(d)returnself
[docs]defscript_fields(self,**kwargs:Any)->Any:""" Define script fields to be calculated on hits. Example:: s = Search() s = s.script_fields(times_two="doc['field'].value * 2") s = s.script_fields( times_three={ 'script': { 'lang': 'painless', 'source': "doc['field'].value * params.n", 'params': {'n': 3} } } ) """s=self._clone()fornameinkwargs:ifisinstance(kwargs[name],str):kwargs[name]={"script":kwargs[name]}s._script_fields.update(kwargs)returns
[docs]defsource(self,fields:Any=None,**kwargs:Any)->Any:""" Selectively control how the _source field is returned. :arg fields: wildcard string, array of wildcards, or dictionary of includes and excludes If ``fields`` is None, the entire document will be returned for each hit. If fields is a dictionary with keys of 'includes' and/or 'excludes' the fields will be either included or excluded appropriately. Calling this multiple times with the same named parameter will override the previous values with the new ones. Example:: s = Search() s = s.source(includes=['obj1.*'], excludes=["*.description"]) s = Search() s = s.source(includes=['obj1.*']).source(excludes=["*.description"]) """s=self._clone()iffieldsandkwargs:raiseValueError("You cannot specify fields and kwargs at the same time.")iffieldsisnotNone:s._source=fieldsreturnsifkwargsandnotisinstance(s._source,dict):s._source={}forkey,valueinkwargs.items():ifvalueisNone:try:dels._source[key]exceptKeyError:passelse:s._source[key]=valuereturns
[docs]defsort(self,*keys:Any)->Any:""" Add sorting information to the search request. If called without arguments it will remove all sort requirements. Otherwise it will replace them. Acceptable arguments are:: 'some.field' '-some.other.field' {'different.field': {'any': 'dict'}} so for example:: s = Search().sort( 'category', '-title', {"price" : {"order" : "asc", "mode" : "avg"}} ) will sort by ``category``, ``title`` (in descending order) and ``price`` in ascending order using the ``avg`` mode. The API returns a copy of the Search object and can thus be chained. """s=self._clone()s._sort=[]forkinkeys:ifisinstance(k,str)andk.startswith("-"):ifk[1:]=="_score":raiseIllegalOperation("Sorting by `-_score` is not allowed.")k={k[1:]:{"order":"desc"}}s._sort.append(k)returns
[docs]defcollapse(self,field:Any=None,inner_hits:Any=None,max_concurrent_group_searches:Any=None,)->Any:""" Add collapsing information to the search request. If called without providing ``field``, it will remove all collapse requirements, otherwise it will replace them with the provided arguments. The API returns a copy of the Search object and can thus be chained. """s=self._clone()s._collapse={}iffieldisNone:returnss._collapse["field"]=fieldifinner_hits:s._collapse["inner_hits"]=inner_hitsifmax_concurrent_group_searches:s._collapse["max_concurrent_group_searches"]=max_concurrent_group_searchesreturns
[docs]defhighlight_options(self,**kwargs:Any)->Any:""" Update the global highlighting options used for this request. For example:: s = Search() s = s.highlight_options(order='score') """s=self._clone()s._highlight_opts.update(kwargs)returns
[docs]defhighlight(self,*fields:Any,**kwargs:Any)->Any:""" Request highlighting of some fields. All keyword arguments passed in will be used as parameters for all the fields in the ``fields`` parameter. Example:: Search().highlight('title', 'body', fragment_size=50) will produce the equivalent of:: { "highlight": { "fields": { "body": {"fragment_size": 50}, "title": {"fragment_size": 50} } } } If you want to have different options for different fields you can call ``highlight`` twice:: Search().highlight('title', fragment_size=50).highlight('body', fragment_size=100) which will produce:: { "highlight": { "fields": { "body": {"fragment_size": 100}, "title": {"fragment_size": 50} } } } """s=self._clone()forfinfields:s._highlight[f]=kwargsreturns
[docs]defsuggest(self,name:Any,text:Any,**kwargs:Any)->Any:""" Add a suggestions request to the search. :arg name: name of the suggestion :arg text: text to suggest on All keyword arguments will be added to the suggestions body. For example:: s = Search() s = s.suggest('suggestion-1', 'OpenSearch', term={'field': 'body'}) """s=self._clone()s._suggest[name]={"text":text}s._suggest[name].update(kwargs)returns
[docs]defto_dict(self,count:bool=False,**kwargs:Any)->Any:""" Serialize the search into the dictionary that will be sent over as the request's body. :arg count: a flag to specify if we are interested in a body for count - no aggregations, no pagination bounds etc. All additional keyword arguments will be included into the dictionary. """d={}ifself.query:d["query"]=self.query.to_dict()# count request doesn't care for sorting and other thingsifnotcount:ifself.post_filter:d["post_filter"]=self.post_filter.to_dict()ifself.aggs.aggs:d.update(self.aggs.to_dict())ifself._sort:d["sort"]=self._sortifself._collapse:d["collapse"]=self._collapsed.update(recursive_to_dict(self._extra))ifself._sourcenotin(None,{}):d["_source"]=self._sourceifself._highlight:d["highlight"]={"fields":self._highlight}d["highlight"].update(self._highlight_opts)ifself._suggest:d["suggest"]=self._suggestifself._script_fields:d["script_fields"]=self._script_fieldsd.update(recursive_to_dict(kwargs))returnd
[docs]defcount(self)->Any:""" Return the number of hits matching the query and filters. Note that only the actual number is returned. """ifhasattr(self,"_response")andself._response.hits.total.relation=="eq":returnself._response.hits.total.valueopensearch=get_connection(self._using)d=self.to_dict(count=True)# TODO: failed shards detectionreturnopensearch.count(index=self._index,body=d,**self._params)["count"]
[docs]defexecute(self,ignore_cache:bool=False)->Any:""" Execute the search and return an instance of ``Response`` wrapping all the data. :arg ignore_cache: if set to ``True``, consecutive calls will hit OpenSearch, while cached result will be ignored. Defaults to `False` """ifignore_cacheornothasattr(self,"_response"):opensearch=get_connection(self._using)self._response=self._response_class(self,opensearch.search(index=self._index,body=self.to_dict(),**self._params),)returnself._response
[docs]defscan(self)->Any:""" Turn the search into a scan search and return a generator that will iterate over all the documents matching the query. Use ``params`` method to specify any additional arguments you with to pass to the underlying ``scan`` helper from ``opensearchpy`` """opensearch=get_connection(self._using)forhitinscan(opensearch,query=self.to_dict(),index=self._index,**self._params):yieldself._get_result(hit)
[docs]defdelete(self)->Any:""" delete() executes the query by delegating to delete_by_query() """opensearch=get_connection(self._using)returnAttrDict(opensearch.delete_by_query(index=self._index,body=self.to_dict(),**self._params))
class MultiSearch(Request):
    """
    Combine multiple :class:`~opensearchpy.Search` objects into a single
    request.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._searches: Any = []

    def __getitem__(self, key: Any) -> Any:
        return self._searches[key]

    def __iter__(self) -> Any:
        return iter(self._searches)

    def _clone(self) -> Any:
        clone = super()._clone()
        clone._searches = list(self._searches)
        return clone

    def add(self, search: Any) -> Any:
        """
        Adds a new :class:`~opensearchpy.Search` object to the request::

            ms = MultiSearch(index='my-index')
            ms = ms.add(Search(doc_type=Category).filter('term', category='python'))
            ms = ms.add(Search(doc_type=Blog))
        """
        clone = self._clone()
        clone._searches.append(search)
        return clone

    def to_dict(self) -> Any:
        # msearch body: alternating header (metadata) and search-body entries
        body = []
        for search in self._searches:
            header = {}
            if search._index:
                header["index"] = search._index
            header.update(search._params)
            body.extend([header, search.to_dict()])
        return body

    def execute(self, ignore_cache: Any = False, raise_on_error: Any = True) -> Any:
        """
        Execute the multi search request and return a list of search results.
        """
        # serve the cached responses unless explicitly asked not to
        if not ignore_cache and hasattr(self, "_response"):
            return self._response

        opensearch = get_connection(self._using)
        raw_responses = opensearch.msearch(
            index=self._index, body=self.to_dict(), **self._params
        )

        results = []
        for search, raw in zip(self._searches, raw_responses["responses"]):
            if raw.get("error", False):
                if raise_on_error:
                    raise TransportError("N/A", raw["error"]["type"], raw["error"])
                # a failed sub-search yields None in the result list
                results.append(None)
            else:
                results.append(Response(search, raw))

        self._response = results
        return self._response


__all__ = ["Q"]