Source code for opensearchpy.helpers.faceted_search
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

from datetime import datetime, timedelta, timezone
from typing import Any, Optional

from opensearchpy.helpers.aggs import A

from .query import MatchAll, Nested, Range, Terms
from .response import Response
from .search import Search
from .utils import AttrDict

__all__ = [
    "FacetedSearch",
    "HistogramFacet",
    "TermsFacet",
    "DateHistogramFacet",
    "RangeFacet",
    "NestedFacet",
]


class Facet:
    """
    A facet on faceted search. Wraps an aggregation and provides functionality
    to create a filter for selected values and return a list of facet values
    from the result of the aggregation.
    """

    # Aggregation name handed to ``A``; concrete facets override it.
    agg_type: Optional[str] = None

    def __init__(
        self, metric: Any = None, metric_sort: str = "desc", **kwargs: Any
    ) -> None:
        """
        :arg metric: optional metric aggregation attached to every bucket
            under the name ``"metric"``
        :arg metric_sort: order applied on that metric; only used when both
            ``metric`` and ``metric_sort`` are truthy
        :arg kwargs: parameters forwarded to the aggregation
        """
        self.filter_values = ()
        self._params = kwargs
        self._metric = metric
        if metric and metric_sort:
            self._params["order"] = {"metric": metric_sort}

    def get_aggregation(self) -> Any:
        """
        Return the aggregation object.
        """
        agg = A(self.agg_type, **self._params)
        if self._metric:
            agg.metric("metric", self._metric)
        return agg

    def add_filter(self, filter_values: Any) -> Any:
        """
        Construct a filter.

        The per-value filters from :meth:`get_value_filter` are combined with
        ``|``. Returns ``None`` when ``filter_values`` is empty.
        """
        if not filter_values:
            return None

        combined = self.get_value_filter(filter_values[0])
        for value in filter_values[1:]:
            combined |= self.get_value_filter(value)
        return combined

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return the filter for a single value; the base class returns ``None``.
        """
        return None

    def is_filtered(self, key: Any, filter_values: Any) -> bool:
        """
        Is a filter active on the given key.
        """
        return key in filter_values

    def get_value(self, bucket: Any) -> Any:
        """
        return a value representing a bucket. Its key as default.
        """
        return bucket["key"]

    def get_metric(self, bucket: Any) -> Any:
        """
        Return a metric, by default doc_count for a bucket.
        """
        if self._metric:
            return bucket["metric"]["value"]
        return bucket["doc_count"]

    def get_values(self, data: Any, filter_values: Any) -> Any:
        """
        Turn the raw bucket data into a list of tuples containing the key,
        number of documents and a flag indicating whether this value has been
        selected or not.
        """
        out = []
        for bucket in data.buckets:
            key = self.get_value(bucket)
            out.append(
                (key, self.get_metric(bucket), self.is_filtered(key, filter_values))
            )
        return out


class TermsFacet(Facet):
    """Facet backed by a ``terms`` aggregation."""

    agg_type: Optional[str] = "terms"

    def add_filter(self, filter_values: Any) -> Any:
        """Create a terms filter instead of bool containing term filters."""
        if filter_values:
            return Terms(
                _expand__to_dot=False, **{self._params["field"]: filter_values}
            )
        return None


class RangeFacet(Facet):
    """Facet backed by a ``range`` aggregation over named ranges."""

    agg_type = "range"

    def _range_to_dict(self, range: Any) -> Any:
        """
        Convert one ``(key, (from, to))`` pair into the dict form used in the
        aggregation's ``ranges`` list; a ``None`` bound is omitted.
        """
        key, bounds = range
        out = {"key": key}
        if bounds[0] is not None:
            out["from"] = bounds[0]
        if bounds[1] is not None:
            out["to"] = bounds[1]
        return out

    def __init__(self, ranges: Any, **kwargs: Any) -> None:
        """
        :arg ranges: iterable of ``(key, (from, to))`` pairs
        :arg kwargs: forwarded to :class:`Facet`
        """
        super().__init__(**kwargs)
        self._params["ranges"] = list(map(self._range_to_dict, ranges))
        self._params["keyed"] = False
        # Lookup from range key to its (from, to) bounds, used when filtering.
        self._ranges = dict(ranges)

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query for the range registered under
        ``filter_value``: ``gte`` the lower bound and ``lt`` the upper bound,
        each only when it is not ``None``.
        """
        lower, upper = self._ranges[filter_value]
        limits = {}
        if lower is not None:
            limits["gte"] = lower
        if upper is not None:
            limits["lt"] = upper

        return Range(_expand__to_dot=False, **{self._params["field"]: limits})


class HistogramFacet(Facet):
    """Facet backed by a ``histogram`` aggregation."""

    agg_type = "histogram"

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query spanning one interval starting at
        ``filter_value``.
        """
        return Range(
            _expand__to_dot=False,
            **{
                self._params["field"]: {
                    "gte": filter_value,
                    "lt": filter_value + self._params["interval"],
                }
            }
        )


def _date_interval_year(d: Any) -> Any:
    """Return ``d`` moved one year ahead (Feb 29 becomes Feb 28)."""
    return d.replace(
        year=d.year + 1, day=(28 if d.month == 2 and d.day == 29 else d.day)
    )


def _date_interval_month(d: Any) -> Any:
    """Return the first day of the month following ``d``."""
    # Step from the first of the month: adding 32 days to a late-month date
    # (e.g. Jan 31) would otherwise overshoot and skip the following month.
    return (d.replace(day=1) + timedelta(days=32)).replace(day=1)


def _date_interval_week(d: Any) -> Any:
    """Return ``d`` plus seven days."""
    return d + timedelta(days=7)


def _date_interval_day(d: Any) -> Any:
    """Return ``d`` plus one day."""
    return d + timedelta(days=1)


def _date_interval_hour(d: Any) -> Any:
    """Return ``d`` plus one hour."""
    return d + timedelta(hours=1)


class DateHistogramFacet(Facet):
    """Facet backed by a ``date_histogram`` aggregation."""

    agg_type = "date_histogram"

    # Maps an interval name to the function computing the end of the interval
    # that starts at a given date.
    DATE_INTERVALS = {
        "year": _date_interval_year,
        "1Y": _date_interval_year,
        "month": _date_interval_month,
        "1M": _date_interval_month,
        "week": _date_interval_week,
        "1w": _date_interval_week,
        "day": _date_interval_day,
        "1d": _date_interval_day,
        "hour": _date_interval_hour,
        "1h": _date_interval_hour,
    }

    def __init__(self, **kwargs: Any) -> None:
        """Create the facet; ``min_doc_count`` defaults to ``0``."""
        kwargs.setdefault("min_doc_count", 0)
        super().__init__(**kwargs)

    def get_value(self, bucket: Any) -> Any:
        """
        Return the bucket key as a ``datetime``.

        A key that is not already a ``datetime`` is treated as milliseconds
        since the epoch and converted to a naive UTC ``datetime``.
        """
        if not isinstance(bucket["key"], datetime):
            # OpenSearch returns key=None instead of 0 for date 1970-01-01,
            # so we need to set key to 0 to avoid TypeError exception
            if bucket["key"] is None:
                bucket["key"] = 0
            # Preserve milliseconds in the datetime. ``utcfromtimestamp`` is
            # deprecated, so convert with an explicit UTC zone and strip it
            # again to keep returning a naive value.
            return datetime.fromtimestamp(
                int(bucket["key"]) / 1000.0, tz=timezone.utc
            ).replace(tzinfo=None)
        else:
            return bucket["key"]

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query covering the interval that starts at
        ``filter_value``.

        The interval name is read from ``calendar_interval`` or
        ``fixed_interval`` when either is configured, otherwise from
        ``interval``.
        """
        for interval_type in ("calendar_interval", "fixed_interval"):
            if interval_type in self._params:
                break
        else:
            interval_type = "interval"

        return Range(
            _expand__to_dot=False,
            **{
                self._params["field"]: {
                    "gte": filter_value,
                    "lt": self.DATE_INTERVALS[self._params[interval_type]](
                        filter_value
                    ),
                }
            }
        )


class NestedFacet(Facet):
    """Facet wrapping another facet inside a ``nested`` aggregation."""

    agg_type = "nested"

    def __init__(self, path: Any, nested_facet: Any) -> None:
        """
        :arg path: nested path passed to the aggregation and to the filter
        :arg nested_facet: facet whose aggregation is registered as ``inner``
        """
        self._path = path
        self._inner = nested_facet
        super().__init__(path=path, aggs={"inner": nested_facet.get_aggregation()})

    def get_values(self, data: Any, filter_values: Any) -> Any:
        """Delegate to the wrapped facet using the ``inner`` sub-aggregation."""
        return self._inner.get_values(data.inner, filter_values)

    def add_filter(self, filter_values: Any) -> Any:
        """
        Wrap the inner facet's filter in a ``Nested`` query; return ``None``
        when the inner facet produces no filter.
        """
        inner_q = self._inner.add_filter(filter_values)
        if inner_q:
            return Nested(path=self._path, query=inner_q)
        return None


class FacetedResponse(Response):
    """Response that exposes facet values computed from the aggregations."""

    @property
    def query_string(self) -> Any:
        """The query given to the originating faceted search."""
        return self._faceted_search._query

    @property
    def facets(self) -> Any:
        """
        Facet values keyed by facet name, built on first access and cached in
        ``_facets``.
        """
        if not hasattr(self, "_facets"):
            # Set through the parent of ``AttrDict`` so ``_facets`` is stored
            # without going through ``AttrDict.__setattr__``.
            super(AttrDict, self).__setattr__("_facets", AttrDict({}))
            for name, facet in self._faceted_search.facets.items():
                self._facets[name] = facet.get_values(
                    getattr(getattr(self.aggregations, "_filter_" + name), name),
                    self._faceted_search.filter_values.get(name, ()),
                )
        return self._facets
class FacetedSearch:
    """
    Abstraction for creating faceted navigation searches that takes care of
    composing the queries, aggregations and filters as needed as well as
    presenting the results in an easy-to-consume fashion::

        class BlogSearch(FacetedSearch):
            index = 'blogs'
            doc_types = [Blog, Post]
            fields = ['title^5', 'category', 'description', 'body']

            facets = {
                'type': TermsFacet(field='_type'),
                'category': TermsFacet(field='category'),
                'weekly_posts': DateHistogramFacet(field='published_from', interval='week')
            }

            def search(self):
                ' Override search to add your own filters '
                s = super(BlogSearch, self).search()
                return s.filter('term', published=True)

        # when using:
        blog_search = BlogSearch("web framework", filters={"category": "python"})

        # supports pagination
        blog_search[10:20]

        response = blog_search.execute()

        # easy access to aggregation results:
        for category, hit_count, is_selected in response.facets.category:
            print(
                "Category %s has %d hits%s." % (
                    category,
                    hit_count,
                    ' and is chosen' if is_selected else ''
                )
            )

    """

    # Class-level configuration, meant to be overridden by subclasses.
    index: Any = None
    doc_types: Any = None
    fields: Any = None
    facets: Any = {}
    using = "default"

    def __init__(self, query: Any = None, filters: Any = None, sort: Any = ()) -> None:
        """
        :arg query: the text to search for
        :arg filters: facet values to filter
        :arg sort: sort information to be passed to :class:`~opensearchpy.Search`
        """
        self._query = query
        self._filters: Any = {}
        self._sort = sort
        self.filter_values: Any = {}
        # ``filters`` defaults to None rather than a dict literal so that no
        # mutable object is shared between calls; None means "no filters".
        for name, value in (filters or {}).items():
            self.add_filter(name, value)

        self._s = self.build_search()

    def count(self) -> Any:
        """Return the result of ``count()`` on the underlying search."""
        return self._s.count()

    def __getitem__(self, k: Any) -> Any:
        """Index/slice the underlying search in place and return ``self``."""
        self._s = self._s[k]
        return self

    def __iter__(self) -> Any:
        """Iterate over the underlying search."""
        return iter(self._s)

    def add_filter(self, name: Any, filter_values: Any) -> Any:
        """
        Add a filter for a facet.

        :arg name: key of the facet in ``self.facets``
        :arg filter_values: a single value, or a tuple/list of values;
            ``None`` is ignored
        """
        # normalize the value into a list
        if not isinstance(filter_values, (tuple, list)):
            if filter_values is None:
                return
            filter_values = [filter_values]

        # remember the filter values for use in FacetedResponse
        self.filter_values[name] = filter_values

        # get the filter from the facet
        facet_filter = self.facets[name].add_filter(filter_values)
        if facet_filter is None:
            return

        self._filters[name] = facet_filter

    def search(self) -> Any:
        """
        Returns the base Search object to which the facets are added.

        You can customize the query by overriding this method and returning a
        modified search object.
        """
        s = Search(doc_type=self.doc_types, index=self.index, using=self.using)
        return s.response_class(FacetedResponse)

    def query(self, search: Any, query: Any) -> Any:
        """
        Add query part to ``search``.

        Override this if you wish to customize the query used.
        """
        if not query:
            return search
        if self.fields:
            return search.query("multi_match", fields=self.fields, query=query)
        return search.query("multi_match", query=query)

    def aggregate(self, search: Any) -> Any:
        """
        Add aggregations representing the facets selected, including potential
        filters.
        """
        for name, facet in self.facets.items():
            agg = facet.get_aggregation()
            agg_filter = MatchAll()
            # Apply every active filter except the facet's own one.
            for field, facet_filter in self._filters.items():
                if field == name:
                    continue
                agg_filter &= facet_filter
            search.aggs.bucket("_filter_" + name, "filter", filter=agg_filter).bucket(
                name, agg
            )

    def filter(self, search: Any) -> Any:
        """
        Add a ``post_filter`` to the search request narrowing the results based
        on the facet filters.
        """
        if not self._filters:
            return search

        post_filter = MatchAll()
        for facet_filter in self._filters.values():
            post_filter &= facet_filter
        return search.post_filter(post_filter)

    def highlight(self, search: Any) -> Any:
        """
        Add highlighting for all the fields
        """
        # Drop any "^boost" suffix; split() leaves unboosted names unchanged.
        return search.highlight(*(f.split("^", 1)[0] for f in self.fields))

    def sort(self, search: Any) -> Any:
        """
        Add sorting information to the request.
        """
        if self._sort:
            search = search.sort(*self._sort)
        return search

    def build_search(self) -> Any:
        """
        Construct the ``Search`` object.
        """
        s = self.search()
        s = self.query(s, self._query)
        s = self.filter(s)
        if self.fields:
            s = self.highlight(s)
        s = self.sort(s)
        self.aggregate(s)
        return s

    def execute(self) -> Any:
        """
        Execute the search and return the response.
        """
        r = self._s.execute()
        # Give the response access to the facets and filter values.
        r._faceted_search = self
        return r