# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

from typing import Any, Optional

from opensearchpy.client import OpenSearch
from opensearchpy.connection.connections import get_connection
from opensearchpy.helpers import analysis

from ..exceptions import IllegalOperation, ValidationException
from .mapping import Mapping
from .search import Search
from .update_by_query import UpdateByQuery
from .utils import merge


class IndexTemplate:
    """
    An index template built around an :class:`Index` definition.

    The template wraps an ``Index`` whose name is used as the template's
    index pattern. Attributes that are not found on the template itself are
    looked up on the wrapped index.
    """

    def __init__(
        self,
        name: Any,
        template: Any,
        index: Any = None,
        order: Any = None,
        **kwargs: Any
    ) -> None:
        """
        :arg name: name under which the template is stored
        :arg template: index pattern; becomes the name of the wrapped index
        :arg index: optional existing ``Index`` to clone as the base of the
            template; when omitted a new ``Index`` is built from ``kwargs``
        :arg order: optional value emitted under the ``order`` key
        :raises ValueError: if both ``index`` and extra ``kwargs`` are given
        """
        if index is None:
            self._index = Index(template, **kwargs)
        else:
            if kwargs:
                raise ValueError(
                    "You cannot specify options for Index when"
                    " passing an Index instance."
                )
            # Work on a copy so the caller's index keeps its own name.
            self._index = index.clone()
            self._index._name = template
        self._template_name = name
        self.order = order

    def __getattr__(self, attr_name: Any) -> Any:
        """Delegate unknown attribute lookups to the wrapped index."""
        # ``__getattr__`` only runs when normal lookup fails. If ``_index``
        # itself is missing (e.g. the instance was created without running
        # ``__init__``, as ``copy``/``pickle`` do), reading ``self._index``
        # below would re-enter this method forever. Fail fast instead.
        if attr_name == "_index":
            raise AttributeError(attr_name)
        return getattr(self._index, attr_name)

    def to_dict(self) -> Any:
        """
        Serialize the template: the wrapped index's dict plus
        ``index_patterns`` and, when set, ``order``.
        """
        d = self._index.to_dict()
        d["index_patterns"] = [self._index._name]
        if self.order is not None:
            d["order"] = self.order
        return d

    def save(self, using: Any = None) -> Any:
        """
        Store the template through ``indices.put_template``.

        :arg using: connection alias to use; falls back to the alias of the
            wrapped index
        """
        opensearch = get_connection(using or self._index._using)
        return opensearch.indices.put_template(
            name=self._template_name, body=self.to_dict()
        )
class Index:
    """
    Client-side definition of an index: its name, connection alias,
    settings, aliases, analysis configuration, mapping and associated
    ``Document`` classes, plus thin wrappers around the ``indices`` API.
    """

    def __init__(self, name: Any, using: Any = "default") -> None:
        """
        :arg name: name of the index
        :arg using: connection alias to use, defaults to ``'default'``
        """
        self._name = name
        self._doc_types: Any = []
        self._using = using
        self._settings: Any = {}
        self._aliases: Any = {}
        self._analysis: Any = {}
        self._mapping: Any = None

    def get_or_create_mapping(self) -> Any:
        """Return the index-level mapping, creating an empty one on first use."""
        if self._mapping is None:
            self._mapping = Mapping()
        return self._mapping

    def as_template(
        self, template_name: Any, pattern: Any = None, order: Any = None
    ) -> Any:
        """
        Build an ``IndexTemplate`` based on this index.

        :arg template_name: name of the template
        :arg pattern: index pattern; defaults to this index's name
        :arg order: optional template order
        """
        # TODO: should we allow pattern to be a top-level arg?
        # or maybe have an IndexPattern that allows for it and have
        # Document._index be that?
        return IndexTemplate(
            template_name, pattern or self._name, index=self, order=order
        )

    def resolve_nested(self, field_path: Any) -> Any:
        """
        Resolve ``field_path`` against each registered document's mapping,
        then against the index-level mapping. Returns ``((), None)`` when
        nothing matches.
        """
        for doc in self._doc_types:
            nested, field = doc._doc_type.mapping.resolve_nested(field_path)
            if field is not None:
                return nested, field
        if self._mapping:
            return self._mapping.resolve_nested(field_path)
        return (), None

    def resolve_field(self, field_path: Any) -> Any:
        """
        Look up ``field_path`` in each registered document's mapping, then
        in the index-level mapping. Returns ``None`` when nothing matches.
        """
        for doc in self._doc_types:
            field = doc._doc_type.mapping.resolve_field(field_path)
            if field is not None:
                return field
        if self._mapping:
            return self._mapping.resolve_field(field_path)
        return None

    def load_mappings(self, using: Optional[OpenSearch] = None) -> None:
        """
        Update the index-level mapping via ``Mapping.update_from_opensearch``.

        :arg using: connection alias to use; defaults to the index's own
        """
        self.get_or_create_mapping().update_from_opensearch(
            self._name, using=using or self._using
        )

    def clone(self, name: Any = None, using: Any = None) -> Any:
        """
        Create a copy of the instance with another name or connection alias.
        Useful for creating multiple indices with shared configuration::

            i = Index('base-index')
            i.settings(number_of_shards=1)
            i.create()

            i2 = i.clone('other-index')
            i2.create()

        :arg name: name of the index
        :arg using: connection alias to use, defaults to ``'default'``
        """
        i = Index(name or self._name, using=using or self._using)
        # Shallow copies: the clone gets its own containers so later
        # ``settings()``/``aliases()`` calls do not leak back to the source.
        i._settings = self._settings.copy()
        i._aliases = self._aliases.copy()
        i._analysis = self._analysis.copy()
        i._doc_types = self._doc_types[:]
        if self._mapping is not None:
            i._mapping = self._mapping._clone()
        return i

    def _get_connection(self, using: Any = None) -> Any:
        """
        Return the connection for ``using`` (or the index's own alias).

        :raises ValueError: if the index has no name
        """
        if self._name is None:
            raise ValueError("You cannot perform API calls on the default index.")
        return get_connection(using or self._using)

    connection = property(_get_connection)

    def mapping(self, mapping: Any) -> Any:
        """
        Associate a mapping (an instance of
        :class:`~opensearchpy.Mapping`) with this index.
        This means that, when this index is created, it will contain the
        mappings for the document type defined by those mappings.
        """
        self.get_or_create_mapping().update(mapping)

    def document(self, document: Any) -> Any:
        """
        Associate a :class:`~opensearchpy.Document` subclass with an index.
        This means that, when this index is created, it will contain the
        mappings for the ``Document``. If the ``Document`` class doesn't have a
        default index yet (by defining ``class Index``), this instance will be
        used. Can be used as a decorator::

            i = Index('blog')

            @i.document
            class Post(Document):
                title = Text()

            # create the index, including Post mappings
            i.create()

            # .search() will now return a Search object that will return
            # properly deserialized Post instances
            s = i.search()
        """
        self._doc_types.append(document)

        # If the document index does not have any name, that means the user
        # did not set any index already to the document.
        # So set this index as document index
        if document._index._name is None:
            document._index = self

        return document

    def settings(self, **kwargs: Any) -> Any:
        """
        Add settings to the index::

            i = Index('i')
            i.settings(number_of_shards=1, number_of_replicas=0)

        Multiple calls to ``settings`` will merge the keys, later overriding
        the earlier.
        """
        self._settings.update(kwargs)
        return self

    def aliases(self, **kwargs: Any) -> Any:
        """
        Add aliases to the index definition::

            i = Index('blog-v2')
            i.aliases(blog={}, published={'filter': Q('term', published=True)})
        """
        self._aliases.update(kwargs)
        return self

    def analyzer(self, *args: Any, **kwargs: Any) -> Any:
        """
        Explicitly add an analyzer to an index. Note that all custom analyzers
        defined in mappings will also be created. This is useful for search
        analyzers.

        Example::

            from opensearchpy import analyzer, tokenizer

            my_analyzer = analyzer('my_analyzer',
                tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
                filter=['lowercase']
            )

            i = Index('blog')
            i.analyzer(my_analyzer)
        """
        definition = analysis.analyzer(*args, **kwargs).get_analysis_definition()
        # empty custom analyzer, probably already defined out of our control
        if not definition:
            return

        # merge the definition
        merge(self._analysis, definition, True)

    def to_dict(self) -> Any:
        """
        Serialize the index definition into a request body containing the
        non-empty ``settings``, ``aliases`` and ``mappings`` sections.

        The settings and aliases are returned as copies so that callers
        (``save`` pops keys from the result) cannot alter the definition.
        """
        # NOTE(review): this method is called by ``create``/``save`` and by
        # ``IndexTemplate.to_dict`` but was not defined in the class; it relies
        # on ``Mapping.to_dict()`` and ``Mapping._collect_analysis()`` -
        # confirm against the mapping module.
        out: Any = {}
        if self._settings:
            out["settings"] = dict(self._settings)
        if self._aliases:
            out["aliases"] = dict(self._aliases)

        mappings = self._mapping.to_dict() if self._mapping else {}
        collected = self._mapping._collect_analysis() if self._mapping else {}
        for doc in self._doc_types:
            doc_mapping = doc._doc_type.mapping
            merge(mappings, doc_mapping.to_dict(), True)
            merge(collected, doc_mapping._collect_analysis(), True)
        if mappings:
            out["mappings"] = mappings
        if collected or self._analysis:
            merge(collected, self._analysis)
            out.setdefault("settings", {})["analysis"] = collected
        return out

    def search(self, using: Optional[OpenSearch] = None) -> Search:
        """
        Return a :class:`~opensearchpy.Search` object searching over the
        index (or all the indices belonging to this template) and its
        ``Document``\\s.
        """
        return Search(
            using=using or self._using, index=self._name, doc_type=self._doc_types
        )

    def updateByQuery(  # pylint: disable=invalid-name
        self, using: Optional[OpenSearch] = None
    ) -> UpdateByQuery:
        """
        Return a :class:`~opensearchpy.UpdateByQuery` object searching over the
        index (or all the indices belonging to this template) and updating
        Documents that match the search criteria.

        For more information, see here:
        https://opensearch.org/docs/latest/opensearch/rest-api/document-apis/update-by-query/
        """
        return UpdateByQuery(
            using=using or self._using,
            index=self._name,
        )

    def create(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Creates the index in opensearch.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.create`` unchanged.
        """
        return self._get_connection(using).indices.create(
            index=self._name, body=self.to_dict(), **kwargs
        )

    def is_closed(self, using: Optional[OpenSearch] = None) -> Any:
        """
        Return ``True`` if the cluster state reports this index as closed.
        """
        # NOTE(review): ``save`` calls this method but the class did not define
        # it; the lookup below assumes the cluster-state ``metadata`` response
        # layout - confirm against the client in use.
        state = self._get_connection(using).cluster.state(
            index=self._name, metric="metadata"
        )
        return state["metadata"]["indices"][self._name]["state"] == "close"

    def save(self, using: Optional[OpenSearch] = None) -> Any:
        """
        Sync the index definition with opensearch, creating the index if it
        doesn't exist and updating its settings and mappings if it does.

        Note some settings and mapping changes cannot be done on an open
        index (or at all on an existing index) and for those this method will
        fail with the underlying exception.

        :raises ValidationException: if the name resolves to several indices
        :raises IllegalOperation: if the analysis configuration would change
            on an open index
        """
        if not self.exists(using=using):
            return self.create(using=using)

        body = self.to_dict()
        settings = body.pop("settings", {})
        analysis = settings.pop("analysis", None)

        # If _name points to an alias, the response object will contain keys
        # with the index name(s) the alias points to. If the alias points to
        # multiple indices, raise exception as the intention is ambiguous
        settings_response = self.get_settings(using=using)
        if len(settings_response) > 1:
            raise ValidationException(
                f"Settings for {self._name} point to multiple indices: "
                f"{', '.join(settings_response)}."
            )
        current_settings = settings_response.popitem()[1]["settings"]["index"]

        if analysis:
            if self.is_closed(using=using):
                # closed index, update away
                settings["analysis"] = analysis
            else:
                # compare analysis definition, if all analysis objects are
                # already defined as requested, skip analysis update and
                # proceed, otherwise raise IllegalOperation
                existing_analysis = current_settings.get("analysis", {})
                if any(
                    existing_analysis.get(section, {}).get(k, None)
                    != analysis[section][k]
                    for section in analysis
                    for k in analysis[section]
                ):
                    raise IllegalOperation(
                        "You cannot update analysis configuration on an open "
                        f"index, you need to close index {self._name} first."
                    )

        # try and update the settings, skipping values the server already
        # reports (the server returns them as strings, hence ``str(v)``)
        pending = {
            k: v
            for k, v in settings.items()
            if not (k in current_settings and current_settings[k] == str(v))
        }
        if pending:
            self.put_settings(using=using, body=pending)

        # update the mappings, any conflict in the mappings will result in an
        # exception
        mappings = body.pop("mappings", {})
        if mappings:
            self.put_mapping(using=using, body=mappings)

    def analyze(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Perform the analysis process on a text and return the tokens breakdown
        of the text.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.analyze`` unchanged.
        """
        return self._get_connection(using).indices.analyze(
            index=self._name, **kwargs
        )

    def refresh(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Performs a refresh operation on the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.refresh`` unchanged.
        """
        return self._get_connection(using).indices.refresh(
            index=self._name, **kwargs
        )

    def flush(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Performs a flush operation on the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.flush`` unchanged.
        """
        return self._get_connection(using).indices.flush(index=self._name, **kwargs)

    def get(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        The get index API allows to retrieve information about the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get`` unchanged.
        """
        return self._get_connection(using).indices.get(index=self._name, **kwargs)

    def open(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Opens the index in opensearch.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.open`` unchanged.
        """
        return self._get_connection(using).indices.open(index=self._name, **kwargs)

    def close(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Closes the index in opensearch.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.close`` unchanged.
        """
        return self._get_connection(using).indices.close(index=self._name, **kwargs)

    def delete(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Deletes the index in opensearch.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.delete`` unchanged.
        """
        return self._get_connection(using).indices.delete(
            index=self._name, **kwargs
        )

    def exists(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Returns ``True`` if the index already exists in opensearch.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.exists`` unchanged.
        """
        return self._get_connection(using).indices.exists(
            index=self._name, **kwargs
        )

    def put_mapping(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Register specific mapping definition for a specific type.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.put_mapping`` unchanged.
        """
        return self._get_connection(using).indices.put_mapping(
            index=self._name, **kwargs
        )

    def get_mapping(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Retrieve specific mapping definition for a specific type.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get_mapping`` unchanged.
        """
        return self._get_connection(using).indices.get_mapping(
            index=self._name, **kwargs
        )

    def get_field_mapping(
        self, using: Optional[OpenSearch] = None, **kwargs: Any
    ) -> Any:
        """
        Retrieve mapping definition of a specific field.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get_field_mapping`` unchanged.
        """
        return self._get_connection(using).indices.get_field_mapping(
            index=self._name, **kwargs
        )

    def put_alias(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Create an alias for the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.put_alias`` unchanged.
        """
        return self._get_connection(using).indices.put_alias(
            index=self._name, **kwargs
        )

    def exists_alias(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Return a boolean indicating whether given alias exists for this index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.exists_alias`` unchanged.
        """
        return self._get_connection(using).indices.exists_alias(
            index=self._name, **kwargs
        )

    def get_alias(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Retrieve a specified alias.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get_alias`` unchanged.
        """
        return self._get_connection(using).indices.get_alias(
            index=self._name, **kwargs
        )

    def delete_alias(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Delete specific alias.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.delete_alias`` unchanged.
        """
        return self._get_connection(using).indices.delete_alias(
            index=self._name, **kwargs
        )

    def get_settings(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Retrieve settings for the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get_settings`` unchanged.
        """
        return self._get_connection(using).indices.get_settings(
            index=self._name, **kwargs
        )

    def put_settings(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Change specific index level settings in real time.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.put_settings`` unchanged.
        """
        return self._get_connection(using).indices.put_settings(
            index=self._name, **kwargs
        )

    def stats(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Retrieve statistics on different operations happening on the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.stats`` unchanged.
        """
        return self._get_connection(using).indices.stats(index=self._name, **kwargs)

    def segments(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Provide low level segments information that a Lucene index (shard
        level) is built with.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.segments`` unchanged.
        """
        return self._get_connection(using).indices.segments(
            index=self._name, **kwargs
        )

    def validate_query(
        self, using: Optional[OpenSearch] = None, **kwargs: Any
    ) -> Any:
        """
        Validate a potentially expensive query without executing it.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.validate_query`` unchanged.
        """
        return self._get_connection(using).indices.validate_query(
            index=self._name, **kwargs
        )

    def clear_cache(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Clear all caches or specific cached associated with the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.clear_cache`` unchanged.
        """
        return self._get_connection(using).indices.clear_cache(
            index=self._name, **kwargs
        )

    def recovery(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        The indices recovery API provides insight into on-going shard
        recoveries for the index.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.recovery`` unchanged.
        """
        return self._get_connection(using).indices.recovery(
            index=self._name, **kwargs
        )

    def upgrade(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Upgrade the index to the latest format.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.upgrade`` unchanged.
        """
        return self._get_connection(using).indices.upgrade(
            index=self._name, **kwargs
        )

    def get_upgrade(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Monitor how much of the index is upgraded.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.get_upgrade`` unchanged.
        """
        return self._get_connection(using).indices.get_upgrade(
            index=self._name, **kwargs
        )

    def shard_stores(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        Provides store information for shard copies of the index. Store
        information reports on which nodes shard copies exist, the shard copy
        version, indicating how recent they are, and any exceptions encountered
        while opening the shard index or from earlier engine failure.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.shard_stores`` unchanged.
        """
        return self._get_connection(using).indices.shard_stores(
            index=self._name, **kwargs
        )

    def forcemerge(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        The force merge API allows to force merging of the index through an
        API. The merge relates to the number of segments a Lucene index holds
        within each shard. The force merge operation allows to reduce the
        number of segments by merging them.

        This call will block until the merge is complete. If the http
        connection is lost, the request will continue in the background, and
        any new requests will block until the previous force merge is
        complete.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.forcemerge`` unchanged.
        """
        return self._get_connection(using).indices.forcemerge(
            index=self._name, **kwargs
        )

    def shrink(self, using: Optional[OpenSearch] = None, **kwargs: Any) -> Any:
        """
        The shrink index API allows you to shrink an existing index into a new
        index with fewer primary shards. The number of primary shards in the
        target index must be a factor of the shards in the source index. For
        example an index with 8 primary shards can be shrunk into 4, 2 or 1
        primary shards or an index with 15 primary shards can be shrunk into 5,
        3 or 1. If the number of shards in the index is a prime number it can
        only be shrunk into a single primary shard. Before shrinking, a
        (primary or replica) copy of every shard in the index must be present
        on the same node.

        Any additional keyword arguments will be passed to
        ``OpenSearch.indices.shrink`` unchanged.
        """
        return self._get_connection(using).indices.shrink(
            index=self._name, **kwargs
        )