# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
#
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
from opensearchpy.connection.connections import get_connection
from ..helpers.query import Bool, Q
from ..helpers.search import ProxyDescriptor, QueryProxy, Request
from .response import UpdateByQueryResponse
from .utils import recursive_to_dict
[docs]class UpdateByQuery(Request):
query = ProxyDescriptor("query")
def __init__(self, **kwargs: Any) -> None:
"""
Update by query request to opensearch.
:arg using: `OpenSearch` instance to use
:arg index: limit the search to index
:arg doc_type: only query this type.
All the parameters supplied (or omitted) at creation type can be later
overridden by methods (`using`, `index` and `doc_type` respectively).
"""
super().__init__(**kwargs)
self._response_class = UpdateByQueryResponse
self._script: Any = {}
self._query_proxy = QueryProxy(self, "query")
def filter(self, *args: Any, **kwargs: Any) -> Any:
return self.query(Bool(filter=[Q(*args, **kwargs)]))
def exclude(self, *args: Any, **kwargs: Any) -> Any:
return self.query(Bool(filter=[~Q(*args, **kwargs)]))
[docs] @classmethod
def from_dict(cls, d: Any) -> Any:
"""
Construct a new `UpdateByQuery` instance from a raw dict containing the search
body. Useful when migrating from raw dictionaries.
Example::
ubq = UpdateByQuery.from_dict({
"query": {
"bool": {
"must": [...]
}
},
"script": {...}
})
ubq = ubq.filter('term', published=True)
"""
u = cls()
u.update_from_dict(d)
return u
[docs] def _clone(self) -> Any:
"""
Return a clone of the current search request. Performs a shallow copy
of all the underlying objects. Used internally by most state modifying
APIs.
"""
ubq = super()._clone()
ubq._response_class = self._response_class
ubq._script = self._script.copy()
ubq.query._proxied = self.query._proxied
return ubq
[docs] def response_class(self, cls: Any) -> Any:
"""
Override the default wrapper used for the response.
"""
ubq = self._clone()
ubq._response_class = cls
return ubq
[docs] def update_from_dict(self, d: Any) -> "UpdateByQuery":
"""
Apply options from a serialized body to the current instance. Modifies
the object in-place. Used mostly by ``from_dict``.
"""
d = d.copy()
if "query" in d:
self.query._proxied = Q(d.pop("query"))
if "script" in d:
self._script = d.pop("script")
self._extra.update(d)
return self
[docs] def script(self, **kwargs: Any) -> Any:
"""
Define update action to take:
Note: the API only accepts a single script, so
calling the script multiple times will overwrite.
Example::
ubq = Search()
ubq = ubq.script(source="ctx._source.likes++"")
ubq = ubq.script(source="ctx._source.likes += params.f"",
lang="expression",
params={'f': 3})
"""
ubq = self._clone()
if ubq._script:
ubq._script = {}
ubq._script.update(kwargs)
return ubq
[docs] def to_dict(self, **kwargs: Any) -> Any:
"""
Serialize the search into the dictionary that will be sent over as the
request'ubq body.
All additional keyword arguments will be included into the dictionary.
"""
d = {}
if self.query:
d["query"] = self.query.to_dict()
if self._script:
d["script"] = self._script
d.update(recursive_to_dict(self._extra))
d.update(recursive_to_dict(kwargs))
return d
[docs] def execute(self) -> Any:
"""
Execute the search and return an instance of ``Response`` wrapping all
the data.
"""
opensearch = get_connection(self._using)
self._response = self._response_class(
self,
opensearch.update_by_query(
index=self._index, body=self.to_dict(), **self._params
),
)
return self._response