# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import collections.abc as collections_abc
from fnmatch import fnmatch
from typing import Any, Tuple, Type

from opensearchpy.connection.connections import get_connection
from opensearchpy.exceptions import NotFoundError, RequestError

from ..exceptions import IllegalOperation, ValidationException
from .field import Field
from .index import Index
from .mapping import Mapping
from .search import Search
from .utils import DOC_META_FIELDS, META_FIELDS, ObjectBase, merge


class MetaField:
    """
    Holder for the positional and keyword arguments of a meta field.

    Instances found as attributes of a document's ``Meta`` class are picked
    up by ``DocumentOptions``, which forwards the stored arguments to
    ``Mapping.meta``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        self.args, self.kwargs = args, kwargs


class DocumentMeta(type):
    """
    Metaclass that attaches a ``_doc_type`` (``DocumentOptions``) to each class.
    """

    def __new__(
        cls: Any,
        name: str,
        bases: Tuple[Type[ObjectBase]],
        attrs: Any,
    ) -> Any:
        # DocumentMeta filters attrs in place
        attrs["_doc_type"] = DocumentOptions(name, bases, attrs)
        return super().__new__(cls, name, bases, attrs)


class IndexMeta(DocumentMeta):
    """
    Metaclass that additionally associates an ``Index`` with each class it
    creates, except for the very first one.
    """

    # global flag to guard us from associating an Index with the base Document
    # class, only user defined subclasses should have an _index attr
    _document_initialized = False

    def __new__(
        cls: Any,
        name: str,
        bases: Tuple[Type[ObjectBase]],
        attrs: Any,
    ) -> Any:
        new_cls = super().__new__(cls, name, bases, attrs)
        if cls._document_initialized:
            # The optional inner ``Index`` class carries the index options.
            index_opts = attrs.pop("Index", None)
            index = cls.construct_index(index_opts, bases)
            new_cls._index = index
            index.document(new_cls)
        # Every class created after the first one gets an index.
        cls._document_initialized = True
        return new_cls

    @classmethod
    def construct_index(cls, opts: Any, bases: Any) -> Any:
        """
        Build the ``Index`` for a new document class.

        :arg opts: the inner ``Index`` options class, or ``None`` when the
            document did not declare one
        :arg bases: base classes of the document being created
        :return: when ``opts`` is ``None``, the ``_index`` of the first base
            that has one, or an unnamed ``Index`` if none does; otherwise a
            new ``Index`` configured from the ``name``, ``using``,
            ``settings``, ``aliases`` and ``analyzers`` attributes of ``opts``
        """
        if opts is None:
            for b in bases:
                if hasattr(b, "_index"):
                    return b._index

            # Set None as Index name so it will set _all while making the query
            return Index(name=None)

        i = Index(getattr(opts, "name", "*"), using=getattr(opts, "using", "default"))
        i.settings(**getattr(opts, "settings", {}))
        i.aliases(**getattr(opts, "aliases", {}))
        for a in getattr(opts, "analyzers", ()):
            i.analyzer(a)
        return i


class DocumentOptions:
    """
    Per-class options: collects the declared fields into a ``Mapping``.
    """

    def __init__(
        self,
        name: str,
        bases: Tuple[Type[ObjectBase]],
        attrs: Any,
    ) -> None:
        """
        Build the mapping for a class that is being created.

        :arg name: name of the class being created (not used here)
        :arg bases: base classes; mappings found on their ``_doc_type`` are
            merged into this one
        :arg attrs: class namespace; ``Meta`` and every ``Field`` attribute
            are removed from it in place
        """
        meta = attrs.pop("Meta", None)

        # create the mapping instance
        self.mapping = getattr(meta, "mapping", Mapping())

        # register all declared fields into the mapping
        # (iterate over a copy because entries are deleted from attrs)
        for attr_name, value in list(attrs.items()):
            if isinstance(value, Field):
                self.mapping.field(attr_name, value)
                del attrs[attr_name]

        # add all the mappings for meta fields
        for meta_name in dir(meta):
            if isinstance(getattr(meta, meta_name, None), MetaField):
                params = getattr(meta, meta_name)
                self.mapping.meta(meta_name, *params.args, **params.kwargs)

        # document inheritance - include the fields from parents' mappings
        for b in bases:
            if hasattr(b, "_doc_type") and hasattr(b._doc_type, "mapping"):
                self.mapping.update(b._doc_type.mapping, update_only=True)

    @property
    def name(self) -> Any:
        """Return ``self.mapping.properties.name``."""
        return self.mapping.properties.name


class InnerDoc(ObjectBase, metaclass=DocumentMeta):
    """
    Common class for inner documents like Object or Nested
    """

    @classmethod
    def from_opensearch(cls, data: Any, data_only: bool = False) -> Any:
        """
        Create an instance from ``data``.

        :arg data: a hit-like dict, or the bare source when ``data_only``
            is ``True``
        :arg data_only: if ``True``, ``data`` is wrapped as
            ``{"_source": data}`` before being passed to the parent
            implementation
        """
        if data_only:
            data = {"_source": data}
        return super().from_opensearch(data)
class Document(ObjectBase, metaclass=IndexMeta):
    """
    Model-like class for persisting documents in opensearch.
    """

    @classmethod
    def _matches(cls: Any, hit: Any) -> Any:
        """
        Tell whether ``hit`` belongs to this document class.

        Always ``True`` when the class index has no name; otherwise the
        hit's ``_index`` is matched against the index name with ``fnmatch``.
        """
        if cls._index._name is None:
            return True
        return fnmatch(hit.get("_index", ""), cls._index._name)

    @classmethod
    def _get_using(cls: Any, using: Any = None) -> Any:
        """Return ``using`` if truthy, else the ``_using`` of the class index."""
        return using or cls._index._using

    @classmethod
    def _get_connection(cls, using: Any = None) -> Any:
        """Return the connection registered under the resolved alias."""
        return get_connection(cls._get_using(using))

    @classmethod
    def _default_index(cls: Any, index: Any = None) -> Any:
        """Return ``index`` if truthy, else the name of the class index."""
        return index or cls._index._name

    @classmethod
    def init(cls: Any, index: Any = None, using: Any = None) -> None:
        """
        Create the index and populate the mappings in opensearch.

        :arg index: if given, a clone of the class index with this name is
            saved instead of the class index itself
        :arg using: connection alias passed to ``Index.save``
        """
        i = cls._index
        if index:
            i = i.clone(name=index)
        i.save(using=using)

    def _get_index(self, index: Any = None, required: bool = True) -> Any:
        """
        Resolve the index this instance should be written to.

        The first value that is not ``None`` wins: the ``index`` argument,
        ``self.meta.index``, then the name of the class index.

        :arg index: explicit index name
        :arg required: if ``True``, raise when no index could be resolved
        :raises ValidationException: if no index was found and ``required``
            is set, or if the resolved name contains ``*``
        """
        if index is None:
            index = getattr(self.meta, "index", None)
        if index is None:
            index = getattr(self._index, "_name", None)
        if index is None and required:
            raise ValidationException("No index")
        if index and "*" in index:
            raise ValidationException("You cannot write to a wildcard index.")
        return index

    @classmethod
    def search(cls, using: Any = None, index: Any = None) -> Any:
        """
        Create an :class:`~opensearchpy.Search` instance that will search
        over this ``Document``.

        :arg using: connection alias to use, defaults to the alias of the
            class index
        :arg index: index to search, defaults to the name of the class index
        """
        return Search(
            using=cls._get_using(using), index=cls._default_index(index), doc_type=[cls]
        )

    @classmethod
    def get(  # type: ignore
        cls: Any, id: Any, using: Any = None, index: Any = None, **kwargs: Any
    ) -> Any:
        """
        Retrieve a single document from opensearch using its ``id``.

        :arg id: ``id`` of the document to be retrieved
        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``

        Any additional keyword arguments will be passed to
        ``OpenSearch.get`` unchanged.

        :return: an instance built from the response, or ``None`` when the
            response does not report the document as ``found``
        """
        opensearch = cls._get_connection(using)
        doc = opensearch.get(index=cls._default_index(index), id=id, **kwargs)
        if not doc.get("found", False):
            return None
        return cls.from_opensearch(doc)

    @classmethod
    def exists(
        cls, id: Any, using: Any = None, index: Any = None, **kwargs: Any
    ) -> Any:
        """
        Check whether a single document exists in opensearch using its ``id``.

        :arg id: ``id`` of the document to check
        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``

        Any additional keyword arguments will be passed to
        ``OpenSearch.exists`` unchanged.

        :return: whatever ``OpenSearch.exists`` returns
        """
        opensearch = cls._get_connection(using)
        return opensearch.exists(index=cls._default_index(index), id=id, **kwargs)

    @classmethod
    def mget(
        cls,
        docs: Any,
        using: Any = None,
        index: Any = None,
        raise_on_error: bool = True,
        missing: str = "none",
        **kwargs: Any,
    ) -> Any:
        """
        Retrieve multiple document by their ``id``'s. Returns a list of
        instances in the same order as requested.

        :arg docs: list of ``id``'s of the documents to be retrieved or a
            list of document specifications as per
            https://opensearch.org/docs/latest/opensearch/rest-api/document-apis/multi-get/
        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg raise_on_error: if ``True``, raise ``RequestError`` when any
            returned document carries an ``error`` entry; if ``False`` such
            documents are handled according to ``missing`` without raising
        :arg missing: what to do when one of the documents requested is not
            found. Valid options are ``'none'`` (use ``None``), ``'raise'``
            (raise ``NotFoundError``) or ``'skip'`` (ignore the missing
            document).

        Any additional keyword arguments will be passed to
        ``OpenSearch.mget`` unchanged.

        :raises ValueError: if ``missing`` is not one of the valid options
        :raises RequestError: if ``raise_on_error`` is set and a document
            returned an error
        :raises NotFoundError: if ``missing`` is ``'raise'`` and a document
            was not found
        """
        if missing not in ("raise", "skip", "none"):
            raise ValueError("'missing' must be 'raise', 'skip', or 'none'.")
        opensearch = cls._get_connection(using)
        # Bare ids are wrapped into document specifications; mappings are
        # passed through untouched.
        body = {
            "docs": [
                doc if isinstance(doc, collections_abc.Mapping) else {"_id": doc}
                for doc in docs
            ]
        }
        results = opensearch.mget(body, index=cls._default_index(index), **kwargs)

        objs: Any = []
        error_docs: Any = []
        missing_docs: Any = []
        for doc in results["docs"]:
            if doc.get("found"):
                if error_docs or missing_docs:
                    # We're going to raise an exception anyway, so avoid an
                    # expensive call to cls.from_opensearch().
                    continue

                objs.append(cls.from_opensearch(doc))

            elif doc.get("error"):
                if raise_on_error:
                    error_docs.append(doc)
                if missing == "none":
                    objs.append(None)

            # The doc didn't cause an error, but the doc also wasn't found.
            elif missing == "raise":
                missing_docs.append(doc)
            elif missing == "none":
                objs.append(None)

        # Errors take precedence over missing documents.
        if error_docs:
            error_ids = [doc["_id"] for doc in error_docs]
            message = "Required routing not provided for documents %s."
            message %= ", ".join(error_ids)
            raise RequestError(400, message, error_docs)
        if missing_docs:
            missing_ids = [doc["_id"] for doc in missing_docs]
            message = f"Documents {', '.join(missing_ids)} not found."
            raise NotFoundError(404, message, {"docs": missing_docs})
        return objs

    def delete(self, using: Any = None, index: Any = None, **kwargs: Any) -> Any:
        """
        Delete the instance in opensearch.

        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``

        Any additional keyword arguments will be passed to
        ``OpenSearch.delete`` unchanged; they override values taken from
        the instance's meta.

        :raises ValidationException: if no index can be resolved or the
            resolved index contains a wildcard
        """
        opensearch = self._get_connection(using)
        # extract routing etc from meta
        doc_meta = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

        # Optimistic concurrency control
        if "seq_no" in self.meta and "primary_term" in self.meta:
            doc_meta["if_seq_no"] = self.meta["seq_no"]
            doc_meta["if_primary_term"] = self.meta["primary_term"]

        doc_meta.update(kwargs)
        opensearch.delete(index=self._get_index(index), **doc_meta)

    def to_dict(  # type: ignore
        self, include_meta: bool = False, skip_empty: bool = True
    ) -> Any:
        """
        Serialize the instance into a dictionary so that it can be saved in
        opensearch.

        :arg include_meta: if set to ``True`` will include all the metadata
            (``_index``, ``_id`` etc). Otherwise just the document's data is
            serialized. This is useful when passing multiple instances into
            ``opensearchpy.helpers.bulk``.
        :arg skip_empty: if set to ``False`` will cause empty values
            (``None``, ``[]``, ``{}``) to be left on the document. Those
            values will be stripped out otherwise as they make no difference
            in opensearch.

        :return: the document data, or, with ``include_meta``, a dict of
            underscore-prefixed metadata with the data under ``_source``
        """
        d = super().to_dict(skip_empty=skip_empty)
        if not include_meta:
            return d

        meta = {"_" + k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

        # in case of to_dict include the index unlike save/update/delete
        index = self._get_index(required=False)
        if index is not None:
            meta["_index"] = index

        meta["_source"] = d
        return meta

    def update(
        self,
        using: Any = None,
        index: Any = None,
        detect_noop: bool = True,
        doc_as_upsert: bool = False,
        refresh: bool = False,
        retry_on_conflict: int = 0,
        script: Any = None,
        script_id: Any = None,
        scripted_upsert: bool = False,
        upsert: Any = None,
        return_doc_meta: bool = False,
        **fields: Any,
    ) -> Any:
        """
        Partial update of the document, specify fields you wish to update
        and both the instance and the document in opensearch will be
        updated::

            doc = MyDocument(title='Document Title!')
            doc.save()
            doc.update(title='New Document Title!')

        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg detect_noop: Set to ``False`` to disable noop detection.
        :arg refresh: Control when the changes made by this request are
            visible to search. Set to ``True`` for immediate effect.
        :arg retry_on_conflict: In between the get and indexing phases of
            the update, it is possible that another process might have
            already updated the same document. By default, the update will
            fail with a version conflict exception. The retry_on_conflict
            parameter controls how many times to retry the update before
            finally throwing an exception. The ``seq_no``/``primary_term``
            concurrency check is only sent when this is ``None`` or ``0``.
        :arg doc_as_upsert: Instead of sending a partial doc plus an upsert
            doc, setting doc_as_upsert to true will use the contents of doc
            as the upsert value
        :arg script: source of a script to run; when given (or when
            ``script_id`` is given) a scripted update is performed and
            ``fields`` are sent as the script ``params``
        :arg script_id: id of a stored script, used when ``script`` is not
            given
        :arg scripted_upsert: sent as ``scripted_upsert`` in the request
            body of a scripted update
        :arg upsert: sent as ``upsert`` in the request body of a scripted
            update when not ``None``
        :arg return_doc_meta: set to ``True`` to return all metadata from
            the update API call instead of only the operation result

        :raises IllegalOperation: if neither a script nor any field is given
        :return: operation result noop/updated, or the full response when
            ``return_doc_meta`` is set
        """
        body: Any = {
            "doc_as_upsert": doc_as_upsert,
            "detect_noop": detect_noop,
        }

        # scripted update
        if script or script_id:
            if upsert is not None:
                body["upsert"] = upsert

            if script:
                script = {"source": script}
            else:
                script = {"id": script_id}

            script["params"] = fields

            body["script"] = script
            body["scripted_upsert"] = scripted_upsert

        # partial document update
        else:
            if not fields:
                raise IllegalOperation(
                    "You cannot call update() without updating individual fields or a script. "
                    "If you wish to update the entire object use save()."
                )

            # update given fields locally
            merge(self, fields)

            # prepare data for OpenSearch
            values = self.to_dict()

            # if fields were given: partial update
            body["doc"] = {k: values.get(k) for k in fields.keys()}

        # extract routing etc from meta
        doc_meta = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

        if retry_on_conflict is not None:
            doc_meta["retry_on_conflict"] = retry_on_conflict

        # Optimistic concurrency control
        if (
            retry_on_conflict in (None, 0)
            and "seq_no" in self.meta
            and "primary_term" in self.meta
        ):
            doc_meta["if_seq_no"] = self.meta["seq_no"]
            doc_meta["if_primary_term"] = self.meta["primary_term"]

        meta = self._get_connection(using).update(
            index=self._get_index(index), body=body, refresh=refresh, **doc_meta
        )
        # update meta information from OpenSearch
        for k in META_FIELDS:
            if "_" + k in meta:
                setattr(self.meta, k, meta["_" + k])

        return meta if return_doc_meta else meta["result"]

    def save(
        self,
        using: Any = None,
        index: Any = None,
        validate: bool = True,
        skip_empty: bool = True,
        return_doc_meta: bool = False,
        **kwargs: Any,
    ) -> Any:
        """
        Save the document into opensearch. If the document doesn't exist it
        is created, it is overwritten otherwise.

        :arg index: opensearch index to use, if the ``Document`` is
            associated with an index this can be omitted.
        :arg using: connection alias to use, defaults to ``'default'``
        :arg validate: set to ``False`` to skip validating the document
        :arg skip_empty: if set to ``False`` will cause empty values
            (``None``, ``[]``, ``{}``) to be left on the document. Those
            values will be stripped out otherwise as they make no difference
            in opensearch.
        :arg return_doc_meta: set to ``True`` to return all metadata from
            the index API call instead of only the operation result

        Any additional keyword arguments will be passed to
        ``OpenSearch.index`` unchanged; they override values taken from the
        instance's meta.

        :return: operation result created/updated, or the full response
            when ``return_doc_meta`` is set
        """
        if validate:
            self.full_clean()

        opensearch = self._get_connection(using)
        # extract routing etc from meta
        doc_meta = {k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}

        # Optimistic concurrency control
        if "seq_no" in self.meta and "primary_term" in self.meta:
            doc_meta["if_seq_no"] = self.meta["seq_no"]
            doc_meta["if_primary_term"] = self.meta["primary_term"]

        doc_meta.update(kwargs)
        meta = opensearch.index(
            index=self._get_index(index),
            body=self.to_dict(skip_empty=skip_empty),
            **doc_meta,
        )
        # update meta information from OpenSearch
        for k in META_FIELDS:
            if "_" + k in meta:
                setattr(self.meta, k, meta["_" + k])

        return meta if return_doc_meta else meta["result"]