Source code for pyllars.pandas_utils

"""
This module contains utilities for data frame manipulation. 
 
This module differs from `ml_utils` and others because this module
treats pandas data frames more like database tables which hold various
types of records. The other modules tend to treat data frames as data
matrices (in a statistical/machine learning sense).
"""
import logging
logger = logging.getLogger(__name__)

import functools
import gzip
import os
import shutil

import numpy as np
import pandas as pd
import tqdm

import openpyxl

import pyllars.shell_utils as shell_utils
import pyllars.utils as utils
import pyllars.validation_utils as validation_utils

import typing

from typing import Callable, Dict, Generator, Iterable, List, Optional, Sequence, Set, Union
StrOrList = Union[str,List[str]]


[docs]def apply(df:pd.DataFrame, func:Callable, *args, progress_bar:bool=False, **kwargs) -> List: """ Apply func to each row in the data frame Unlike :py:meth:`pandas.DataFrame.apply`, this function does not attempt to "interpret" the results and cast them back to a data frame, etc. Parameters ---------- df: pandas.DataFrame the data frame func: typing.Callable The function to apply to each row in `data_frame` args Positional arguments to pass to `func` kwargs Keyword arguments to pass to `func` progress_bar: bool Whether to show a progress bar when waiting for results. Returns ------- results: typing.List The result of each function call """ it = df.iterrows() if progress_bar: it = tqdm.tqdm(it, total=len(df)) ret_list = [ func(*(row[1], *args), **kwargs) for row in it ] return ret_list
[docs]def apply_groups(groups:pd.core.groupby.DataFrameGroupBy, func:Callable, *args, progress_bar:bool=False, **kwargs) -> List: """ Apply `func` to each group in `groups` Unlike :py:meth:`pandas.core.groupby.GroupBy.apply`, this function does not attempt to "interpret" the results by casting to a data frame, etc. Parameters ---------- groups: pandas.core.groupby.GroupBy The result of a call to `groupby` on a data frame func: function pointer The function to apply to each group in `groups` args Positional arguments to pass to `func` kwargs Keyword arguments to pass to `func` progress_bar: bool Whether to show a progress bar when waiting for results. Returns ------- results: typing.List The result of each function call """ it = groups if progress_bar: it = tqdm.tqdm(it, total=len(groups)) ret_list = [ func(*(group, *args), **kwargs) for name, group in it ] return ret_list
[docs]def dict_to_dataframe(dic:Dict, key_name:str='key', value_name:str='value') -> pd.DataFrame: """ Convert a dictionary into a two-column data frame using the given column names. Each entry in the data frame corresponds to one row. Parameters ---------- dic: typing.Dict A dictionary key_name: str The name to use for the column for the keys value_name: str The name to use for the column for the values Returns ------- df: pandas.DataFrame A data frame in which each row corresponds to one entry in dic """ df = pd.Series(dic, name=value_name) df.index.name = key_name df = df.reset_index() return df
[docs]def dataframe_to_dict(df:pd.DataFrame, key_field:str, value_field:str) -> Dict: """ Convert two columns of a data frame into a dictionary Parameters ---------- df : pandas.DataFrame The data frame key_field : str The field to use as the keys in the dictionary value_field : str The field to use as the values Returns ------- the_dict: typing.Dict A dictionary which has one entry for each row in the data frame, with the keys and values as indicated by the fields """ dic = dict(zip(df[key_field], df[value_field])) return dic
[docs]def get_series_union(*pd_series:Iterable[pd.Series]) -> Set: """ Take the union of values from the list of series Parameters ---------- pd_series : typing.Iterable[pandas.Series] The list of pandas series Returns ------- set_union : typing.Set The union of the values in all series """ res = set.union( *[set(s) for s in pd_series] ) return res
excel_extensions = ('xls', 'xlsx') hdf5_extensions = ('hdf', 'hdf5', 'h5', 'he5') parquet_extensions = ('parq', ) def _guess_df_filetype(filename:str): """ Guess the filetype of `filename`. The supported types and extensions used for guessing are: * excel: xls, xlsx * hdf5: hdf, hdf5, h5, he5 * parquet: parq * csv: all other extensions Additionally, if filename is a pd.ExcelWriter object, then the guessed filetype will be 'excel_writer' Parameters ---------- filename :str The name of the file (including extension) one which we will guess Returns ------- guessed_type : str The guessed file type. See above for the supported types and extensions. """ msg = "Attempting to guess the extension. Filename: {}".format(filename) logger.debug(msg) if isinstance(filename, pd.ExcelWriter): filetype = 'excel_writer' elif filename.endswith(excel_extensions): filetype = 'excel' elif filename.endswith(hdf5_extensions): filetype= 'hdf5' elif filename.endswith(parquet_extensions): filetype= 'parquet' else: filetype = 'csv' msg = "The guessed filetype was: {}".format(filetype) logger.debug(msg) return filetype
[docs]def read_df(filename:str, filetype:str='AUTO', sheet:str=None, **kwargs) -> pd.DataFrame: """ Read a data frame from a file By default, this function attempts to guess the type of the file based on its extension. Alternatively, the filetype can be exlicitly specified. The supported types and extensions used for guessing are: * excel: xls, xlsx * hdf5: hdf, hdf5, h5, he5 * parquet: parq * csv: all other extensions **N.B.** In principle, matlab data files are hdf5, so this function should be able to read them. This has not been thoroughly tested, though. Parameters ---------- filename : str The input file filetype : str The type of file, which determines which pandas read function will be called. If `AUTO`, the function uses the extensions mentioned above to guess the filetype. sheet: str For excel or hdf5 files, this will be passed to extract the desired information from the file. Please see :py:func:`pandas.read_excel` or :py:func:`pandas.read_hdf` for more information on how values are interpreted. kwargs Keyword arguments to pass to the appropriate `read` function. Returns ------- df : pandas.DataFrame The data frame """ # first, see if we want to guess the filetype if filetype == 'AUTO': filetype = _guess_df_filetype(filename) # now, parse the file if filetype == 'csv': df = pd.read_csv(filename, **kwargs) elif filetype == 'excel': df = pd.read_excel(filename, sheet_name=sheet, **kwargs) elif filetype == 'hdf5': df = pd.read_hdf(filename, key=sheet, **kwargs) elif filetype == "parquet": caller = "pandas_utils.read_df" validation_utils.validate_packages_installed(['fastparquet'], caller) import fastparquet pf = fastparquet.ParquetFile(filename, **kwargs) # multi-indices are not yet supported, so we always have to turn # off indices to avoid a NotImplementedError df = pf.to_pandas(index=False) else: msg = "Could not read dataframe. Invalid filetype: {}".format(filetype) raise ValueError(msg) return df
[docs]def write_df(df:pd.DataFrame, out, create_path:bool=False, filetype:str='AUTO', sheet:str='Sheet_1', compress:bool=True, **kwargs) -> None: """ Writes a data frame to a file of the specified type Unless otherwise specified, csv files are gzipped when written. By default, the filetype will be guessed based on the extension. The supported types and extensions used for guessing are: * excel: xls, xlsx * hdf5: hdf, hdf5, h5, he5 * parquet: parq * csv: all other extensions (e.g., "gz" or "bed") Additionally, the filetype can be specified as 'excel_writer'. In this case, the out object is taken to be a pd.ExcelWriter, and the df is appended to the writer. AUTO will also guess this correctly. **N.B.** The hdf5 filetype has not been thoroughly tested. Parameters ---------- df : pandas.DataFrame The data frame out : str or pandas.ExcelWriter The (complete) path to the file. The file name WILL NOT be modified. In particular, ".gz" WILL NOT be added if the file is to be zipped. As mentioned above, if the filetype is passed as 'excel_writer', then this is taken to be a pd.ExcelWriter object. create_path : bool Whether to create the path directory structure to the file if it does not already exist. N.B. This will not attempt to create the path to an excel_writer since it is possible that it does not yet have one specified. filetype : str The type of output file to write. If `AUTO`, the function uses the extensions mentioned above to guess the filetype. sheet : str The name of the sheet (excel) or key (hdf5) to use when writing the file. This argument is not used for csv. For excel, the sheet is limited to 31 characters. It will be trimmed if necessary. compress : bool Whether to compress the output. This is only used for csv files. kwargs Keyword arguments to pass to the appropriate "write" function. Returns ------- None : None The file is created as specified """ # check for a deprecated keyword if 'do_not_compress' in kwargs: msg = ("[pd_utils.write_df] `do_not_compress` keyword has been removed. " "Please use `compress` instead.") raise DeprecationWarning(msg) # first, see if we want to guess the filetype if filetype == 'AUTO': filetype = _guess_df_filetype(out) # check if we want to and can create the path if create_path: if filetype != 'excel_writer': shell_utils.ensure_path_to_file_exists(out) else: msg = ("[pd_utils.write_df]: create_path was passed as True, but the " "filetype is 'excel_writer'. This combination does not work. " "The path to the writer will not be created.") logger.warning(msg) if filetype == 'csv': if compress: with gzip.open(out, 'wt') as out: df.to_csv(out, **kwargs) else: df.to_csv(out, **kwargs) elif filetype == 'excel': with pd.ExcelWriter(out) as out: df.to_excel(out, sheet[:31], **kwargs) elif filetype == 'excel_writer': df.to_excel(out, sheet[:31], **kwargs) elif filetype == 'hdf5': df.to_hdf(out, sheet, **kwargs) elif filetype == 'parquet': if compress: kwargs['compression'] = 'GZIP' # handle "index=False" kwarg if 'index' in kwargs: index = kwargs.pop('index') # if index is true, then no need to do anything # that is the default if not index: kwargs['write_index'] = False # if a parquet "file" exists, delete it if os.path.exists(out): # it could be either a folder or a file if os.path.isfile(out): # delete file os.remove(out) else: # delete directory shutil.rmtree(out) caller = "pandas_utils.read_df" validation_utils.validate_packages_installed(['fastparquet'], caller) import fastparquet fastparquet.write(out, df, **kwargs) else: msg = ("Could not write the dataframe. Invalid filetype: {}".format( filetype)) raise ValueError(msg)
[docs]def append_to_xlsx(df:pd.DataFrame, xlsx:str, sheet='Sheet_1', **kwargs) -> None: """ Append `df` to `xlsx` If the sheet already exists, it will be overwritten. If the file does not exist, it will be created. **N.B.** This *will not* work with an open file handle! The xlsx argument *must be* the path to the file. Parameters ---------- df : pandas.DataFrame The data frame to write xlsx : str The path to the excel file sheet :str The name of the sheet, which will be truncated to 31 characters kwargs Keyword arguments to pass to the appropriate "write" function. Returns ------- None : None The sheet is appended to the excel file """ # check if the file already exists if os.path.exists(xlsx): book = openpyxl.load_workbook(xlsx) with pd.ExcelWriter(xlsx, engine='openpyxl') as writer: writer.book = book writer.sheets = dict((ws.title, ws) for ws in book.worksheets) write_df(df, writer, sheet=sheet, **kwargs) else: # then we can just create it fresh write_df(df, xlsx, sheet=sheet, **kwargs)
[docs]def split_df(df:pd.DataFrame, num_groups:int=None, chunk_size:int=None) -> pd.core.groupby.DataFrameGroupBy: """ Split `df` into roughly equal-sized groups The size of the groups can be specified by either giving the number of groups (`num_groups`) or the size of each group (`chunk_size`). The groups are contiguous rows in the data frame. Parameters ---------- df : pandas.DataFrame The data frame num_groups : int The number of groups chunk_size : int The size of each group. If given, `num_groups` groups has precedence over chunk_size Returns -------- groups : pandas.core.groupby.GroupBy The groups """ if num_groups is None: if chunk_size is not None: num_groups = int(df.shape[0] / chunk_size) else: msg = ("[pd_utils.split_df] one of `num_groups` and `chunk_size` " "must be provided") raise ValueError(msg) parallel_indices = np.arange(len(df)) // (len(df) / num_groups) split_groups = df.groupby(parallel_indices) return split_groups
[docs]def group_and_chunk_df(df:pd.DataFrame, groupby_field:str, chunk_size:int) -> pd.core.groupby.DataFrameGroupBy: """ Group `df` using then given field, and then create "groups of groups" with `chunk_size` groups in each outer group Parameters ---------- df: pandas.DataFrame The data frame groupby_field: str The field for creating the initial grouping chunk_size: int The size of each outer group Returns -------- groups : pandas.core.groupby.GroupBy The groups """ # first, pull out the unique values for the groupby field df_chunks = pd.DataFrame(columns=[groupby_field],data=df[groupby_field].unique()) # now, create a map from each unique groupby value to its chunk chunk_indices = np.arange(len(df_chunks)) // chunk_size df_chunks['chunk'] = chunk_indices stays_chunk_map = dataframe_to_dict( df_chunks, key_field=groupby_field, value_field='chunk' ) # finally, determine the chunk of each row in the original data frame group_chunks = df[groupby_field].map(stays_chunk_map) # and create the group chunks group_chunks = df.groupby(group_chunks) return group_chunks
[docs]def get_group_sizes( df:Optional[pd.DataFrame]=None, group_fields:Optional[StrOrList]=None, groups:Optional[pd.core.groupby.GroupBy]=None) -> pd.DataFrame: """ Create a data frame containing the size of each group Parameters ---------- df : pandas.DataFrame The data frame. If `groups` are given, then `df` is not needed. group_fields: typing.Optional[typing.Union[str, typing.Sequence[str]]] If not `None`, then the field(s) by which to group the data frame. This value must be something which can be interpreted by pd.DataFrame.groupby. groups: typing.Optional[pandas.core.groupby.GroupBy] If not None, then these groups will be used to find the counts. Returns ------- df_counts : pandas.DataFrame The data frame containing the counts. It contains one column for each of the `group_fields` (or whatever the index is if `groups` is instead provided), as well as a `count` column. """ if (group_fields is None) and (groups is None): msg = ("[pandas_utils.get_group_sizes]: No groups or group field " "provided") raise ValueError(msg) # we also can't have both if (group_fields is not None) and (groups is not None): msg = ("[pandas_utils.get_group_sizes]: Both groups and group field " "provided") raise ValueError(msg) # so we either have groups or group_field if group_fields is not None: groups = df.groupby(group_fields) df_counts = groups.size() df_counts = df_counts.reset_index() df_counts = df_counts.rename(columns={0:'count'}) df_counts = df_counts.sort_values('count', ascending=False) df_counts = df_counts.reset_index(drop=True) return df_counts
[docs]def get_group_extreme( df:pd.DataFrame, ex_field:str, ex_type:str="max", group_fields:Optional[StrOrList]=None, groups:Optional[pd.core.groupby.GroupBy]=None) -> pd.DataFrame: """ Find the row in each group of `df` with an extreme value for `ex_field` "ex_type" must be either "max" or "min" and indicated which type of extreme to consider. Either the "group_field" or "groups" must be given. Parameters ---------- df: pd.DataFrame The original data frame. Even if the groups are created externally, the original data frame must be given. ex_field: str The field to find for which to find the extreme values ex_type: str {"max" or "min"}, case-insensitive The type of extreme to consider. groups: typing.Optional[pandas.core.groupby.GroupBy] If not None, then these groups will be used to find the maximum values. group_fields: typing.Optional[typing.Union[str, typing.Sequence[str]]] If not `None`, then the field(s) by which to group the data frame. This value must be something which can be interpreted by pd.DataFrame.groupby. Returns ------- ex_df: pandas.DataFrame A data frame with rows which contain the extreme values for the indicated groups. """ # make sure we were given something by which to group if (group_fields is None) and (groups is None): msg = ("[pandas_utils.get_group_extreme]: No groups or group field " "provided") raise ValueError(msg) # we also can't have both if (group_fields is not None) and (groups is not None): msg = ("[pandas_utils.get_group_extreme]: Both groups and group field " "provided") raise ValueError(msg) # and that we have a valid exteme op is_max = ex_type.lower() == "max" is_min = ex_type.lower() == "min" if not (is_max or is_min): msg = ("[pandas_utils.get_group_extreme]: Invalid ex_type given. " "Choices: \"max\" or \"min\"") raise ValueError(msg) # so we either have groups or group_field if group_fields is not None: groups = df.groupby(group_fields) if is_max: ex_vals = groups[ex_field].idxmax() elif is_min: ex_vals = groups[ex_field].idxmin() ex_rows = df.loc[ex_vals] return ex_rows
[docs]def groupby_to_generator(groups:pd.core.groupby.GroupBy) -> Generator: """ Convert the groupby object to a generator of data frames Parameters ----------- groups : pandas.core.groupby.GroupBy The groups Returns -------- group_generator : typing.Generator A generator over the data frames in `groups` """ for k, g in groups: yield g
[docs]def join_df_list(dfs:List[pd.DataFrame], join_col:StrOrList, *args, **kwargs) -> pd.DataFrame: """ Join a list of data frames on a common column Parameters ---------- dfs: typing.Iterable[pandas.DataFrame] The data frames join_col: str or typing.List[str] The name of the column(s) to use for joining. All of the data frames in `dfs` must have this column (or all columns in the list). args Positional arguments to pass to :py:func:`pandas.merge` kwargs Keyword arguments to pass to :py:func:`pandas.merge` Returns ------- joined_df: pandas.DataFrame The data frame from joining all of those in the list on `join_col`. This function does not especially handle other columns which appear in all data frames, and their names in the joined data frame will be adjusted according to the standard pandas suffix approach. """ joined_df = functools.reduce( lambda left,right: pd.merge(left,right,on=join_col, *args, **kwargs), dfs) return joined_df
[docs]def filter_rows( df_filter:pd.DataFrame, df_to_keep:pd.DataFrame, filter_on:List[str], to_keep_on:List[str], drop_duplicates:bool=True) -> pd.DataFrame: """ Filter rows from `df_to_keep` which have matches in `df_filter` **N.B.** The order of the the columns in `filter_on` and `to_keep_on` *must* match. This is adapted from: https://stackoverflow.com/questions/44706485. Parameters ---------- df_filter : pandas.DataFrame The rows which will be used *as the filter* df_to_keep : pandas.DataFrame The rows which will be kept, unless they appear in `df_filter` filter_on : typing.List[str] The columns from `df_filter` to use for matching to_keep_on : typing.List[str] The columns from `df_to_keep` to use for matching drop_duplicates : bool Whether to remove duplicate rows from the filtered data frame Returns ------- df_filtered : pandas.DataFrame The rows of `df_to_keep` which do not appear in `df_filter` (considering only the given columns) """ d = df_filter.merge( df_to_keep, left_on=filter_on, right_on=to_keep_on, indicator=True, how='outer' ) m_right_only = d['_merge'] == 'right_only' d = d[m_right_only] if drop_duplicates: d = d.drop_duplicates() d = d.reset_index(drop=True) return d
[docs]def intersect_masks(masks:Sequence[np.ndarray]) -> np.ndarray: """ Take the intersection sof all masks in the list """ m_intersect = np.all(list(m for m in masks), axis=0) return m_intersect
[docs]def union_masks(masks:Sequence[np.ndarray]) -> np.ndarray: """ Take the union of all masks in the list """ m_union = np.any(list(m for m in masks), axis=0) return m_union