# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import collections.abc as collections_abc
from itertools import chain
from typing import Any

from opensearchpy.connection.connections import get_connection
from opensearchpy.helpers.field import Nested, Text, construct_field

from .utils import DslBase

# Mapping-level metadata keys that may appear without a leading underscore.
META_FIELDS = frozenset(
    (
        "dynamic",
        "transform",
        "dynamic_date_formats",
        "date_detection",
        "numeric_detection",
        "dynamic_templates",
        "enabled",
    )
)


class Properties(DslBase):
    """Container for the field definitions of a mapping (or of an
    inner/nested object), backed by DslBase's parameter machinery.
    """

    name = "properties"
    _param_defs = {"properties": {"type": "field", "hash": True}}

    def __init__(self) -> None:
        super().__init__()

    def __repr__(self) -> str:
        return "Properties()"

    def __getitem__(self, name: Any) -> Any:
        return self.properties[name]

    def __contains__(self, name: Any) -> bool:
        return name in self.properties

    def to_dict(self) -> Any:
        # only expose the inner "properties" dict, not the wrapper
        return super().to_dict()["properties"]

    def field(self, name: Any, *args: Any, **kwargs: Any) -> "Properties":
        """Register a field under ``name`` and return self for chaining."""
        self.properties[name] = construct_field(*args, **kwargs)
        return self

    def _collect_fields(self) -> Any:
        """Iterate over all Field objects within, including multi fields."""
        for f in self.properties.to_dict().values():
            yield f
            # multi fields
            if hasattr(f, "fields"):
                yield from f.fields.to_dict().values()
            # nested and inner objects
            if hasattr(f, "_collect_fields"):
                yield from f._collect_fields()

    def update(self, other_object: Any) -> None:
        """Merge the fields of ``other_object`` into this one.

        Existing fields that themselves support ``update`` are merged
        recursively; otherwise existing fields are left untouched.
        """
        if not hasattr(other_object, "properties"):
            # not an inner/nested object, no merge possible
            return

        our, other = self.properties, other_object.properties
        for name in other:
            if name in our:
                if hasattr(our[name], "update"):
                    our[name].update(other[name])
                continue
            our[name] = other[name]
class Mapping(object):
    """The mapping (field definitions plus meta settings) of an index.

    NOTE(review): the ``class Mapping`` header and ``__init__`` were lost in
    the whitespace-mangled source; they are reconstructed here from how the
    methods below use them (``_clone`` builds ``Mapping()``, and the methods
    read ``self.properties`` / ``self._meta``) — confirm against upstream.
    """

    def __init__(self) -> None:
        self.properties = Properties()
        self._meta: Any = {}

    def _clone(self) -> Any:
        """Return a shallow copy sharing no parameter dict with self."""
        m = Mapping()
        m.properties._params = self.properties._params.copy()
        return m

    @classmethod
    def from_opensearch(cls, index: Any, using: str = "default") -> Any:
        """Alternate constructor: pull the mapping of ``index`` from a cluster."""
        m = cls()
        m.update_from_opensearch(index, using)
        return m

    def resolve_nested(self, field_path: Any) -> Any:
        """Walk ``field_path`` and return ``(nested_paths, field)``.

        ``nested_paths`` lists every dotted prefix of the path that is a
        ``Nested`` field; returns ``((), None)`` if the path does not exist.
        """
        field = self
        nested = []
        parts = field_path.split(".")
        for i, step in enumerate(parts):
            try:
                field = field[step]
            except KeyError:
                return (), None
            if isinstance(field, Nested):
                nested.append(".".join(parts[: i + 1]))
        return nested, field

    def resolve_field(self, field_path: Any) -> Any:
        """Return the field at dotted ``field_path``, or None if absent."""
        field = self
        for step in field_path.split("."):
            try:
                field = field[step]
            except KeyError:
                return None
        return field

    def _collect_analysis(self) -> Any:
        """Gather the analysis definitions of every analyzer used by any field."""
        analysis: Any = {}
        fields: Any = []
        if "_all" in self._meta:
            fields.append(Text(**self._meta["_all"]))

        for f in chain(fields, self.properties._collect_fields()):
            for analyzer_name in (
                "analyzer",
                "normalizer",
                "search_analyzer",
                "search_quote_analyzer",
            ):
                if not hasattr(f, analyzer_name):
                    continue
                analyzer = getattr(f, analyzer_name)
                d = analyzer.get_analysis_definition()
                # empty custom analyzer, probably already defined out of our control
                if not d:
                    continue

                # merge the definition
                # TODO: conflict detection/resolution
                for key in d:
                    analysis.setdefault(key, {}).update(d[key])

        return analysis

    def save(self, index: Any, using: str = "default") -> Any:
        """Persist this mapping to ``index`` on the cluster via the Index helper."""
        # local import to avoid a circular dependency with helpers.index
        from opensearchpy.helpers.index import Index

        index = Index(index, using=using)
        index.mapping(self)
        return index.save()

    def update_from_opensearch(self, index: Any, using: str = "default") -> None:
        """Replace/extend this mapping from the live mapping of ``index``."""
        opensearch = get_connection(using)
        raw = opensearch.indices.get_mapping(index=index)
        # get_mapping returns {index_name: {"mappings": ...}}; take the only entry
        _, raw = raw.popitem()
        self._update_from_dict(raw["mappings"])

    def _update_from_dict(self, raw: Any) -> None:
        """Load fields and meta settings from a raw mapping dict."""
        for name, definition in raw.get("properties", {}).items():
            self.field(name, definition)

        # metadata like _all etc
        for name, value in raw.items():
            if name != "properties":
                if isinstance(value, collections_abc.Mapping):
                    self.meta(name, **value)
                else:
                    self.meta(name, value)

    def update(self, mapping: Any, update_only: bool = False) -> None:
        """Merge another ``Mapping``; with ``update_only`` never overwrite."""
        for name in mapping:
            if update_only and name in self:
                # nested and inner objects, merge recursively
                if hasattr(self[name], "update"):
                    # FIXME only merge subfields, not the settings
                    self[name].update(mapping[name], update_only)
                continue
            self.field(name, mapping[name])

        if update_only:
            for name in mapping._meta:
                if name not in self._meta:
                    self._meta[name] = mapping._meta[name]
        else:
            self._meta.update(mapping._meta)

    def __contains__(self, name: Any) -> Any:
        return name in self.properties.properties

    def __getitem__(self, name: Any) -> Any:
        return self.properties.properties[name]

    def __iter__(self) -> Any:
        return iter(self.properties.properties)

    def field(self, *args: Any, **kwargs: Any) -> "Mapping":
        """Register a field (delegates to Properties.field); chainable."""
        self.properties.field(*args, **kwargs)
        return self

    def meta(self, name: Any, params: Any = None, **kwargs: Any) -> "Mapping":
        """Set a meta config; ``name`` is underscore-prefixed unless whitelisted."""
        if not name.startswith("_") and name not in META_FIELDS:
            name = "_" + name

        if params and kwargs:
            raise ValueError("Meta configs cannot have both value and a dictionary.")

        self._meta[name] = kwargs if params is None else params
        return self

    def to_dict(self) -> Any:
        """Serialize meta settings plus field definitions to a plain dict."""
        meta = self._meta

        # hard coded serialization of analyzers in _all
        if "_all" in meta:
            meta = meta.copy()
            _all = meta["_all"] = meta["_all"].copy()
            for f in ("analyzer", "search_analyzer", "search_quote_analyzer"):
                if hasattr(_all.get(f, None), "to_dict"):
                    _all[f] = _all[f].to_dict()
        meta.update(self.properties.to_dict())
        return meta