# SPDX-License-Identifier: Apache-2.0## The OpenSearch Contributors require contributions made to# this file be licensed under the Apache-2.0 license or a# compatible open source license.## Modifications Copyright OpenSearch Contributors. See# GitHub history for details.## Licensed to Elasticsearch B.V. under one or more contributor# license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright# ownership. Elasticsearch B.V. licenses this file to you under# the Apache License, Version 2.0 (the "License"); you may# not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importbase64importcollections.abcascollections_abcimportcopyimportipaddressfromdatetimeimportdate,datetimefromtypingimportAny,Optional,Typefromdateutilimportparser,tzfrom..exceptionsimportValidationExceptionfrom.queryimportQfrom.utilsimportAttrDict,AttrList,DslBasefrom.wrappersimportRangeunicode:Type[str]=strdefconstruct_field(name_or_field:Any,**params:Any)->Any:# {"type": "text", "analyzer": "snowball"}ifisinstance(name_or_field,collections_abc.Mapping):ifparams:raiseValueError("construct_field() cannot accept parameters when passing in a dict.")params=name_or_field.copy()# type: ignoreif"type"notinparams:# inner object can be implicitly definedif"properties"inparams:name="object"else:raiseValueError('construct_field() needs to have a "type" key.')else:name=params.pop("type")returnField.get_dsl_class(name)(**params)# Text()ifisinstance(name_or_field,Field):ifparams:raiseValueError("construct_field() cannot accept parameters ""when passing in a construct_field object.")returnname_or_field# "text", analyzer="snowball"returnField.get_dsl_class(name_or_field)(**params)
[docs]classField(DslBase):_type_name:str="field"_type_shortcut=staticmethod(construct_field)# all fields can be multifields_param_defs={"fields":{"type":"field","hash":True}}name:Optional[str]=None_coerce:bool=Falsedef__init__(self,multi:bool=False,required:bool=False,*args:Any,**kwargs:Any)->None:""" :arg bool multi: specifies whether field can contain array of values :arg bool required: specifies whether field is required """self._multi=multiself._required=requiredsuper().__init__(*args,**kwargs)def__getitem__(self,subfield:Any)->Any:returnself._params.get("fields",{})[subfield]def_serialize(self,data:Any)->Any:returndatadef_deserialize(self,data:Any)->Any:returndatadef_empty(self)->None:returnNonedefempty(self)->Any:ifself._multi:returnAttrList([])returnself._empty()defserialize(self,data:Any)->Any:ifisinstance(data,(list,AttrList,tuple)):returnlist(map(self._serialize,data))returnself._serialize(data)defdeserialize(self,data:Any)->Any:ifisinstance(data,(list,AttrList,tuple)):data=[NoneifdisNoneelseself._deserialize(d)fordindata]returndataifdataisNone:returnNonereturnself._deserialize(data)defclean(self,data:Any)->Any:ifdataisnotNone:data=self.deserialize(data)ifdatain(None,[],{})andself._required:raiseValidationException("Value required for this field.")returndata
classCustomField(Field):name="custom"_coerce=Truedefto_dict(self)->Any:ifisinstance(self.builtin_type,Field):returnself.builtin_type.to_dict()d=super().to_dict()d["type"]=self.builtin_typereturndclassObject(Field):name:Optional[str]="object"_coerce:bool=Truedef__init__(self,doc_class:Any=None,dynamic:Any=None,properties:Any=None,**kwargs:Any,)->None:""" :arg document.InnerDoc doc_class: base doc class that handles mapping. If no `doc_class` is provided, new instance of `InnerDoc` will be created, populated with `properties` and used. Can not be provided together with `properties` :arg dynamic: whether new properties may be created dynamically. Valid values are `True`, `False`, `'strict'`. Can not be provided together with `doc_class`. :arg dict properties: used to construct underlying mapping if no `doc_class` is provided. Can not be provided together with `doc_class` """ifdoc_classand(propertiesordynamicisnotNone):raiseValidationException("doc_class and properties/dynamic should not be provided together")ifdoc_class:self._doc_class:Any=doc_classelse:# FIXME importfromopensearchpy.helpers.documentimportInnerDoc# no InnerDoc subclass, creating one instead...self._doc_class=type("InnerDoc",(InnerDoc,),{})forname,fieldin(propertiesor{}).items():self._doc_class._doc_type.mapping.field(name,field)# type: ignoreifdynamicisnotNone:self._doc_class._doc_type.mapping.meta("dynamic",dynamic)# type: ignoreself._mapping=copy.deepcopy(self._doc_class._doc_type.mapping)super().__init__(**kwargs)def__getitem__(self,name:Any)->Any:returnself._mapping[name]def__contains__(self,name:Any)->bool:returnnameinself._mappingdef_empty(self)->Any:returnself._wrap({})def_wrap(self,data:Any)->Any:returnself._doc_class.from_opensearch(data,data_only=True)defempty(self)->Any:ifself._multi:returnAttrList([],self._wrap)returnself._empty()defto_dict(self)->Any:d=self._mapping.to_dict()d.update(super().to_dict())returnddef_collect_fields(self)->Any:returnself._mapping.properties._collect_fields()def_deserialize(self,data:Any)->Any:# don't wrap already wrapped dataifisinstance(data,self._doc_class):returndataifisinstance(data,AttrDict):data=data._d_returnself._wrap(data)def_serialize(self,data:Any)->Any:ifdataisNone:returnNone# somebody assigned raw dict to the field, we should tolerate thatifisinstance(data,collections_abc.Mapping):returndatareturndata.to_dict()defclean(self,data:Any)->Any:data=super().clean(data)ifdataisNone:returnNoneifisinstance(data,(list,AttrList)):fordindata:d.full_clean()else:data.full_clean()returndatadefupdate(self,other:"Object",update_only:bool=False)->None:ifnotisinstance(other,Object):# not an inner/nested object, no merge possiblereturnself._mapping.update(other._mapping,update_only)classNested(Object):name:Optional[str]="nested"def__init__(self,*args:Any,**kwargs:Any)->None:kwargs.setdefault("multi",True)super().__init__(*args,**kwargs)classDate(Field):name:Optional[str]="date"_coerce:bool=Truedef__init__(self,default_timezone:Any=None,*args:Any,**kwargs:Any)->None:""" :arg default_timezone: timezone that will be automatically used for tz-naive values May be instance of `datetime.tzinfo` or string containing TZ offset """self._default_timezone=default_timezoneifisinstance(self._default_timezone,str):self._default_timezone=tz.gettz(self._default_timezone)super().__init__(*args,**kwargs)def_deserialize(self,data:Any)->Any:ifisinstance(data,str):try:data=parser.parse(data)exceptExceptionase:raiseValidationException(f"Could not parse date from the value ({data!r})",e)ifisinstance(data,datetime):ifself._default_timezoneanddata.tzinfoisNone:data=data.replace(tzinfo=self._default_timezone)returndataifisinstance(data,date):returndataifisinstance(data,int):# Divide by a float to preserve milliseconds on the datetime.returndatetime.utcfromtimestamp(data/1000.0)raiseValidationException(f"Could not parse date from the value ({data!r})")classText(Field):_param_defs={"fields":{"type":"field","hash":True},"analyzer":{"type":"analyzer"},"search_analyzer":{"type":"analyzer"},"search_quote_analyzer":{"type":"analyzer"},}name:Optional[str]="text"classSearchAsYouType(Field):_param_defs={"analyzer":{"type":"analyzer"},"search_analyzer":{"type":"analyzer"},"search_quote_analyzer":{"type":"analyzer"},}name:Optional[str]="search_as_you_type"classKeyword(Field):_param_defs={"fields":{"type":"field","hash":True},"search_analyzer":{"type":"analyzer"},"normalizer":{"type":"normalizer"},}name:Optional[str]="keyword"classConstantKeyword(Keyword):name:Optional[str]="constant_keyword"classBoolean(Field):name:Optional[str]="boolean"_coerce:bool=Truedef_deserialize(self,data:Any)->Any:ifdata=="false":returnFalsereturnbool(data)defclean(self,data:Any)->Any:ifdataisnotNone:data=self.deserialize(data)ifdataisNoneandself._required:raiseValidationException("Value required for this field.")returndataclassFloat(Field):name:Optional[str]="float"_coerce:bool=Truedef_deserialize(self,data:Any)->Any:returnfloat(data)classDenseVector(Float):name:Optional[str]="dense_vector"def__init__(self,dims:Any,**kwargs:Any)->None:kwargs["multi"]=Truesuper().__init__(dims=dims,**kwargs)classSparseVector(Field):name:Optional[str]="sparse_vector"classHalfFloat(Float):name:Optional[str]="half_float"classScaledFloat(Float):name:Optional[str]="scaled_float"def__init__(self,scaling_factor:Any,*args:Any,**kwargs:Any)->None:super().__init__(scaling_factor=scaling_factor,*args,**kwargs)classDouble(Float):name:Optional[str]="double"classRankFeature(Float):name:Optional[str]="rank_feature"classRankFeatures(Field):name:Optional[str]="rank_features"classInteger(Field):name:Optional[str]="integer"_coerce:bool=Truedef_deserialize(self,data:Any)->Any:returnint(data)classByte(Integer):name:Optional[str]="byte"classShort(Integer):name:Optional[str]="short"classLong(Integer):name:Optional[str]="long"classIp(Field):name:Optional[str]="ip"_coerce:bool=Truedef_deserialize(self,data:Any)->Any:# the ipaddress library for pypy only accepts unicode.returnipaddress.ip_address(unicode(data))def_serialize(self,data:Any)->Any:ifdataisNone:returnNonereturnstr(data)classBinary(Field):name:Optional[str]="binary"_coerce:bool=Truedefclean(self,data:Any)->Any:# Binary fields are opaque, so there's not much cleaning# that can be done.returndatadef_deserialize(self,data:Any)->Any:returnbase64.b64decode(data)def_serialize(self,data:Any)->Any:ifdataisNone:returnNonereturnbase64.b64encode(data).decode()classGeoPoint(Field):name:Optional[str]="geo_point"classGeoShape(Field):name:Optional[str]="geo_shape"classCompletion(Field):_param_defs={"analyzer":{"type":"analyzer"},"search_analyzer":{"type":"analyzer"},}name="completion"classPercolator(Field):name:Optional[str]="percolator"_coerce:bool=Truedef_deserialize(self,data:Any)->Any:returnQ(data)def_serialize(self,data:Any)->Any:ifdataisNone:returnNonereturndata.to_dict()classRangeField(Field):_coerce:bool=True_core_field:Any=Nonedef_deserialize(self,data:Any)->Any:ifisinstance(data,Range):returndatadata={k:self._core_field.deserialize(v)fork,vindata.items()}returnRange(data)def_serialize(self,data:Any)->Any:ifdataisNone:returnNoneifnotisinstance(data,collections_abc.Mapping):data=data.to_dict()return{k:self._core_field.serialize(v)fork,vindata.items()}classIntegerRange(RangeField):name:Optional[str]="integer_range"_core_field:Any=Integer()classFloatRange(RangeField):name:Optional[str]="float_range"_core_field:Any=Float()classLongRange(RangeField):name:Optional[str]="long_range"_core_field:Any=Long()classDoubleRange(RangeField):name:Optional[str]="double_range"_core_field:Any=Double()classDateRange(RangeField):name:Optional[str]="date_range"_core_field:Any=Date()classIpRange(Field):# not a RangeField since ip_range supports CIDR rangesname:Optional[str]="ip_range"classJoin(Field):name:Optional[str]="join"classTokenCount(Field):name:Optional[str]="token_count"classMurmur3(Field):name:Optional[str]="murmur3"