# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import collections.abc as collections_abc
from typing import Any, Optional

from .response.aggs import AggResponse, BucketData, FieldBucketData, TopHitsData
from .utils import DslBase


def A(  # pylint: disable=invalid-name
    name_or_agg: Any,
    filter: Any = None,
    **params: Any,
) -> Any:
    """
    Convenience factory for aggregation objects.

    Accepts either an aggregation name plus keyword parameters
    (``A("terms", field="tags")``), a raw dict definition
    (``A({"terms": {"field": "tags"}, "aggs": {...}})``), or an
    already-built ``Agg`` instance, which is passed through unchanged.

    :arg name_or_agg: aggregation name, dict definition, or ``Agg`` object
    :arg filter: positional query; only accepted for the ``"filter"`` agg
    :raises ValueError: when the arguments are ambiguous or malformed
    """
    if filter is not None:
        # A positional query only makes sense for the "filter" aggregation.
        if name_or_agg != "filter":
            raise ValueError(
                "Aggregation %r doesn't accept positional argument 'filter'."
                % name_or_agg
            )
        params["filter"] = filter

    if isinstance(name_or_agg, collections_abc.Mapping):
        # Raw definition, e.g. {"terms": {"field": "tags"}, "aggs": {...}}.
        if params:
            raise ValueError("A() cannot accept parameters when passing in a dict.")

        # Work on a shallow copy so the caller's dict is left untouched.
        spec = name_or_agg.copy()  # type: ignore
        sub_aggs = spec.pop("aggs", None)  # nested aggregations, if present
        meta_data = spec.pop("meta", None)  # optional metadata, if present

        # After removing "aggs"/"meta", exactly one key - the agg type -
        # must remain, e.g. {"terms": {"field": "tags"}}.
        if len(spec) != 1:
            raise ValueError(
                'A() can only accept dict with an aggregation ({"terms": {...}}). '
                "Instead it got (%r)" % name_or_agg
            )

        agg_type, params = spec.popitem()
        if sub_aggs or meta_data:
            # Copy before re-attaching so the nested body dict is not mutated.
            params = params.copy()
            if sub_aggs:
                params["aggs"] = sub_aggs
            if meta_data:
                params["meta"] = meta_data
        return Agg.get_dsl_class(agg_type)(_expand__to_dot=False, **params)

    if isinstance(name_or_agg, Agg):
        # Already an Agg instance - just hand it back.
        if params:
            raise ValueError(
                "A() cannot accept parameters when passing in an Agg object."
            )
        return name_or_agg

    # Plain name form, e.g. A("terms", field="tags").
    return Agg.get_dsl_class(name_or_agg)(**params)
class AggBase:
    """Shared container behaviour for anything that can hold sub-aggregations."""

    # Declares the "aggs" hash so nested aggregations are tracked by DslBase.
    _param_defs = {
        "aggs": {"type": "agg", "hash": True},
    }

    def __contains__(self: Any, key: Any) -> bool:
        """Return ``True`` when a nested aggregation named *key* exists."""
        return key in self._params.get("aggs", {})

    def __getitem__(self: Any, agg_name: Any) -> Any:
        """Return the nested aggregation *agg_name*, copying shared buckets."""
        # A missing name raises KeyError, deliberately propagated to the caller.
        agg = self._params.setdefault("aggs", {})[agg_name]

        if isinstance(agg, Bucket):
            # Never hand out a bucket that might be shared with another tree:
            # build a shallow copy via A() and store the copy back, so any
            # mutation of the returned object is reflected here as well.
            agg = A(agg.name, **agg._params)
            self._params["aggs"][agg_name] = agg

        return agg

    def __setitem__(self: Any, agg_name: str, agg: Any) -> None:
        self.aggs[agg_name] = A(agg)

    def __iter__(self: Any) -> Any:
        return iter(self.aggs)

    def _agg(
        self: Any, bucket: Any, name: Any, agg_type: Any, *args: Any, **params: Any
    ) -> Any:
        """Attach a new sub-aggregation *name* and return the chaining target."""
        agg = self[name] = A(agg_type, *args, **params)
        # New buckets chain into themselves; metrics/pipelines chain back to
        # the base bucket so further calls keep extending the same level.
        return agg if bucket else self._base

    def metric(self: Any, name: Any, agg_type: Any, *args: Any, **params: Any) -> Any:
        """Add a metric sub-aggregation; chaining continues on the base."""
        return self._agg(False, name, agg_type, *args, **params)

    def bucket(self: Any, name: Any, agg_type: Any, *args: Any, **params: Any) -> Any:
        """Add a bucket sub-aggregation; chaining continues on the new bucket."""
        return self._agg(True, name, agg_type, *args, **params)

    def pipeline(self: Any, name: Any, agg_type: Any, *args: Any, **params: Any) -> Any:
        """Add a pipeline sub-aggregation; chaining continues on the base."""
        return self._agg(False, name, agg_type, *args, **params)

    def result(self: Any, search: Any, data: Any) -> Any:
        return BucketData(self, search, data)


class Bucket(AggBase, Agg):
    def __init__(self, **params: Any) -> None:
        super().__init__(**params)
        # Remember self so metric()/pipeline() can chain back to this bucket.
        self._base = self

    def to_dict(self) -> Any:
        # Skip AggBase in the MRO and serialize via Agg, then hoist any
        # nested "aggs" out of the aggregation body to the top level.
        d = super(AggBase, self).to_dict()
        if "aggs" in d[self.name]:
            d["aggs"] = d[self.name].pop("aggs")
        return d


class Filter(Bucket):
    name: Optional[str] = "filter"
    _param_defs = {
        "filter": {"type": "query"},
        "aggs": {"type": "agg", "hash": True},
    }

    def __init__(self, filter: Any = None, **params: Any) -> None:
        # Allow the wrapped query to be given positionally.
        if filter is not None:
            params["filter"] = filter
        super().__init__(**params)

    def to_dict(self) -> Any:
        # Inline the wrapped query directly under the "filter" key.
        d = super().to_dict()
        d[self.name].update(d[self.name].pop("filter", {}))
        return d


class Pipeline(Agg):
    pass
# bucket aggregations


class Filters(Bucket):
    name: str = "filters"
    _param_defs = {
        "filters": {"type": "query", "hash": True},
        "aggs": {"type": "agg", "hash": True},
    }


class Children(Bucket):
    name = "children"


class Parent(Bucket):
    name = "parent"


class DateHistogram(Bucket):
    name = "date_histogram"

    def result(self, search: Any, data: Any) -> Any:
        return FieldBucketData(self, search, data)


class AutoDateHistogram(DateHistogram):
    name = "auto_date_histogram"


class DateRange(Bucket):
    name = "date_range"


class GeoDistance(Bucket):
    name = "geo_distance"


class GeohashGrid(Bucket):
    name = "geohash_grid"


class GeotileGrid(Bucket):
    name = "geotile_grid"


class GeoCentroid(Bucket):
    name = "geo_centroid"


class Global(Bucket):
    name = "global"


class Histogram(Bucket):
    name = "histogram"

    def result(self, search: Any, data: Any) -> Any:
        return FieldBucketData(self, search, data)


class IPRange(Bucket):
    name = "ip_range"


class Missing(Bucket):
    name = "missing"


class Nested(Bucket):
    name = "nested"


class Range(Bucket):
    name = "range"


class RareTerms(Bucket):
    name = "rare_terms"

    def result(self, search: Any, data: Any) -> Any:
        return FieldBucketData(self, search, data)


class ReverseNested(Bucket):
    name = "reverse_nested"


class SignificantTerms(Bucket):
    name = "significant_terms"


class SignificantText(Bucket):
    name = "significant_text"


class Terms(Bucket):
    name = "terms"

    def result(self, search: Any, data: Any) -> Any:
        return FieldBucketData(self, search, data)


class Sampler(Bucket):
    name = "sampler"


class DiversifiedSampler(Bucket):
    name = "diversified_sampler"


class Composite(Bucket):
    name = "composite"
    _param_defs = {
        "sources": {"type": "agg", "hash": True, "multi": True},
        "aggs": {"type": "agg", "hash": True},
    }


class VariableWidthHistogram(Bucket):
    name = "variable_width_histogram"

    def result(self, search: Any, data: Any) -> Any:
        return FieldBucketData(self, search, data)


class MultiTerms(Bucket):
    name = "multi_terms"
# metric aggregations


class TopHits(Agg):
    name = "top_hits"

    def result(self, search: Any, data: Any) -> Any:
        return TopHitsData(self, search, data)


class Avg(Agg):
    name = "avg"


class WeightedAvg(Agg):
    name = "weighted_avg"


class Cardinality(Agg):
    name = "cardinality"


class ExtendedStats(Agg):
    name = "extended_stats"


class Boxplot(Agg):
    name = "boxplot"


class GeoBounds(Agg):
    name = "geo_bounds"


class Max(Agg):
    name = "max"


class MedianAbsoluteDeviation(Agg):
    name = "median_absolute_deviation"


class Min(Agg):
    name = "min"


class Percentiles(Agg):
    name = "percentiles"


class PercentileRanks(Agg):
    name = "percentile_ranks"


class ScriptedMetric(Agg):
    name = "scripted_metric"


class Stats(Agg):
    name = "stats"


class Sum(Agg):
    name = "sum"


class TTest(Agg):
    name = "t_test"


class ValueCount(Agg):
    name = "value_count"


# pipeline aggregations


class AvgBucket(Pipeline):
    name = "avg_bucket"


class BucketScript(Pipeline):
    name = "bucket_script"


class BucketSelector(Pipeline):
    name = "bucket_selector"


class CumulativeSum(Pipeline):
    name = "cumulative_sum"


class CumulativeCardinality(Pipeline):
    name = "cumulative_cardinality"


class Derivative(Pipeline):
    name = "derivative"


class ExtendedStatsBucket(Pipeline):
    name = "extended_stats_bucket"


class Inference(Pipeline):
    name = "inference"


class MaxBucket(Pipeline):
    name = "max_bucket"


class MinBucket(Pipeline):
    name = "min_bucket"


class MovingFn(Pipeline):
    name = "moving_fn"


class MovingAvg(Pipeline):
    name = "moving_avg"


class MovingPercentiles(Pipeline):
    name = "moving_percentiles"


class Normalize(Pipeline):
    name = "normalize"


class PercentilesBucket(Pipeline):
    name = "percentiles_bucket"


class SerialDiff(Pipeline):
    name = "serial_diff"


class StatsBucket(Pipeline):
    name = "stats_bucket"


class SumBucket(Pipeline):
    name = "sum_bucket"


class BucketSort(Pipeline):
    name = "bucket_sort"