# Source code for opensearchpy.helpers.faceted_search

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
# 	http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.
from datetime import datetime, timedelta
from typing import Any, Optional

from opensearchpy.helpers.aggs import A

from .query import MatchAll, Nested, Range, Terms
from .response import Response
from .search import Search
from .utils import AttrDict

__all__ = [
    "FacetedSearch",
    "HistogramFacet",
    "TermsFacet",
    "DateHistogramFacet",
    "RangeFacet",
    "NestedFacet",
]


class Facet:
    """
    Base class for one facet of a faceted search.

    A facet wraps an aggregation, builds a filter out of the values that were
    selected for it, and turns the aggregation result into a list of facet
    values.
    """

    # Aggregation name handed to ``A`` in ``get_aggregation``.
    agg_type: Optional[str] = None

    def __init__(
        self, metric: Any = None, metric_sort: str = "desc", **kwargs: Any
    ) -> None:
        """
        :arg metric: optional metric attached to the aggregation under the
            name ``"metric"``
        :arg metric_sort: direction stored in the ``order`` parameter when a
            metric is given
        :arg kwargs: parameters passed through to the aggregation
        """
        self.filter_values = ()
        self._metric = metric
        self._params = kwargs
        if metric and metric_sort:
            # Order the buckets by the attached metric.
            kwargs["order"] = {"metric": metric_sort}

    def get_aggregation(self) -> Any:
        """
        Build the aggregation for this facet, with the metric attached if any.
        """
        aggregation = A(self.agg_type, **self._params)
        if not self._metric:
            return aggregation
        aggregation.metric("metric", self._metric)
        return aggregation

    def add_filter(self, filter_values: Any) -> Any:
        """
        Combine the per-value filters of all ``filter_values`` with ``|``.

        Returns ``None`` when ``filter_values`` is empty.
        """
        if not filter_values:
            return None

        combined = self.get_value_filter(filter_values[0])
        for value in filter_values[1:]:
            combined |= self.get_value_filter(value)
        return combined

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return the filter for a single value; the base class has none.
        """
        return None

    def is_filtered(self, key: Any, filter_values: Any) -> bool:
        """
        Tell whether ``key`` is among the selected ``filter_values``.
        """
        selected = key in filter_values
        return selected

    def get_value(self, bucket: Any) -> Any:
        """
        Return the value that represents ``bucket``, its key by default.
        """
        return bucket["key"]

    def get_metric(self, bucket: Any) -> Any:
        """
        Return the metric value of ``bucket`` when a metric was configured,
        its ``doc_count`` otherwise.
        """
        return bucket["metric"]["value"] if self._metric else bucket["doc_count"]

    def get_values(self, data: Any, filter_values: Any) -> Any:
        """
        Convert the buckets in ``data`` into a list of
        ``(value, metric, is_selected)`` tuples.
        """

        def describe(bucket: Any) -> Any:
            key = self.get_value(bucket)
            return key, self.get_metric(bucket), self.is_filtered(key, filter_values)

        return [describe(bucket) for bucket in data.buckets]


class TermsFacet(Facet):
    """Facet backed by a ``terms`` aggregation."""

    agg_type: Optional[str] = "terms"

    def add_filter(self, filter_values: Any) -> Any:
        """
        Build a single ``Terms`` query for all selected values rather than a
        bool query made of individual term filters.

        Returns ``None`` when ``filter_values`` is empty.
        """
        if not filter_values:
            return None
        field_name = self._params["field"]
        return Terms(_expand__to_dot=False, **{field_name: filter_values})


class RangeFacet(Facet):
    """
    Facet backed by a ``range`` aggregation.

    ``ranges`` is an iterable of ``(key, (from, to))`` pairs; either bound may
    be ``None`` to leave that side open.
    """

    agg_type = "range"

    def _range_to_dict(self, range: Any) -> Any:
        """
        Convert one ``(key, (from, to))`` pair into the dict form stored in
        the aggregation's ``ranges`` parameter, omitting ``None`` bounds.
        """
        # The parameter name is kept for compatibility with overrides; the
        # unpacked parts get their own names instead of rebinding it.
        key, bounds = range
        out = {"key": key}
        if bounds[0] is not None:
            out["from"] = bounds[0]
        if bounds[1] is not None:
            out["to"] = bounds[1]
        return out

    def __init__(self, ranges: Any, **kwargs: Any) -> None:
        """
        :arg ranges: iterable of ``(key, (from, to))`` pairs
        :arg kwargs: parameters passed through to :class:`Facet`
        """
        super().__init__(**kwargs)
        # ``ranges`` is consumed twice below. Materialise it first so that a
        # one-shot iterable (e.g. a generator) does not leave ``_ranges``
        # empty after the first pass.
        ranges = list(ranges)
        self._params["ranges"] = [self._range_to_dict(item) for item in ranges]
        self._params["keyed"] = False
        self._ranges = dict(ranges)

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query for the range registered under
        ``filter_value``: ``gte`` the lower bound and ``lt`` the upper bound,
        skipping bounds that are ``None``.

        Raises ``KeyError`` if ``filter_value`` is not one of the range keys.
        """
        lower, upper = self._ranges[filter_value]
        limits = {}
        if lower is not None:
            limits["gte"] = lower
        if upper is not None:
            limits["lt"] = upper

        return Range(_expand__to_dot=False, **{self._params["field"]: limits})


class HistogramFacet(Facet):
    """Facet backed by a ``histogram`` aggregation."""

    agg_type = "histogram"

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query covering one histogram bucket: from
        ``filter_value`` (inclusive) up to ``filter_value`` plus the
        configured ``interval`` (exclusive).
        """
        field_name = self._params["field"]
        bucket_bounds = {
            "gte": filter_value,
            "lt": filter_value + self._params["interval"],
        }
        return Range(_expand__to_dot=False, **{field_name: bucket_bounds})


def _date_interval_year(d: Any) -> Any:
    return d.replace(
        year=d.year + 1, day=(28 if d.month == 2 and d.day == 29 else d.day)
    )


def _date_interval_month(d: Any) -> Any:
    return (d + timedelta(days=32)).replace(day=1)


def _date_interval_week(d: Any) -> Any:
    return d + timedelta(days=7)


def _date_interval_day(d: Any) -> Any:
    return d + timedelta(days=1)


def _date_interval_hour(d: Any) -> Any:
    return d + timedelta(hours=1)


class DateHistogramFacet(Facet):
    """
    Facet backed by a ``date_histogram`` aggregation.

    ``min_doc_count`` defaults to ``0`` unless the caller supplies it.
    """

    agg_type = "date_histogram"

    # Maps a supported interval name to the function that computes the
    # exclusive upper bound of a bucket from its start value.
    DATE_INTERVALS = {
        "year": _date_interval_year,
        "1Y": _date_interval_year,
        "month": _date_interval_month,
        "1M": _date_interval_month,
        "week": _date_interval_week,
        "1w": _date_interval_week,
        "day": _date_interval_day,
        "1d": _date_interval_day,
        "hour": _date_interval_hour,
        "1h": _date_interval_hour,
    }

    def __init__(self, **kwargs: Any) -> None:
        """
        :arg kwargs: parameters passed through to :class:`Facet`
        """
        kwargs.setdefault("min_doc_count", 0)
        super().__init__(**kwargs)

    def get_value(self, bucket: Any) -> Any:
        """
        Return the bucket key as a ``datetime``.

        A key that already is a ``datetime`` is returned unchanged; any other
        key is read as milliseconds since the epoch and converted to a naive
        UTC ``datetime``.
        """
        key = bucket["key"]
        if isinstance(key, datetime):
            return key

        # OpenSearch returns key=None instead of 0 for date 1970-01-01,
        # so we need to set key to 0 to avoid TypeError exception
        if key is None:
            bucket["key"] = key = 0

        # Epoch + timedelta keeps millisecond precision exactly and avoids
        # ``datetime.utcfromtimestamp``, which is deprecated since Python 3.12
        # and may fail for negative timestamps on some platforms.
        return datetime(1970, 1, 1) + timedelta(milliseconds=int(key))

    def get_value_filter(self, filter_value: Any) -> Any:
        """
        Return a ``Range`` query covering one bucket: from ``filter_value``
        (inclusive) up to the start of the next interval (exclusive).

        The interval is read from ``calendar_interval``, then
        ``fixed_interval``, falling back to ``interval``. Raises ``KeyError``
        when that parameter is missing or its value is not in
        ``DATE_INTERVALS``.
        """
        interval_type = next(
            (
                name
                for name in ("calendar_interval", "fixed_interval")
                if name in self._params
            ),
            "interval",
        )
        next_bucket_start = self.DATE_INTERVALS[self._params[interval_type]]

        return Range(
            _expand__to_dot=False,
            **{
                self._params["field"]: {
                    "gte": filter_value,
                    "lt": next_bucket_start(filter_value),
                }
            }
        )


class NestedFacet(Facet):
    """
    Facet that wraps another facet in a ``nested`` aggregation on ``path``.
    """

    agg_type = "nested"

    def __init__(self, path: Any, nested_facet: Any) -> None:
        """
        :arg path: path passed to the ``nested`` aggregation and query
        :arg nested_facet: the facet whose aggregation is placed under the
            name ``"inner"``
        """
        self._path = path
        self._inner = nested_facet
        inner_aggregation = nested_facet.get_aggregation()
        super().__init__(path=path, aggs={"inner": inner_aggregation})

    def get_values(self, data: Any, filter_values: Any) -> Any:
        """
        Delegate to the wrapped facet, using the ``inner`` part of ``data``.
        """
        wrapped = self._inner
        return wrapped.get_values(data.inner, filter_values)

    def add_filter(self, filter_values: Any) -> Any:
        """
        Wrap the inner facet's filter in a ``Nested`` query on ``path``.

        Returns ``None`` when the inner facet produces no filter.
        """
        inner_query = self._inner.add_filter(filter_values)
        if not inner_query:
            return None
        return Nested(path=self._path, query=inner_query)


class FacetedResponse(Response):
    """
    Response that additionally exposes the facet values of the
    ``FacetedSearch`` stored on it as ``_faceted_search``.
    """

    @property
    def query_string(self) -> Any:
        """The query text held by the originating faceted search."""
        originating_search = self._faceted_search
        return originating_search._query

    @property
    def facets(self) -> Any:
        """
        Facet values per facet name, computed on first access and then
        reused from ``_facets``.
        """
        if hasattr(self, "_facets"):
            return self._facets

        # Set through the class after ``AttrDict`` in the MRO, i.e. without
        # going through ``AttrDict.__setattr__``.
        # NOTE(review): presumably so the cache is a plain attribute rather
        # than response data - confirm against ``AttrDict``.
        super(AttrDict, self).__setattr__("_facets", AttrDict({}))
        for name, facet in self._faceted_search.facets.items():
            # Each facet's result sits inside a "_filter_<name>" wrapper.
            self._facets[name] = facet.get_values(
                getattr(getattr(self.aggregations, "_filter_" + name), name),
                self._faceted_search.filter_values.get(name, ()),
            )
        return self._facets


[docs]class FacetedSearch: """ Abstraction for creating faceted navigation searches that takes care of composing the queries, aggregations and filters as needed as well as presenting the results in an easy-to-consume fashion:: class BlogSearch(FacetedSearch): index = 'blogs' doc_types = [Blog, Post] fields = ['title^5', 'category', 'description', 'body'] facets = { 'type': TermsFacet(field='_type'), 'category': TermsFacet(field='category'), 'weekly_posts': DateHistogramFacet(field='published_from', interval='week') } def search(self): ' Override search to add your own filters ' s = super(BlogSearch, self).search() return s.filter('term', published=True) # when using: blog_search = BlogSearch("web framework", filters={"category": "python"}) # supports pagination blog_search[10:20] response = blog_search.execute() # easy access to aggregation results: for category, hit_count, is_selected in response.facets.category: print( "Category %s has %d hits%s." % ( category, hit_count, ' and is chosen' if is_selected else '' ) ) """ index: Any = None doc_types: Any = None fields: Any = None facets: Any = {} using = "default" def __init__(self, query: Any = None, filters: Any = {}, sort: Any = ()) -> None: """ :arg query: the text to search for :arg filters: facet values to filter :arg sort: sort information to be passed to :class:`~opensearchpy.Search` """ self._query = query self._filters: Any = {} self._sort = sort self.filter_values: Any = {} for name, value in filters.items(): self.add_filter(name, value) self._s = self.build_search() def count(self) -> Any: return self._s.count() def __getitem__(self, k: Any) -> Any: self._s = self._s[k] return self def __iter__(self) -> Any: return iter(self._s)
[docs] def add_filter(self, name: Any, filter_values: Any) -> Any: """ Add a filter for a facet. """ # normalize the value into a list if not isinstance(filter_values, (tuple, list)): if filter_values is None: return filter_values = [ filter_values, ] # remember the filter values for use in FacetedResponse self.filter_values[name] = filter_values # get the filter from the facet f = self.facets[name].add_filter(filter_values) if f is None: return self._filters[name] = f
[docs] def search(self) -> Any: """ Returns the base Search object to which the facets are added. You can customize the query by overriding this method and returning a modified search object. """ s = Search(doc_type=self.doc_types, index=self.index, using=self.using) return s.response_class(FacetedResponse)
[docs] def query(self, search: Any, query: Any) -> Any: """ Add query part to ``search``. Override this if you wish to customize the query used. """ if query: if self.fields: return search.query("multi_match", fields=self.fields, query=query) else: return search.query("multi_match", query=query) return search
[docs] def aggregate(self, search: Any) -> Any: """ Add aggregations representing the facets selected, including potential filters. """ for f, facet in self.facets.items(): agg = facet.get_aggregation() agg_filter = MatchAll() for field, filter in self._filters.items(): if f == field: continue agg_filter &= filter search.aggs.bucket("_filter_" + f, "filter", filter=agg_filter).bucket( f, agg )
[docs] def filter(self, search: Any) -> Any: """ Add a ``post_filter`` to the search request narrowing the results based on the facet filters. """ if not self._filters: return search post_filter = MatchAll() for f in self._filters.values(): post_filter &= f return search.post_filter(post_filter)
[docs] def highlight(self, search: Any) -> Any: """ Add highlighting for all the fields """ return search.highlight( *(f if "^" not in f else f.split("^", 1)[0] for f in self.fields) )
[docs] def sort(self, search: Any) -> Any: """ Add sorting information to the request. """ if self._sort: search = search.sort(*self._sort) return search
[docs] def execute(self) -> Any: """ Execute the search and return the response. """ r = self._s.execute() r._faceted_search = self return r