# SPDX-License-Identifier: Apache-2.0
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Any modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import sys
import warnings
from io import StringIO
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd # type: ignore
from pandas.core.common import apply_if_callable, is_bool_indexer # type: ignore
from pandas.core.computation.eval import eval # type: ignore
from pandas.core.dtypes.common import is_list_like # type: ignore
from pandas.core.indexing import check_bool_indexer # type: ignore
from pandas.io.common import _expand_user, stringify_path # type: ignore
from pandas.io.formats import console # type: ignore
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing # type: ignore
from pandas.util._validators import validate_bool_kwarg # type: ignore
import opensearch_py_ml.plotting as gfx
from opensearch_py_ml.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
from opensearch_py_ml.filter import BooleanFilter
from opensearch_py_ml.groupby import DataFrameGroupBy
from opensearch_py_ml.ndframe import NDFrame
from opensearch_py_ml.series import Series
from opensearch_py_ml.utils import is_valid_attr_name
if TYPE_CHECKING:
from opensearchpy import OpenSearch
from .query_compiler import QueryCompiler
class DataFrame(NDFrame):
"""
Two-dimensional, size-mutable, potentially heterogeneous tabular data structure with labeled axes
(rows and columns) referencing data stored in OpenSearch indices.
Where possible, APIs mirror pandas.DataFrame APIs.
The underlying data is stored in OpenSearch rather than in memory.
Parameters
----------
os_client: OpenSearch client
os_index_pattern: str
OpenSearch index pattern. This can contain wildcards (e.g. 'flights').
columns: list of str, optional
List of DataFrame columns. A subset of the OpenSearch index's fields.
os_index_field: str, optional
The OpenSearch index field to use as the DataFrame index. Defaults to _id if None is used.
See Also
--------
:pandas_api_docs:`pandas.DataFrame`
Examples
--------
Constructing a DataFrame from an OpenSearch client and an OpenSearch index
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.head()
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Constructing a DataFrame from an OpenSearch client and an OpenSearch index, with a subset of columns
>>> df = oml.DataFrame(os_client=OPENSEARCH_TEST_CLIENT, os_index_pattern='flights', columns=['AvgTicketPrice', 'Cancelled'])
>>> df.head()
AvgTicketPrice Cancelled
0 841.265642 False
1 882.982662 False
2 190.636904 False
3 181.694216 True
4 730.041778 False
<BLANKLINE>
[5 rows x 2 columns]
Constructing a DataFrame from an OpenSearch client and an OpenSearch index, with 'timestamp' as the DataFrame
index field
(TODO - currently index_field must also be a field if not _id)
>>> df = oml.DataFrame(
... os_client=OPENSEARCH_TEST_CLIENT,
... os_index_pattern='flights',
... columns=['AvgTicketPrice', 'timestamp'],
... os_index_field='timestamp'
... )
>>> df.head()
AvgTicketPrice timestamp
2018-01-01T00:00:00 841.265642 2018-01-01 00:00:00
2018-01-01T00:02:06 772.100846 2018-01-01 00:02:06
2018-01-01T00:06:27 159.990962 2018-01-01 00:06:27
2018-01-01T00:33:31 800.217104 2018-01-01 00:33:31
2018-01-01T00:36:51 803.015200 2018-01-01 00:36:51
<BLANKLINE>
[5 rows x 2 columns]
"""
def __init__(
self,
os_client: "OpenSearch" = None,
os_index_pattern: Optional[str] = None,
columns: Optional[List[str]] = None,
os_index_field: Optional[str] = None,
_query_compiler: Optional["QueryCompiler"] = None,
) -> None:
"""
There are effectively 2 constructors:
1. client, index_pattern, columns, index_field
2. query_compiler (opensearch_py_ml.QueryCompiler)
The constructor with 'query_compiler' is for internal use only.
"""
if _query_compiler is None:
if os_client is None or os_index_pattern is None:
raise ValueError(
"client and index_pattern must be defined in DataFrame constructor"
)
# python 3 syntax
super().__init__(
os_client=os_client,
os_index_pattern=os_index_pattern,
columns=columns,
os_index_field=os_index_field,
_query_compiler=_query_compiler,
)
@property
def columns(self) -> pd.Index:
"""
The column labels of the DataFrame.
Returns
-------
pandas.Index
OpenSearch field names as pandas.Index
See Also
--------
:pandas_api_docs:`pandas.DataFrame.columns`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> assert isinstance(df.columns, pd.Index)
>>> df.columns
Index(['AvgTicketPrice', 'Cancelled', 'Carrier', 'Dest', 'DestAirportID', 'DestCityName',
... 'DestCountry', 'DestLocation', 'DestRegion', 'DestWeather', 'DistanceKilometers',
... 'DistanceMiles', 'FlightDelay', 'FlightDelayMin', 'FlightDelayType', 'FlightNum',
... 'FlightTimeHour', 'FlightTimeMin', 'Origin', 'OriginAirportID', 'OriginCityName',
... 'OriginCountry', 'OriginLocation', 'OriginRegion', 'OriginWeather', 'dayOfWeek',
... 'timestamp'],
... dtype='object')
"""
return self._query_compiler.columns
@property
def empty(self) -> bool:
"""Determines if the DataFrame is empty.
Returns
-------
bool
True if the DataFrame is empty, otherwise False.
See Also
--------
:pandas_api_docs:`pandas.DataFrame.empty`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.empty
False
"""
return len(self.columns) == 0 or len(self.index) == 0
def head(self, n: int = 5) -> "DataFrame":
"""
Return the first n rows.
This function returns the first n rows for the object based on position.
Rows are sorted by the index field.
It is useful for quickly testing if your object has the right type of data in it.
Parameters
----------
n: int, default 5
Number of rows to select.
Returns
-------
opensearch_py_ml.DataFrame
opensearch_py_ml DataFrame filtered on first n rows sorted by index field
See Also
--------
:pandas_api_docs:`pandas.DataFrame.head`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> df.head(3)
Origin Dest
0 Frankfurt am Main Airport Sydney Kingsford Smith International Airport
1 Cape Town International Airport Venice Marco Polo Airport
2 Venice Marco Polo Airport Venice Marco Polo Airport
<BLANKLINE>
[3 rows x 2 columns]
"""
return DataFrame(_query_compiler=self._query_compiler.head(n))
def tail(self, n: int = 5) -> "DataFrame":
"""
Return the last n rows.
This function returns the last n rows for the object based on position.
Rows are sorted by the index field.
It is useful for quickly testing if your object has the right type of data in it.
Parameters
----------
n: int, default 5
Number of rows to select.
Returns
-------
opensearch_py_ml.DataFrame:
opensearch_py_ml DataFrame filtered on last n rows sorted by index field
See Also
--------
:pandas_api_docs:`pandas.DataFrame.tail`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> df.tail()
Origin \\
13054 Pisa International Airport...
13055 Winnipeg / James Armstrong Richardson International Airport...
13056 Licenciado Benito Juarez International Airport...
13057 Itami Airport...
13058 Adelaide International Airport...
<BLANKLINE>
Dest...
13054 Xi'an Xianyang International Airport...
13055 Zurich Airport...
13056 Ukrainka Air Base...
13057 Ministro Pistarini International Airport...
13058 Washington Dulles International Airport...
<BLANKLINE>
[5 rows x 2 columns]
"""
return DataFrame(_query_compiler=self._query_compiler.tail(n))
def sample(
self,
n: Optional[int] = None,
frac: Optional[float] = None,
random_state: Optional[int] = None,
) -> "DataFrame":
"""
Return n randomly sampled rows or the specified fraction of rows.
Parameters
----------
n : int, optional
Number of documents from index to return. Cannot be used with `frac`.
Default = 1 if `frac` = None.
frac : float, optional
Fraction of axis items to return. Cannot be used with `n`.
random_state : int, optional
Seed for the random number generator.
Returns
-------
opensearch_py_ml.DataFrame:
opensearch_py_ml DataFrame containing n randomly sampled rows
See Also
--------
:pandas_api_docs:`pandas.DataFrame.sample`
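Examples
--------
A minimal usage sketch; exact output is omitted since the sampled rows depend on
the index contents and the seed:
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> df.sample(n=3, random_state=42)  # doctest: +SKIP
>>> df.sample(frac=0.1, random_state=42)  # doctest: +SKIP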
"""
if frac is not None and not (0.0 < frac <= 1.0):
raise ValueError("`frac` must be between 0. and 1.")
elif n is not None and frac is None and n % 1 != 0:
raise ValueError("Only integers accepted as `n` values")
elif (n is not None) == (frac is not None):
raise ValueError("Please enter a value for `frac` OR `n`, not both")
return DataFrame(
_query_compiler=self._query_compiler.sample(
n=n, frac=frac, random_state=random_state
)
)
def drop(
self,
labels=None,
axis=0,
index=None,
columns=None,
level=None,
inplace=False,
errors="raise",
):
"""Return new object with labels in requested axis removed.
Parameters
----------
labels:
Index or column labels to drop.
axis:
Whether to drop labels from the index (0 / 'index') or columns (1 / 'columns').
index, columns:
Alternative to specifying axis (labels, axis=1 is equivalent to columns=labels).
level:
For MultiIndex - not supported
inplace:
If True, do operation inplace and return None.
errors:
If 'ignore', suppress error and existing labels are dropped.
Returns
-------
dropped:
type of caller
See Also
--------
:pandas_api_docs:`pandas.DataFrame.drop`
Examples
--------
Drop a column
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce', columns=['customer_first_name', 'email', 'user'])
>>> df.drop(columns=['user'])
customer_first_name email
0 Eddie eddie@underwood-family.zzz
1 Mary mary@bailey-family.zzz
2 Gwen gwen@butler-family.zzz
3 Diane diane@chandler-family.zzz
4 Eddie eddie@weber-family.zzz
... ... ...
4670 Mary mary@lambert-family.zzz
4671 Jim jim@gilbert-family.zzz
4672 Yahya yahya@rivera-family.zzz
4673 Mary mary@hampton-family.zzz
4674 Jackson jackson@hopkins-family.zzz
<BLANKLINE>
[4675 rows x 2 columns]
Drop rows by index value (axis=0)
>>> df.drop(['1', '2'])
customer_first_name email user
0 Eddie eddie@underwood-family.zzz eddie
3 Diane diane@chandler-family.zzz diane
4 Eddie eddie@weber-family.zzz eddie
5 Diane diane@goodwin-family.zzz diane
6 Oliver oliver@rios-family.zzz oliver
... ... ... ...
4670 Mary mary@lambert-family.zzz mary
4671 Jim jim@gilbert-family.zzz jim
4672 Yahya yahya@rivera-family.zzz yahya
4673 Mary mary@hampton-family.zzz mary
4674 Jackson jackson@hopkins-family.zzz jackson
<BLANKLINE>
[4673 rows x 3 columns]
"""
# Level not supported
if level is not None:
raise NotImplementedError(f"level not supported {level}")
inplace = validate_bool_kwarg(inplace, "inplace")
if labels is not None:
if index is not None or columns is not None:
raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
axis = pd.DataFrame._get_axis_name(axis)
axes = {axis: labels}
elif index is not None or columns is not None:
axes, _ = pd.DataFrame()._construct_axes_from_arguments(
(index, columns), {}
)
else:
raise ValueError(
"Need to specify at least one of 'labels', 'index' or 'columns'"
)
# TODO Clean up this error checking
if "index" not in axes:
axes["index"] = None
elif axes["index"] is not None:
if not is_list_like(axes["index"]):
axes["index"] = [axes["index"]]
if errors == "raise":
# Check if axes['index'] values exists in index
count = self._query_compiler._index_matches_count(axes["index"])
if count != len(axes["index"]):
raise ValueError(
f"number of labels {count}!={len(axes['index'])} not contained in axis"
)
else:
"""
axes["index"] = self._query_compiler.index_matches(axes["index"])
# If the length is zero, we will just do nothing
if not len(axes["index"]):
axes["index"] = None
"""
raise NotImplementedError()
if "columns" not in axes:
axes["columns"] = None
elif axes["columns"] is not None:
if not is_list_like(axes["columns"]):
axes["columns"] = [axes["columns"]]
if errors == "raise":
non_existent = [
obj for obj in axes["columns"] if obj not in self.columns
]
if len(non_existent):
raise ValueError(f"labels {non_existent} not contained in axis")
else:
axes["columns"] = [
obj for obj in axes["columns"] if obj in self.columns
]
# If the length is zero, we will just do nothing
if not len(axes["columns"]):
axes["columns"] = None
new_query_compiler = self._query_compiler.drop(
index=axes["index"], columns=axes["columns"]
)
return self._create_or_update_from_compiler(new_query_compiler, inplace)
def __getitem__(self, key):
return self._getitem(key)
def __dir__(self):
"""
Provide autocompletion on field names in interactive environment.
"""
return super().__dir__() + [
column_name
for column_name in self._query_compiler.columns.to_list()
if is_valid_attr_name(column_name)
]
def __repr__(self) -> str:
"""
From pandas
"""
buf = StringIO()
# max_rows and max_cols determine the maximum size of the pretty printed tabular
# representation of the dataframe. pandas defaults are 60 and 20 respectively.
# dataframes where len(df) > max_rows show a truncated view with 10 rows.
max_rows = pd.get_option("display.max_rows")
max_cols = pd.get_option("display.max_columns")
min_rows = pd.get_option("display.min_rows")
if max_rows and len(self) > max_rows:
max_rows = min_rows
show_dimensions = pd.get_option("display.show_dimensions")
if pd.get_option("display.expand_frame_repr"):
width, _ = console.get_console_size()
else:
width = None
self.to_string(
buf=buf,
max_rows=max_rows,
max_cols=max_cols,
line_width=width,
show_dimensions=show_dimensions,
)
return buf.getvalue()
def _info_repr(self) -> bool:
"""
True if the repr should show the info view.
"""
info_repr_option = pd.get_option("display.large_repr") == "info"
return info_repr_option and not (
self._repr_fits_horizontal_() and self._repr_fits_vertical_()
)
def _repr_html_(self) -> Optional[str]:
"""
From pandas - this is called by notebooks
"""
if self._info_repr():
buf = StringIO("")
self.info(buf=buf)
# need to escape the <class>, should be the first line.
val = buf.getvalue().replace("<", r"<", 1)
val = val.replace(">", r">", 1)
return "<pre>" + val + "</pre>"
if pd.get_option("display.notebook_repr_html"):
max_rows = pd.get_option("display.max_rows")
max_cols = pd.get_option("display.max_columns")
min_rows = pd.get_option("display.min_rows")
show_dimensions = pd.get_option("display.show_dimensions")
if max_rows and len(self) > max_rows:
max_rows = min_rows
return self.to_html(
max_rows=max_rows,
max_cols=max_cols,
show_dimensions=show_dimensions,
notebook=True,
) # set for consistency with pandas output
else:
return None
def count(self) -> pd.Series:
"""
Count non-NA cells for each column.
Counts are based on exists queries against OpenSearch.
This is inefficient, as it creates N queries (N is number of fields).
An alternative approach is to use value_count aggregations. However, they have issues in that:
- They can only be used with aggregatable fields (e.g. keyword not text)
- For list fields they return multiple counts. E.g. tags=['opensearch-project', 'ml'] returns value_count=2 for a
single document.
TODO - add additional pandas.DataFrame.count features
Returns
-------
pandas.Series:
Summary of column counts
See Also
--------
:pandas_api_docs:`pandas.DataFrame.count`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce', columns=['customer_first_name', 'geoip.city_name'])
>>> df.count()
customer_first_name 4675
geoip.city_name 4094
dtype: int64
"""
return self._query_compiler.count()
def os_info(self):
# noinspection PyPep8
"""
A debug summary of an opensearch_py_ml DataFrame internals.
This includes the OpenSearch search queries and query compiler task list.
Returns
-------
str
A debug summary of an opensearch_py_ml DataFrame internals.
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df = df[(df.OriginAirportID == 'AMS') & (df.FlightDelayMin > 60)]
>>> df = df[['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']]
>>> df = df.tail()
>>> df
timestamp OriginAirportID DestAirportID FlightDelayMin
12608 2018-02-10 01:20:52 AMS CYEG 120
12720 2018-02-10 14:09:40 AMS BHM 255
12725 2018-02-10 00:53:01 AMS ATL 360
12823 2018-02-10 15:41:20 AMS NGO 120
12907 2018-02-11 20:08:25 AMS LIM 225
<BLANKLINE>
[5 rows x 4 columns]
>>> print(df.os_info())
os_index_pattern: flights
Index:
os_index_field: _id
is_source_field: False
Mappings:
capabilities:
os_field_name is_source os_dtype os_date_format pd_dtype is_searchable is_aggregatable is_scripted aggregatable_os_field_name
timestamp timestamp True date strict_date_hour_minute_second datetime64[ns] True True False timestamp
OriginAirportID OriginAirportID True keyword None object True True False OriginAirportID
DestAirportID DestAirportID True keyword None object True True False DestAirportID
FlightDelayMin FlightDelayMin True integer None int64 True True False FlightDelayMin
Operations:
tasks: [('boolean_filter': ('boolean_filter': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}})), ('tail': ('sort_field': '_doc', 'count': 5))]
size: 5
sort_params: {'_doc': 'desc'}
_source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}}
post_processing: [('sort_index')]
<BLANKLINE>
"""
buf = StringIO()
super()._os_info(buf)
return buf.getvalue()
def os_match(
self,
text: str,
*,
columns: Optional[Union[str, Sequence[str]]] = None,
match_phrase: bool = False,
must_not_match: bool = False,
multi_match_type: Optional[str] = None,
match_only_text_fields: bool = True,
analyzer: Optional[str] = None,
fuzziness: Optional[Union[int, str]] = None,
**kwargs: Any,
) -> "DataFrame":
"""Filters data with an OpenSearch ``match``, ``match_phrase``, or
``multi_match`` query depending on the given parameters and columns.
Read more about `Full-Text Queries in OpenSearch <https://opensearch.org/docs/latest/opensearch/query-dsl/full-text/>`_
By default, all fields of type 'text' within OpenSearch are queried;
otherwise, specific columns can be specified via the ``columns`` parameter,
or a single column can be filtered on with :py:meth:`opensearch_py_ml.Series.os_match`.
All additional keyword arguments are passed in the body of the match query.
Parameters
----------
text: str
String of text to search for
columns: str, List[str], optional
List of columns to search over. Defaults to all 'text' fields in OpenSearch
match_phrase: bool, default False
If True, uses a ``match_phrase`` query instead of ``match``, which takes into account
the word order of the ``text`` parameter.
must_not_match: bool, default False
If True, applies a boolean NOT (~) to the
query: instead of requiring a match, the query
requires the text to not match.
multi_match_type: str, optional
If given, and when matching against multiple columns, sets the ``multi_match.type`` setting
match_only_text_fields: bool, default True
When True, this function raises an error if any non-text fields
are queried, guarding against unanalyzed fields behaving unexpectedly.
Set to False to skip this preventative check.
analyzer: str, optional
Specify which analyzer to use for the match query
fuzziness: int, str, optional
Specify the fuzziness option for the match query
Returns
-------
DataFrame
A filtered :py:class:`opensearch_py_ml.DataFrame` with the given match query
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, "ecommerce")
>>> df.os_match("Men's", columns=["category"])
category currency ... type user
0 [Men's Clothing] EUR ... order eddie
4 [Men's Clothing, Men's Accessories] EUR ... order eddie
6 [Men's Clothing] EUR ... order oliver
7 [Men's Clothing, Men's Accessories, Men's Shoes] EUR ... order abd
11 [Men's Accessories, Men's Clothing] EUR ... order eddie
... ... ... ... ... ...
4663 [Men's Shoes, Men's Clothing] EUR ... order samir
4667 [Men's Clothing, Men's Shoes] EUR ... order sultan
4671 [Men's Clothing] EUR ... order jim
4672 [Men's Clothing] EUR ... order yahya
4674 [Women's Accessories, Men's Clothing] EUR ... order jackson
<BLANKLINE>
[2310 rows x 45 columns]
"""
# Determine which columns will be used
os_dtypes = self.os_dtypes.to_dict()
if columns is None:
columns = [
column for column, os_dtype in os_dtypes.items() if os_dtype == "text"
]
elif isinstance(columns, str):
columns = [columns]
columns = list(columns)
qc = self._query_compiler
match_filter = qc.os_match(
text,
columns,
match_phrase=match_phrase,
match_only_text_fields=match_only_text_fields,
multi_match_type=multi_match_type,
analyzer=analyzer,
fuzziness=fuzziness,
**kwargs,
)
if must_not_match:
match_filter = ~match_filter
return DataFrame(_query_compiler=qc._update_query(match_filter))
def os_query(self, query) -> "DataFrame":
"""Applies an OpenSearch DSL query to the current DataFrame.
Parameters
----------
query:
Dictionary of the OpenSearch DSL query to apply
Returns
-------
opensearch_py_ml.DataFrame:
opensearch_py_ml DataFrame with the query applied
Examples
--------
Apply a `geo-distance query`_ to a dataset with a geo-point column ``geoip.location``.
.. _geo-distance query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-geo-distance-query.html
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce', columns=['customer_first_name', 'geoip.city_name'])
>>> df.os_query({"bool": {"filter": {"geo_distance": {"distance": "1km", "geoip.location": [55.3, 25.3]}}}}).head()
customer_first_name geoip.city_name
1 Mary Dubai
9 Rabbia Al Dubai
10 Rabbia Al Dubai
22 Mary Dubai
30 Robbie Dubai
<BLANKLINE>
[5 rows x 2 columns]
If using an occurrence like ``must`` or ``filter``, you must
nest it within ``bool``:
.. code-block:: python
# Correct:
df.os_query({
"bool": {
"filter": {...}
}
})
# Incorrect, needs to be nested under 'bool':
df.os_query({
"filter": {...}
})
"""
# Unpack a {'query': ...} wrapper, which some
# users may pass based on the documentation.
if not isinstance(query, dict):
raise TypeError("'query' must be of type 'dict'")
if tuple(query) == ("query",):
query = query["query"]
return DataFrame(_query_compiler=self._query_compiler.os_query(query))
def _index_summary(self):
# Print index summary e.g.
# Index: 103 entries, 0 to 102
# Do this by getting head and tail of dataframe
if self.empty:
# index[0] is out of bounds for empty df
head = self.head(1).to_pandas()
tail = self.tail(1).to_pandas()
else:
head = self.head(1).to_pandas().index[0]
tail = self.tail(1).to_pandas().index[0]
index_summary = f", {pprint_thing(head)} to {pprint_thing(tail)}"
name = "Index"
return f"{name}: {len(self)} entries{index_summary}"
def info(
self,
verbose: Optional[bool] = None,
buf: Optional[StringIO] = None,
max_cols: Optional[int] = None,
memory_usage: Optional[bool] = None,
show_counts: Optional[bool] = None,
) -> None:
"""
Print a concise summary of a DataFrame.
This method prints information about a DataFrame including
the index dtype and column dtypes, non-null values and memory usage.
See :pandas_api_docs:`pandas.DataFrame.info` for full details.
Notes
-----
This copies a lot of code from pandas.DataFrame.info, as it is difficult
to split out the appropriate code, and creating a SparseDataFrame gives
incorrect results on types and counts.
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce', columns=['customer_first_name', 'geoip.city_name'])
>>> df.info()
<class 'opensearch_py_ml.dataframe.DataFrame'>
Index: 4675 entries, 0 to 4674
Data columns (total 2 columns):
# Column Non-Null Count Dtype...
--- ------ -------------- -----...
0 customer_first_name 4675 non-null object
1 geoip.city_name 4094 non-null object
dtypes: object(2)
memory usage: ...
OpenSearch storage usage: ...
"""
if buf is None: # pragma: no cover
buf = sys.stdout
lines = [str(type(self)), self._index_summary()]
columns: pd.Index = self.columns
number_of_columns: int = len(columns)
if number_of_columns == 0:
lines.append(f"Empty {type(self).__name__}")
fmt.buffer_put_lines(buf, lines)
return
# hack
if max_cols is None:
max_cols = pd.get_option("display.max_info_columns", number_of_columns + 1)
max_rows = pd.get_option("display.max_info_rows", len(self) + 1)
if show_counts is None:
show_counts = (number_of_columns <= max_cols) and (len(self) < max_rows)
exceeds_info_cols = number_of_columns > max_cols
# From pandas.DataFrame
def _put_str(s, space) -> str:
return f"{s}"[:space].ljust(space)
def _verbose_repr(number_of_columns: int) -> None:
lines.append(f"Data columns (total {number_of_columns} columns):")
id_head = " # "
column_head = "Column"
col_space = 2
max_col = max(len(pprint_thing(k)) for k in columns)
len_column = len(pprint_thing(column_head))
space = max(max_col, len_column) + col_space
max_id = len(pprint_thing(number_of_columns))
len_id = len(pprint_thing(id_head))
space_num = max(max_id, len_id) + col_space
counts = None
header = _put_str(id_head, space_num) + _put_str(column_head, space)
if show_counts:
counts = self.count()
if number_of_columns != len(counts): # pragma: no cover
raise AssertionError(
f"Columns must equal counts ({number_of_columns:d} != {len(counts):d})"
)
count_header = "Non-Null Count"
len_count = len(count_header)
non_null = " non-null"
max_count = max(len(pprint_thing(k)) for k in counts) + len(non_null)
space_count = max(len_count, max_count) + col_space
count_temp = "{count}" + non_null
else:
count_header = ""
space_count = len(count_header)
len_count = space_count
count_temp = "{count}"
dtype_header = "Dtype"
len_dtype = len(dtype_header)
max_dtypes = max(len(pprint_thing(k)) for k in self.dtypes)
space_dtype = max(len_dtype, max_dtypes)
header += _put_str(count_header, space_count) + _put_str(
dtype_header, space_dtype
)
lines.append(header)
lines.append(
_put_str("-" * len_id, space_num)
+ _put_str("-" * len_column, space)
+ _put_str("-" * len_count, space_count)
+ _put_str("-" * len_dtype, space_dtype)
)
dtypes = self.dtypes
for i, col in enumerate(columns):
dtype = dtypes.iloc[i]
col = pprint_thing(col)
line_no = _put_str(f" {i}", space_num)
count = ""
if show_counts:
count = counts.iloc[i]
lines.append(
line_no
+ _put_str(col, space)
+ _put_str(count_temp.format(count=count), space_count)
+ _put_str(dtype, space_dtype)
)
def _non_verbose_repr() -> None:
lines.append(columns._summary(name="Columns"))
def _sizeof_fmt(num: float, size_qualifier: str) -> str:
# returns size in human readable format
for x in ["bytes", "KB", "MB", "GB", "TB"]:
if num < 1024.0:
return f"{num:3.3f}{size_qualifier} {x}"
num /= 1024.0
return f"{num:3.3f}{size_qualifier} PB"
if verbose:
_verbose_repr(number_of_columns)
elif verbose is False: # specifically set to False, not necessarily None
_non_verbose_repr()
else:
(
_non_verbose_repr()
if exceeds_info_cols
else _verbose_repr(number_of_columns)
)
# pandas 0.25.1 uses get_dtype_counts() here. This
# returns a Series with strings as the index NOT dtypes.
# Therefore, to get consistent ordering we need to
# align types with pandas method.
counts = self.dtypes.value_counts()
counts.index = counts.index.astype(str)
dtypes = [f"{k}({v:d})" for k, v in sorted(counts.items())]
lines.append(f"dtypes: {', '.join(dtypes)}")
if memory_usage is None:
memory_usage = pd.get_option("display.memory_usage")
if memory_usage:
# append memory usage of df to display
size_qualifier = ""
# TODO - this is different from pd.DataFrame as we shouldn't
# really hold much in memory. For now just approximate with getsizeof + ignore deep
mem_usage = sys.getsizeof(self)
lines.append(f"memory usage: {_sizeof_fmt(mem_usage, size_qualifier)}")
storage_usage = self._query_compiler._client.indices.stats(
index=self._query_compiler._index_pattern, metric=["store"]
)["_all"]["total"]["store"]["size_in_bytes"]
lines.append(
f"OpenSearch storage usage: {_sizeof_fmt(storage_usage, size_qualifier)}\n"
)
fmt.buffer_put_lines(buf, lines)
@docstring_parameter(DEFAULT_NUM_ROWS_DISPLAYED)
def to_html(
self,
buf=None,
columns=None,
col_space=None,
header=True,
index=True,
na_rep="NaN",
formatters=None,
float_format=None,
sparsify=None,
index_names=True,
justify=None,
max_rows=None,
max_cols=None,
show_dimensions=False,
decimal=".",
bold_rows=True,
classes=None,
escape=True,
notebook=False,
border=None,
table_id=None,
render_links=False,
) -> Any:
"""
Render OpenSearch data as an HTML table.
Follows pandas implementation except when ``max_rows=None``. In this scenario, we set ``max_rows={0}`` to avoid
accidentally dumping an entire index. This can be overridden by explicitly setting ``max_rows``.
See Also
--------
:pandas_api_docs:`pandas.DataFrame.to_html`
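Examples
--------
A hedged sketch (the rendered HTML is omitted for brevity):
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> html = df.to_html(max_rows=2)  # doctest: +SKIP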
"""
# In pandas, calling 'to_html' without max_rows set will dump ALL rows - we avoid this
# by limiting rows by default.
num_rows = len(self) # avoid multiple calls
if num_rows <= DEFAULT_NUM_ROWS_DISPLAYED:
if max_rows is None:
max_rows = num_rows
else:
max_rows = min(num_rows, max_rows)
elif max_rows is None:
warnings.warn(
f"DataFrame.to_html called without max_rows set "
f"- this will return entire index results. "
f"Setting max_rows={DEFAULT_NUM_ROWS_DISPLAYED};"
f" set max_rows explicitly if different behaviour is required.",
UserWarning,
)
max_rows = DEFAULT_NUM_ROWS_DISPLAYED
# because of the way pandas handles max_rows=0, not having this throws an error
# see opensearch_py_ml issue #56
if max_rows == 0:
max_rows = 1
# Create a slightly bigger dataframe than display
df = self._build_repr(max_rows + 1)
if buf is not None:
_buf = _expand_user(stringify_path(buf))
else:
_buf = StringIO()
df.to_html(
buf=_buf,
columns=columns,
col_space=col_space,
header=header,
index=index,
na_rep=na_rep,
formatters=formatters,
float_format=float_format,
sparsify=sparsify,
index_names=index_names,
justify=justify,
max_rows=max_rows,
max_cols=max_cols,
show_dimensions=False,
decimal=decimal,
bold_rows=bold_rows,
classes=classes,
escape=escape,
notebook=notebook,
border=border,
table_id=table_id,
render_links=render_links,
)
# Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
# the correct number of rows
if show_dimensions:
# TODO - this results in different output to pandas
# TODO - the 'x' character is different and this gets added after the </div>
_buf.write(f"\n<p>{len(self.index)} rows × {len(self.columns)} columns</p>")
if buf is None:
result = _buf.getvalue()
return result
@docstring_parameter(DEFAULT_NUM_ROWS_DISPLAYED)
def to_string(
self,
buf=None,
columns=None,
col_space=None,
header=True,
index=True,
na_rep="NaN",
formatters=None,
float_format=None,
sparsify=None,
index_names=True,
justify=None,
max_rows=None,
max_cols=None,
show_dimensions=False,
decimal=".",
line_width=None,
):
"""
Render a DataFrame to a console-friendly tabular output.
Follows pandas implementation except when ``max_rows=None``. In this scenario, we set ``max_rows={0}`` to avoid
accidentally dumping an entire index. This can be overridden by explicitly setting ``max_rows``.
See Also
--------
:pandas_api_docs:`pandas.DataFrame.to_string`
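Examples
--------
A minimal sketch with an explicit row limit; exact output depends on the index contents:
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> print(df.to_string(max_rows=2, show_dimensions=True))  # doctest: +SKIP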
"""
# In pandas, calling 'to_string' without max_rows set will dump ALL rows - we avoid this
# by limiting rows by default.
num_rows = len(self) # avoid multiple calls
if num_rows <= DEFAULT_NUM_ROWS_DISPLAYED:
if max_rows is None:
max_rows = num_rows
else:
max_rows = min(num_rows, max_rows)
elif max_rows is None:
warnings.warn(
f"DataFrame.to_string called without max_rows set "
f"- this will return entire index results. "
f"Setting max_rows={DEFAULT_NUM_ROWS_DISPLAYED};"
f" set max_rows explicitly if different behaviour is required.",
UserWarning,
)
max_rows = DEFAULT_NUM_ROWS_DISPLAYED
# because of the way pandas handles max_rows=0, not having this throws an error
# see opensearch_py_ml issue #56
if max_rows == 0:
max_rows = 1
# Create a slightly bigger dataframe than display
df = self._build_repr(max_rows + 1)
if buf is not None:
_buf = _expand_user(stringify_path(buf))
else:
_buf = StringIO()
df.to_string(
buf=_buf,
columns=columns,
col_space=col_space,
na_rep=na_rep,
formatters=formatters,
float_format=float_format,
sparsify=sparsify,
justify=justify,
index_names=index_names,
header=header,
index=index,
max_rows=max_rows,
max_cols=max_cols,
show_dimensions=False, # print this outside of this call
decimal=decimal,
line_width=line_width,
)
# Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
# the correct number of rows
if show_dimensions:
_buf.write(f"\n\n[{len(self.index)} rows x {len(self.columns)} columns]")
if buf is None:
result = _buf.getvalue()
return result
def __getattr__(self, key: str) -> Any:
"""After regular attribute access, looks up the name in the columns
Parameters
----------
key: str
Attribute name.
Returns
-------
The value of the attribute.
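Examples
--------
A brief sketch: attribute access falls back to column lookup, so ``df.Carrier``
behaves like ``df['Carrier']`` when 'Carrier' is a column (output omitted):
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.Carrier  # doctest: +SKIP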
"""
try:
return object.__getattribute__(self, key)
except AttributeError as e:
if key in self.columns:
return self[key]
raise e
def _getitem(
self,
key: Union[
"DataFrame", "Series", pd.Index, List[str], str, BooleanFilter, np.ndarray
],
) -> Union["Series", "DataFrame"]:
"""Get the column specified by key for this DataFrame.
Args:
key : The column name.
Returns:
A Pandas Series representing the value for the column.
"""
key = apply_if_callable(key, self)
# Shortcut if key is an actual column
try:
if key in self.columns:
return self._getitem_column(key)
except (KeyError, ValueError, TypeError):
pass
if isinstance(key, (Series, np.ndarray, pd.Index, list)):
return self._getitem_array(key)
elif isinstance(key, DataFrame):
return self.where(key)
elif isinstance(key, BooleanFilter):
return DataFrame(_query_compiler=self._query_compiler._update_query(key))
else:
return self._getitem_column(key)
def _getitem_column(self, key: str) -> "Series":
if key not in self.columns:
raise KeyError(f"Requested column [{key}] is not in the DataFrame.")
s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
return s
def _getitem_array(self, key: Union[str, pd.Series]) -> "DataFrame":
if isinstance(key, Series):
key = key.to_pandas()
if is_bool_indexer(key):
if isinstance(key, pd.Series) and not key.index.equals(self.index):
warnings.warn(
"Boolean Series key will be reindexed to match DataFrame index.",
PendingDeprecationWarning,
stacklevel=3,
)
elif len(key) != len(self.index):
raise ValueError(
f"Item wrong length {len(key)} instead of {len(self.index)}."
)
key = check_bool_indexer(self.index, key)
# We convert to a RangeIndex because getitem_row_array is expecting a list
# of indices, and RangeIndex will give us the exact indices of each boolean
# requested.
key = pd.RangeIndex(len(self.index))[key]
if len(key):
return DataFrame(
_query_compiler=self._query_compiler.getitem_row_array(key)
)
else:
return DataFrame(columns=self.columns)
else:
if any(k not in self.columns for k in key):
raise KeyError(
f"{str([k for k in key if k not in self.columns]).replace(',', '')}"
f" not index"
)
return DataFrame(
_query_compiler=self._query_compiler.getitem_column_array(key)
)
def _create_or_update_from_compiler(
self, new_query_compiler: "QueryCompiler", inplace: bool = False
) -> Union["QueryCompiler", "DataFrame"]:
"""Returns or updates a DataFrame given new query_compiler"""
assert (
isinstance(new_query_compiler, type(self._query_compiler))
or type(new_query_compiler) in self._query_compiler.__class__.__bases__
), f"Invalid Query Compiler object: {type(new_query_compiler)}"
if not inplace:
return DataFrame(_query_compiler=new_query_compiler)
else:
self._query_compiler: "QueryCompiler" = new_query_compiler
@staticmethod
def _reduce_dimension(query_compiler: "QueryCompiler") -> "Series":
return Series(_query_compiler=query_compiler)
def to_csv(
self,
path_or_buf=None,
sep=",",
na_rep="",
float_format=None,
columns=None,
header=True,
index=True,
index_label=None,
mode="w",
encoding=None,
compression="infer",
quoting=None,
quotechar='"',
line_terminator=None,
chunksize=None,
tupleize_cols=None,
date_format=None,
doublequote=True,
escapechar=None,
decimal=".",
) -> Optional[str]:
"""
Write OpenSearch data to a comma-separated values (csv) file.
See Also
--------
:pandas_api_docs:`pandas.DataFrame.to_csv`
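Examples
--------
A hedged sketch (not executed; the output path is illustrative):
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> df.to_csv('flights.csv', index=False)  # doctest: +SKIP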
"""
kwargs = {
"path_or_buf": path_or_buf,
"sep": sep,
"na_rep": na_rep,
"float_format": float_format,
"columns": columns,
"header": header,
"index": index,
"index_label": index_label,
"mode": mode,
"encoding": encoding,
"compression": compression,
"quoting": quoting,
"quotechar": quotechar,
"line_terminator": line_terminator,
"chunksize": chunksize,
"date_format": date_format,
"doublequote": doublequote,
"escapechar": escapechar,
"decimal": decimal,
}
return self._query_compiler.to_csv(**kwargs)
def to_pandas(self, show_progress: bool = False) -> pd.DataFrame:
"""
Utility method to convert an opensearch_py_ml.DataFrame to a pandas.DataFrame
Parameters
----------
show_progress: bool, default False
If True, report progress while fetching data from OpenSearch.
Returns
-------
pandas.DataFrame
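Examples
--------
A minimal sketch, reusing the test fixtures from the other examples in this module:
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin'])
>>> pd_df = oml_df.to_pandas()  # doctest: +SKIP
>>> type(pd_df)  # doctest: +SKIP
<class 'pandas.core.frame.DataFrame'>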
"""
return self._query_compiler.to_pandas(show_progress=show_progress)
def _empty_pd_df(self) -> pd.DataFrame:
return self._query_compiler._empty_pd_ef()
def select_dtypes(self, include=None, exclude=None) -> "DataFrame":
"""
Return a subset of the DataFrame's columns based on the column dtypes.
Compatible with :pandas_api_docs:`pandas.DataFrame.select_dtypes`
Returns
-------
opensearch_py_ml.DataFrame
DataFrame contains only columns of selected dtypes
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights',
... columns=['AvgTicketPrice', 'Dest', 'Cancelled', 'timestamp', 'dayOfWeek'])
>>> df.dtypes
AvgTicketPrice float64
Dest object
Cancelled bool
timestamp datetime64[ns]
dayOfWeek int64
dtype: object
>>> df = df.select_dtypes(include=[np.number, 'datetime'])
>>> df.dtypes
AvgTicketPrice float64
timestamp datetime64[ns]
dayOfWeek int64
dtype: object
"""
empty_df = self._empty_pd_df()
empty_df = empty_df.select_dtypes(include=include, exclude=exclude)
return self._getitem_array(empty_df.columns)
@property
def shape(self) -> Tuple[int, int]:
"""
Return a tuple representing the dimensionality of the DataFrame.
Returns
-------
shape: tuple
0. number of rows
1. number of columns
Notes
-----
- number of rows ``len(df)`` queries OpenSearch
- number of columns ``len(df.columns)`` is cached. If mappings are updated, DataFrame must be updated.
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce')
>>> df.shape
(4675, 45)
"""
num_rows = len(self)
num_columns = len(self.columns)
return num_rows, num_columns
@property
def ndim(self) -> int:
"""
Returns 2 by definition of a DataFrame
Returns
-------
int
By definition 2
See Also
--------
:pandas_api_docs:`pandas.DataFrame.ndim`
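Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.ndim
2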
"""
return 2
def keys(self) -> pd.Index:
"""
Return columns
See :pandas_api_docs:`pandas.DataFrame.keys`
Returns
-------
pandas.Index
OpenSearch field names as pandas.Index
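Examples
--------
A brief sketch; ``keys()`` is equivalent to ``columns``:
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['Origin', 'Dest'])
>>> list(df.keys())  # doctest: +SKIP
['Origin', 'Dest']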
"""
return self.columns
def iterrows(
self, sort_index: Optional["str"] = "_doc"
) -> Iterable[Tuple[Union[str, Tuple[str, ...]], pd.Series]]:
"""
Iterate over opensearch_py_ml.DataFrame rows as (index, pandas.Series) pairs.
Parameters
----------
sort_index: str, default '_doc'
The field to sort the OpenSearch data by.
Yields
------
index: index
The index of the row.
data: pandas Series
The data of the row as a pandas Series.
See Also
--------
opensearch_py_ml.DataFrame.itertuples: Iterate over opensearch_py_ml.DataFrame rows as namedtuples.
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
>>> df
AvgTicketPrice Cancelled
0 841.265642 False
1 882.982662 False
2 190.636904 False
3 181.694216 True
4 730.041778 False
<BLANKLINE>
[5 rows x 2 columns]
>>> for index, row in df.iterrows():
... print(row)
AvgTicketPrice 841.265642
Cancelled False
Name: 0, dtype: object
AvgTicketPrice 882.982662
Cancelled False
Name: 1, dtype: object
AvgTicketPrice 190.636904
Cancelled False
Name: 2, dtype: object
AvgTicketPrice 181.694216
Cancelled True
Name: 3, dtype: object
AvgTicketPrice 730.041778
Cancelled False
Name: 4, dtype: object
"""
for df in self._query_compiler.search_yield_pandas_dataframes(
sort_index=sort_index
):
yield from df.iterrows()
def itertuples(
self,
index: bool = True,
name: Union[str, None] = "opensearch_py_ml",
sort_index: Optional[str] = "_doc",
) -> Iterable[Tuple[Any, ...]]:
"""
Iterate over opensearch_py_ml.DataFrame rows as namedtuples.
Parameters
----------
index: bool, default True
If True, return the index as the first element of the tuple.
name: str or None, default "opensearch-py-ml"
The name of the returned namedtuples or None to return regular tuples.
sort_index: str, default '_doc'
The field to sort the OpenSearch data by.
Returns
-------
iterator
An object to iterate over namedtuples for each row in the
DataFrame with the first field possibly being the index and
following fields being the column values.
See Also
--------
opensearch_py_ml.DataFrame.iterrows: Iterate over opensearch_py_ml.DataFrame rows as (index, pandas.Series) pairs.
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
>>> df
AvgTicketPrice Cancelled
0 841.265642 False
1 882.982662 False
2 190.636904 False
3 181.694216 True
4 730.041778 False
<BLANKLINE>
[5 rows x 2 columns]
>>> for row in df.itertuples():
... print(row)
opensearch_py_ml(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
opensearch_py_ml(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
opensearch_py_ml(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
opensearch_py_ml(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
opensearch_py_ml(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
By setting the `index` parameter to False we can remove the index as the first element of the tuple:
>>> for row in df.itertuples(index=False):
... print(row)
opensearch_py_ml(AvgTicketPrice=841.2656419677076, Cancelled=False)
opensearch_py_ml(AvgTicketPrice=882.9826615595518, Cancelled=False)
opensearch_py_ml(AvgTicketPrice=190.6369038508356, Cancelled=False)
opensearch_py_ml(AvgTicketPrice=181.69421554118, Cancelled=True)
opensearch_py_ml(AvgTicketPrice=730.041778346198, Cancelled=False)
With the `name` parameter set we set a custom name for the yielded namedtuples:
>>> for row in df.itertuples(name='Flight'):
... print(row)
Flight(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
Flight(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
Flight(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
Flight(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
Flight(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
"""
for df in self._query_compiler.search_yield_pandas_dataframes(
sort_index=sort_index
):
yield from df.itertuples(index=index, name=name)
def aggregate(
self,
func: Union[str, List[str]],
axis: int = 0,
numeric_only: Optional[bool] = None,
*args,
**kwargs,
) -> Union[pd.Series, pd.DataFrame]:
"""
Aggregate using one or more operations over the specified axis.
Parameters
----------
func: function, str, list or dict
Function to use for aggregating the data. If a function, must either
work when passed a DataFrame or when passed to DataFrame.apply.
Accepted combinations are:
- function
- string function name
- list of functions and/or function names, e.g. ``[np.sum, 'mean']``
- dict of axis labels -> functions, function names or list of such.
Currently, we only support ``['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile',
'rank', 'sem', 'skew', 'sum', 'std', 'var']``
axis: int
Currently, we only support axis=0 (index)
numeric_only: {True, False, None} Default is None
Which datatypes to return
- True: returns all values with float64, NaN/NaT are ignored.
- False: returns all values with float64.
- None: returns all values with default datatype.
*args
Positional arguments to pass to `func`
**kwargs
Keyword arguments to pass to `func`
Returns
-------
DataFrame, Series or scalar
if DataFrame.agg is called with a single function, returns a Series
if DataFrame.agg is called with several functions, returns a DataFrame
if Series.agg is called with single function, returns a scalar
if Series.agg is called with several functions, returns a Series
See Also
--------
:pandas_api_docs:`pandas.DataFrame.aggregate`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['AvgTicketPrice', 'DistanceKilometers', 'timestamp', 'DestCountry'])
>>> df.aggregate(['sum', 'min', 'std'], numeric_only=True).astype(int)
AvgTicketPrice DistanceKilometers
sum 8204364 92616288
min 100 0
std 266 4578
>>> df.aggregate(['sum', 'min', 'std'], numeric_only=True)
AvgTicketPrice DistanceKilometers
sum 8.204365e+06 9.261629e+07
min 1.000205e+02 0.000000e+00
std 2.664071e+02 4.578614e+03
>>> df.aggregate(['sum', 'min', 'std'], numeric_only=False)
AvgTicketPrice DistanceKilometers timestamp DestCountry
sum 8.204365e+06 9.261629e+07 NaT NaN
min 1.000205e+02 0.000000e+00 2018-01-01 NaN
std 2.664071e+02 4.578614e+03 NaT NaN
>>> df.aggregate(['sum', 'min', 'std'], numeric_only=None)
AvgTicketPrice DistanceKilometers timestamp DestCountry
sum 8.204365e+06 9.261629e+07 NaT NaN
min 1.000205e+02 0.000000e+00 2018-01-01 NaN
std 2.664071e+02 4.578614e+03 NaT NaN
"""
axis = pd.DataFrame._get_axis_number(axis)
if axis == 1:
raise NotImplementedError(
"Aggregating via index not currently implemented - needs index transform"
)
# currently we only support a subset of functions that aggregate columns.
# ['count', 'mad', 'max', 'mean', 'median', 'min', 'mode', 'quantile',
# 'rank', 'sem', 'skew', 'sum', 'std', 'var', 'nunique']
if isinstance(func, str):
# Wrap in list
return (
self._query_compiler.aggs([func], numeric_only=numeric_only)
.squeeze()
.rename(None)
)
elif is_list_like(func):
# we have a list!
return self._query_compiler.aggs(func, numeric_only=numeric_only)
agg = aggregate
hist = gfx.oml_hist_frame
def groupby(
self, by: Optional[Union[str, List[str]]] = None, dropna: bool = True
) -> "DataFrameGroupBy":
"""
Used to perform groupby operations
Parameters
----------
by:
column or list of columns used to groupby
Currently accepts column or list of columns
dropna: default True
If True, and if group keys contain NA values, NA values together with row/column will be dropped.
Returns
-------
opensearch_py_ml.groupby.DataFrameGroupBy
See Also
--------
:pandas_api_docs:`pandas.DataFrame.groupby`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_flights = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=["AvgTicketPrice", "Cancelled", "dayOfWeek", "timestamp", "DestCountry"])
>>> oml_flights.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], numeric_only=True) # doctest: +NORMALIZE_WHITESPACE
AvgTicketPrice dayOfWeek
min max min max
DestCountry Cancelled
AE False 110.799911 1126.148682 0.0 6.0
True 132.443756 817.931030 0.0 6.0
AR False 125.589394 1199.642822 0.0 6.0
True 251.389603 1172.382568 0.0 6.0
AT False 100.020531 1181.835815 0.0 6.0
... ... ... ... ...
TR True 307.915649 307.915649 0.0 0.0
US False 100.145966 1199.729004 0.0 6.0
True 102.153069 1192.429932 0.0 6.0
ZA False 102.002663 1196.186157 0.0 6.0
True 121.280296 1175.709961 0.0 6.0
<BLANKLINE>
[63 rows x 4 columns]
>>> oml_flights.groupby(["DestCountry", "Cancelled"]).mean(numeric_only=True) # doctest: +NORMALIZE_WHITESPACE
AvgTicketPrice dayOfWeek
DestCountry Cancelled
AE False 643.956793 2.717949
True 388.828809 2.571429
AR False 673.551677 2.746154
True 682.197241 2.733333
AT False 647.158290 2.819936
... ... ...
TR True 307.915649 0.000000
US False 598.063146 2.752014
True 579.799066 2.767068
ZA False 636.998605 2.738589
True 677.794078 2.928571
<BLANKLINE>
[63 rows x 2 columns]
>>> oml_flights.groupby(["DestCountry", "Cancelled"]).min(numeric_only=False) # doctest: +NORMALIZE_WHITESPACE
AvgTicketPrice dayOfWeek timestamp
DestCountry Cancelled
AE False 110.799911 0 2018-01-01 19:31:30
True 132.443756 0 2018-01-06 13:03:25
AR False 125.589394 0 2018-01-01 01:30:47
True 251.389603 0 2018-01-01 02:13:17
AT False 100.020531 0 2018-01-01 05:24:19
... ... ... ...
TR True 307.915649 0 2018-01-08 04:35:10
US False 100.145966 0 2018-01-01 00:06:27
True 102.153069 0 2018-01-01 09:02:36
ZA False 102.002663 0 2018-01-01 06:44:44
True 121.280296 0 2018-01-04 00:37:01
<BLANKLINE>
[63 rows x 3 columns]
"""
if by is None:
raise ValueError("by parameter should be specified to groupby")
if isinstance(by, str):
by = [by]
if isinstance(by, (list, tuple)):
remaining_columns = sorted(set(by) - set(self._query_compiler.columns))
if remaining_columns:
raise KeyError(
f"Requested columns {repr(remaining_columns)[1:-1]} not in the DataFrame"
)
return DataFrameGroupBy(
by=by, query_compiler=self._query_compiler.copy(), dropna=dropna
)
def mode(
self,
numeric_only: bool = False,
dropna: bool = True,
os_size: int = 10,
) -> pd.DataFrame:
"""
Calculate mode of a DataFrame
Parameters
----------
numeric_only: {True, False} Default is False
Which datatypes to return
- True: Returns all numeric or timestamp columns
- False: Returns all columns
dropna: {True, False} Default is True
- True: Don’t consider counts of NaN/NaT.
- False: Consider counts of NaN/NaT.
os_size: default 10
Number of rows to be returned if the mode has multiple values
See Also
--------
:pandas_api_docs:`pandas.DataFrame.mode`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_ecommerce = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'ecommerce')
>>> oml_df = oml_ecommerce.filter(["total_quantity", "geoip.city_name", "customer_birth_date", "day_of_week", "taxful_total_price"])
>>> oml_df.mode(numeric_only=False)
total_quantity geoip.city_name customer_birth_date day_of_week taxful_total_price
0 2 New York NaT Thursday 53.98
>>> oml_df.mode(numeric_only=True)
total_quantity taxful_total_price
0 2 53.98
>>> oml_df = oml_ecommerce.filter(["products.tax_amount","order_date"])
>>> oml_df.mode()
products.tax_amount order_date
0 0.0 2016-12-02 20:36:58
1 NaN 2016-12-04 23:44:10
2 NaN 2016-12-08 06:21:36
3 NaN 2016-12-08 09:38:53
4 NaN 2016-12-12 11:38:24
5 NaN 2016-12-12 19:46:34
6 NaN 2016-12-14 18:00:00
7 NaN 2016-12-15 11:38:24
8 NaN 2016-12-22 19:39:22
9 NaN 2016-12-24 06:21:36
>>> oml_df.mode(os_size = 3)
products.tax_amount order_date
0 0.0 2016-12-02 20:36:58
1 NaN 2016-12-04 23:44:10
2 NaN 2016-12-08 06:21:36
"""
# TODO dropna=False
return self._query_compiler.mode(
numeric_only=numeric_only, dropna=True, is_dataframe=True, os_size=os_size
)
def quantile(
self,
q: Union[int, float, List[int], List[float]] = 0.5,
numeric_only: Optional[bool] = True,
) -> "pd.DataFrame":
"""
Used to calculate quantile for a given DataFrame.
Parameters
----------
q: float or array-like, default 0.5
Value(s) between 0 and 1, the quantile(s) to compute.
numeric_only: {True, False, None} Default is True
Which datatypes to return
- True: Returns all values as float64, NaN/NaT values are removed
- None: Returns all values as the same dtype where possible, NaN/NaT are removed
- False: Returns all values as the same dtype where possible, NaN/NaT are preserved
Returns
-------
pandas.DataFrame
quantile value for each column
See Also
--------
:pandas_api_docs:`pandas.DataFrame.quantile`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> oml_flights = oml_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
>>> oml_flights.quantile() # doctest: +SKIP
AvgTicketPrice 640.387285
FlightDelayMin 0.000000
dayOfWeek 3.000000
Name: 0.5, dtype: float64
>>> oml_flights.quantile([.2, .5, .75]) # doctest: +SKIP
AvgTicketPrice FlightDelayMin dayOfWeek
0.20 361.040768 0.0 1.0
0.50 640.387285 0.0 3.0
0.75 842.213490 15.0 4.0
>>> oml_flights.quantile([.2, .5, .75], numeric_only=False) # doctest: +SKIP
AvgTicketPrice FlightDelayMin dayOfWeek timestamp
0.20 361.040768 0.0 1.0 2018-01-09 04:43:55.296587520
0.50 640.387285 0.0 3.0 2018-01-21 23:51:57.637076736
0.75 842.213490 15.0 4.0 2018-02-01 04:46:16.658119680
"""
return self._query_compiler.quantile(quantiles=q, numeric_only=numeric_only)
def idxmax(self, axis: int = 0) -> pd.Series:
"""
Return index of first occurrence of maximum over requested axis.
NA/null values are excluded.
Parameters
----------
axis : {0, 1}, default 0
The axis to filter on, expressed as index (int).
Returns
-------
pandas.Series
See Also
--------
:pandas_api_docs:`pandas.DataFrame.idxmax`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> oml_flights = oml_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
>>> oml_flights.idxmax()
AvgTicketPrice 1843
FlightDelayMin 109
dayOfWeek 1988
dtype: object
"""
return self._query_compiler.idx(axis=axis, sort_order="desc")
def idxmin(self, axis: int = 0) -> pd.Series:
"""
Return index of first occurrence of minimum over requested axis.
NA/null values are excluded.
Parameters
----------
axis : {0, 1}, default 0
The axis to filter on, expressed as index (int).
Returns
-------
pandas.Series
See Also
--------
:pandas_api_docs:`pandas.DataFrame.idxmin`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> oml_flights = oml_df.filter(["AvgTicketPrice", "FlightDelayMin", "dayOfWeek", "timestamp"])
>>> oml_flights.idxmin()
AvgTicketPrice 5454
FlightDelayMin 0
dayOfWeek 0
dtype: object
"""
return self._query_compiler.idx(axis=axis, sort_order="asc")
def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
TODO - add additional pandas arguments
Parameters
----------
expr: str
A boolean expression
Returns
-------
opensearch_py_ml.DataFrame:
DataFrame populated by results of the query
TODO - add link to opensearch_py_ml user guide
See Also
--------
:pandas_api_docs:`pandas.DataFrame.query`
:pandas_user_guide:`indexing`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.shape
(13059, 27)
>>> df.query('FlightDelayMin > 60').shape
(2730, 27)
"""
if isinstance(expr, BooleanFilter):
return DataFrame(
_query_compiler=self._query_compiler._update_query(BooleanFilter(expr))
)
elif isinstance(expr, str):
column_resolver = {}
for key in self.keys():
column_resolver[key] = self.get(key)
# Create fake resolvers - index resolver is empty
resolvers = column_resolver, {}
# Use pandas eval to parse query - TODO validate this further
query_filter = eval(expr, target=self, resolvers=resolvers)
return DataFrame(_query_compiler=self._query_compiler._update_query(query_filter))
else:
raise NotImplementedError(expr, type(expr))
def get(
self, key: Any, default: Optional[Any] = None
) -> Union["Series", "DataFrame"]:
"""
Get item from object for given key (ex: DataFrame column).
Returns default value if not found.
Parameters
----------
key: object
default: default value if not found
Returns
-------
value: same type as items contained in object
See Also
--------
:pandas_api_docs:`pandas.DataFrame.get`
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.get('Carrier')
0 Kibana Airlines
1 Logstash Airways
2 Logstash Airways
3 Kibana Airlines
4 Kibana Airlines
...
13054 Logstash Airways
13055 Logstash Airways
13056 Logstash Airways
13057 JetBeats
13058 JetBeats
Name: Carrier, Length: 13059, dtype: object
"""
if key in self.keys():
return self._getitem(key)
else:
return default
def filter(
self,
items: Optional[Sequence[str]] = None,
like: Optional[str] = None,
regex: Optional[str] = None,
axis: Optional[Union[int, str]] = None,
) -> "DataFrame":
"""
Subset the dataframe rows or columns according to the specified index labels.
Note that this routine does not filter a dataframe on its
contents. The filter is applied to the labels of the index.
Parameters
----------
items : list-like
Keep labels from axis which are in items.
like : str
Keep labels from axis for which "like in label == True".
regex : str (regular expression)
Keep labels from axis for which re.search(regex, label) == True.
axis : {0 or 'index', 1 or 'columns', None}, default None
The axis to filter on, expressed either as an index (int) or axis name (str). By default this is the info axis, 'index' for Series, 'columns' for DataFrame.
Returns
-------
opensearch_py_ml.DataFrame
See Also
--------
:pandas_api_docs:`pandas.DataFrame.filter`
Notes
-----
The ``items``, ``like``, and ``regex`` parameters are
enforced to be mutually exclusive.
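Examples
--------
A hedged sketch of the three mutually exclusive modes (outputs omitted):
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights')
>>> df.filter(items=['Origin', 'Dest'])  # doctest: +SKIP
>>> df.filter(like='Origin')  # doctest: +SKIP
>>> df.filter(regex='^Dest')  # doctest: +SKIP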
"""
filter_options_passed = sum([items is not None, bool(like), bool(regex)])
if filter_options_passed > 1:
raise TypeError(
"Keyword arguments `items`, `like`, or `regex` "
"are mutually exclusive"
)
elif filter_options_passed == 0:
raise TypeError("Must pass either 'items', 'like', or 'regex'")
# axis defaults to 'columns' for DataFrame, 'index' for Series
if axis is None:
axis = "columns"
axis = pd.DataFrame._get_axis_name(axis)
if axis == "index":
new_query_compiler = self._query_compiler.filter(
items=items, like=like, regex=regex
)
return self._create_or_update_from_compiler(
new_query_compiler, inplace=False
)
else: # axis == "columns"
if items is not None:
# Pandas skips over columns that don't exist
# and maintains order of items=[...]
existing_columns = set(self.columns.to_list())
return self[[column for column in items if column in existing_columns]]
elif like is not None:
def matcher(x: str) -> bool:
return like in x
else:
matcher = re.compile(regex).search
return self[[column for column in self.columns if matcher(column)]]
@property
def values(self) -> None:
"""
Not implemented.
In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the
entire index.
If this is required, call ``oml.opensearch_to_pandas(oml_df).values``, *but beware this will scan/scroll the entire
OpenSearch index(s) into memory.*
See Also
--------
:pandas_api_docs:`pandas.DataFrame.values`
opensearch_to_pandas
to_numpy
"""
return self.to_numpy()
def to_numpy(self) -> None:
"""
Not implemented.
In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the
entire index.
If this is required, call ``oml.opensearch_to_pandas(oml_df).values``, *but beware this will scan/scroll the
entire OpenSearch index(s) into memory.*
See Also
--------
:pandas_api_docs:`pandas.DataFrame.to_numpy`
opensearch_to_pandas
Examples
--------
>>> from tests import OPENSEARCH_TEST_CLIENT
>>> oml_df = oml.DataFrame(OPENSEARCH_TEST_CLIENT, 'flights', columns=['AvgTicketPrice', 'Carrier']).head(5)
>>> pd_df = oml.opensearch_to_pandas(oml_df)
>>> print(f"type(oml_df)={type(oml_df)}\\ntype(pd_df)={type(pd_df)}")
type(oml_df)=<class 'opensearch_py_ml.dataframe.DataFrame'>
type(pd_df)=<class 'pandas.core.frame.DataFrame'>
>>> oml_df
AvgTicketPrice Carrier
0 841.265642 Kibana Airlines
1 882.982662 Logstash Airways
2 190.636904 Logstash Airways
3 181.694216 Kibana Airlines
4 730.041778 Kibana Airlines
<BLANKLINE>
[5 rows x 2 columns]
>>> pd_df.values
array([[841.2656419677076, 'Kibana Airlines'],
[882.9826615595518, 'Logstash Airways'],
[190.6369038508356, 'Logstash Airways'],
[181.69421554118, 'Kibana Airlines'],
[730.041778346198, 'Kibana Airlines']], dtype=object)
"""
raise AttributeError(
"This method would scan/scroll the entire OpenSearch index(s) into memory. "
"If this is explicitly required, and there is sufficient memory, call `oml.opensearch_to_pandas(oml_df).values`"
)