Source code for niimpy.preprocessing.util

import contextlib
from dateutil.tz import tzlocal
import numpy as np
import os
import pandas as pd
import re
import sys
import warnings

from scipy import stats


def ensure_dataframe(df):
    if df is None:
        return pd.DataFrame()
    assert isinstance(df, pd.DataFrame), "Please input data as a pandas DataFrame type"
    return df
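
# Illustrative usage sketch (not part of the original module): None becomes
# an empty DataFrame, and non-DataFrame input raises an AssertionError.
def _example_ensure_dataframe():
    assert ensure_dataframe(None).empty  # None -> empty DataFrame
    assert isinstance(ensure_dataframe(pd.DataFrame({"a": [1]})), pd.DataFrame)
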
def date_range(df, start, end):
    """Extract a certain date range from a DataFrame.

    The index must contain the dates, and the index must be sorted.
    """
    # TODO: is this needed?  Do normal pandas operation, timestamp
    # checking is not really needed (and limits the formats that can
    # be used, pandas can take more than pd.Timestamp)
    # Move this function to utils
    # Deal with pandas timestamp compatibility
    if start is not None:
        assert isinstance(start, pd.Timestamp), "start not given in timestamp format"
    else:
        start = df.index[0]
    if end is not None:
        assert isinstance(end, pd.Timestamp), "end not given in timestamp format"
    else:
        end = df.index[-1]
    df_new = df.loc[start:end]
    return df_new
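
# Illustrative usage sketch for date_range (not part of the original module).
# Assumes a sorted DatetimeIndex; start and end must be pd.Timestamp or None.
def _example_date_range():
    idx = pd.date_range("2023-01-01", periods=5, freq="D")
    df = pd.DataFrame({"value": range(5)}, index=idx)
    # Rows from Jan 2 through Jan 4, inclusive on both ends.
    return date_range(df, start=pd.Timestamp("2023-01-02"),
                      end=pd.Timestamp("2023-01-04"))
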
#SYSTEM_TZ = tzlocal()  # the operating system timezone - for sqlite output compat
SYSTEM_TZ = 'Europe/Helsinki'
TZ = tzlocal()
TZ = 'Europe/Helsinki'
def set_tz(tz):
    """Globally set the preferred local timezone"""
    global TZ
    TZ = tz
@contextlib.contextmanager
def tmp_timezone(new_tz):
    """Temporarily override the global timezone for a block.

    This is used as a context manager::

        with tmp_timezone('Europe/Berlin'):
            ....

    Note: this overrides the global timezone.  In the future, there will
    be a way to handle timezones as non-global variables, which should
    be preferred.
    """
    global TZ
    old_tz = TZ
    TZ = new_tz
    yield
    TZ = old_tz
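
# Illustrative usage sketch for tmp_timezone (not part of the original module).
# Inside the block, to_datetime() converts to the temporary timezone; the
# previous global TZ is restored on exit.
def _example_tmp_timezone():
    with tmp_timezone('Europe/Berlin'):
        ts = to_datetime(pd.Series([1609459200]))  # 2021-01-01 00:00:00 UTC
    return ts  # tz-aware timestamps in Europe/Berlin
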
SQLITE3_EXTENSIONS_BASENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.c')
SQLITE3_EXTENSIONS_FILENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.so')
def install_extensions():
    """Automatically install sqlite extension functions.

    Only works on Linux for now, improvements welcome."""
    import hashlib
    if not os.path.exists(SQLITE3_EXTENSIONS_BASENAME):
        import urllib.request
        extension_url = 'https://sqlite.org/contrib/download/extension-functions.c?get=25'
        urllib.request.urlretrieve(extension_url, SQLITE3_EXTENSIONS_BASENAME)
    expected_digest = '991b40fe8b2799edc215f7260b890f14a833512c9d9896aa080891330ffe4052'
    if hashlib.sha256(open(SQLITE3_EXTENSIONS_BASENAME, 'rb').read()).hexdigest() != expected_digest:
        print("sqlite-extension-functions.c has wrong sha256 hash", file=sys.stderr)
    os.system('cd %s; gcc -lm -shared -fPIC sqlite-extension-functions.c -o sqlite-extension-functions.so'%
              os.path.dirname(__file__))
    print("Sqlite extension successfully compiled.")
def uninstall_extensions():
    """Uninstall any installed extensions"""
    def unlink_if_exists(x):
        if os.path.exists(x):
            os.unlink(x)
    unlink_if_exists(SQLITE3_EXTENSIONS_FILENAME)
def read_preprocess(df, add_group=None):
    """Standard preprocessing arguments when reading.

    This is a preprocessing filter which handles some standard arguments
    when reading files.  This should be considered a private, unstable
    function.

    Parameters
    ----------
    df: pandas.DataFrame
        Input data frame
    add_group: string, optional
        If given, add a new 'group' column with all values set to this
        given identifier.

    Returns
    -------
    df: dataframe
        Resulting dataframe (modified in-place if possible, but may also
        be a copy)
    """
    if add_group is not None:
        df['group'] = add_group
        #df['group'] = df['group'].astype('category')
        #pd.Categorical(add_group)
    return df
def df_normalize(df, tz=None, old_tz=None):
    """Normalize a df (from sql) before presenting it to the user.

    This sets the dataframe index to the time values, and converts times
    to pandas.Timestamp objects.  Modifies the data frame inplace.
    """
    if tz is None:
        warnings.warn(DeprecationWarning("From now on, you should explicitly specify timezone with e.g. tz='Europe/Helsinki'. Specify as part of the reading function."))
        tz = TZ
    if 'time' in df:
        df.index = to_datetime(df['time'])
        df.index.name = None
        df['datetime'] = df.index
    elif 'day' in df and 'hour' in df:
        index = df[['day', 'hour']].apply(lambda row: pd.Timestamp('%s %s:00'%(row['day'], row['hour'])), axis=1)
        if old_tz is not None:
            # old_tz is given - e.g. sqlite already converts it to localtime
            index = index.dt.tz_localize(old_tz).dt.tz_convert(tz)
        else:
            index = index.dt.tz_localize(tz)
        df.index = index
        df.index.name = None
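
# Illustrative usage sketch for df_normalize (not part of the original module).
# A 'time' column of epoch seconds becomes a tz-aware DatetimeIndex plus a
# 'datetime' column, in place; passing tz= avoids the deprecation warning.
def _example_df_normalize():
    df = pd.DataFrame({"time": [1609459200, 1609462800], "value": [1, 2]})
    df_normalize(df, tz='Europe/Helsinki')
    return df
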
def to_datetime(value):
    times = pd.to_datetime(value, unit='s', utc=True)
    if isinstance(times, pd.Series):
        return times.dt.tz_convert(TZ)
    else:
        return times.tz_convert(TZ)
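
# Illustrative sketch for to_datetime (not part of the original module).
# Epoch seconds are read as UTC and converted to the global TZ.
def _example_to_datetime():
    return to_datetime(pd.Series([0, 3600]))  # 02:00 and 03:00 Europe/Helsinki
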
def identifier_columns(df, id_columns=["user", "device", "group"]):
    """Build a list of the standard Niimpy identifier columns present in the dataframe."""
    columns = list(set(id_columns) & set(df.columns))
    return columns
def select_columns(df, columns, id_columns=["user", "device", "group"]):
    """Select Niimpy identifier columns and listed feature columns."""
    columns = identifier_columns(df, id_columns + columns)
    return df[columns]
def group_data(df, additional_columns=None, id_columns=["user", "device", "group"]):
    """Group the dataframe by the Niimpy standard user identifier columns
    present in the dataframe.  The columns are 'user', 'device', and
    'group'.  An additional column may be added and used for grouping.
    """
    if isinstance(additional_columns, str):
        additional_columns = [additional_columns]
    elif additional_columns is None:
        additional_columns = []
    columns = identifier_columns(df, id_columns + additional_columns)
    return df.groupby(columns)
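
# Illustrative usage sketch for group_data (not part of the original module).
# Only the identifier columns actually present in the dataframe are used
# as grouping keys.
def _example_group_data():
    df = pd.DataFrame({"user": ["a", "a", "b"], "screen_on": [1, 0, 1]})
    return group_data(df).sum()  # grouped by 'user'; 'device'/'group' absent
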
def reset_groups(df, additional_columns=None, id_columns=["user", "device", "group"]):
    """Reset id columns and optional additional columns in the dataframe index."""
    if isinstance(additional_columns, str):
        additional_columns = [additional_columns]
    elif additional_columns is None:
        additional_columns = []
    columns = list(set(id_columns + additional_columns) & set(df.index.names))
    return df.reset_index(columns)
def set_conserved_index(df, additional_columns=None, id_columns=["user", "device", "group"]):
    """Set standard id columns as index.  This allows concatenating
    dataframes with different measurements.
    """
    if isinstance(additional_columns, str):
        additional_columns = [additional_columns]
    elif additional_columns is None:
        additional_columns = []
    index_by = list(set(id_columns + additional_columns) & set(df.columns))
    df = df.set_index(index_by, append=True)
    return df
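
# Illustrative sketch (not part of the original module) of round-tripping
# identifier columns between the index and regular columns.
def _example_conserved_index():
    df = pd.DataFrame({"user": ["a", "b"], "value": [1, 2]})
    indexed = set_conserved_index(df)  # 'user' moves into the appended index
    return reset_groups(indexed)       # 'user' is a regular column again
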
def set_encoding(df, to_encoding='utf-8', from_encoding='iso-8859-1'):
    """Recode the dataframe to a different encoding.

    This is useful when the encoding in a data file is set incorrectly
    and utf characters are garbled.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to recode
    to_encoding : str
        Encoding to convert to. Default is 'utf-8'.
    from_encoding : str
        Encoding to convert from. Default is 'iso-8859-1'.

    Returns
    -------
    pandas.DataFrame
        Recoded dataframe.
    """
    for column in df.columns:
        if df[column].dtype == 'object':
            df[column] = df[column].str.encode(from_encoding).str.decode(to_encoding)
    return df
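
# Illustrative sketch for set_encoding (not part of the original module).
# UTF-8 text mis-decoded as ISO-8859-1 shows up as mojibake ('ä' -> 'Ã¤');
# re-encoding with the defaults repairs it.
def _example_set_encoding():
    df = pd.DataFrame({"text": ["pÃ¤ivÃ¤"]})  # 'päivä' mis-decoded
    return set_encoding(df)  # 'text' column becomes 'päivä'
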
def occurrence(series, bins=5, interval="1h"):
    """Resample by the given interval and count the number of bins with data.

    Each interval is divided into `bins` bins; the result counts how many
    of those bins contain at least one timestamp.  With default options,
    this reproduces the logic of the "occurrence" database function,
    without needing the database.

    Parameters
    ----------
    series : pandas.Series
        A pandas series of pandas.Timestamps.
    bins : int
        The number of bins each time interval is divided into.
    interval : str
        Length of the time interval. Default is "1h".

    Returns
    -------
    pandas.DataFrame
        Dataframe with timestamp index and 'occurrence' column.
    """
    if not isinstance(series, (pd.Series, pd.Index)):
        raise ValueError("The input to niimpy.util.occurrence must be a "
                         "pandas Series or Index, not a DataFrame. "
                         "(your input type was: %s)"%type(series))

    if not np.issubdtype(series.dtype.base, np.datetime64):
        series = pd.to_datetime(series, unit='s')

    dt = pd.to_timedelta(interval)
    bin_width = dt/bins

    df = pd.DataFrame({"time": series})
    df.set_index('time', inplace=True)
    df["occurrence"] = 1
    df = df.resample(bin_width).count()
    df = df[df['occurrence'] > 0]
    df = df.resample(interval).count()
    return df
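
# Illustrative sketch for occurrence (not part of the original module).
# With bins=5 and interval="1h", each hour is split into 12-minute bins.
# Three timestamps in one bin count once; a fourth in another bin raises
# the hour's occurrence to 2.
def _example_occurrence():
    times = pd.Series(pd.to_datetime(
        ["2023-01-01 10:01", "2023-01-01 10:05",
         "2023-01-01 10:11", "2023-01-01 10:30"]))
    return occurrence(times)  # one row at 10:00 with occurrence == 2
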
def aggregate(df, freq, method_numerical='mean', method_categorical='first', groups=['user'], **resample_kwargs):
    """Group and resample the data.

    This function performs separate resampling for different types of
    columns: numerical and categorical.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to resample
    freq : string
        Frequency to resample the data. Requires the dataframe to have
        a datetime-like index.
    method_numerical : str
        Resampling method for numerical columns. Possible values:
        'sum', 'mean', 'median'. Default value is 'mean'.
    method_categorical : str
        Resampling method for categorical columns. Possible values:
        'first', 'mode', 'last'.
    groups : list
        Columns used for the groupby operation.
    resample_kwargs : dict
        Keywords to pass to the pandas resampling function.

    Returns
    -------
    An aggregated and resampled multi-index dataframe.
    """
    # Group by user (or the given identifier columns)
    groupby = df.groupby(groups)

    # Resample numerical columns -> sub_df1
    assert method_numerical in ['mean', 'sum', 'median'], \
        'Cannot recognize sampling method. Possible values: "mean", "sum", "median".'
    if method_numerical == 'sum':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).sum(numeric_only=True)
    elif method_numerical == 'mean':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).mean(numeric_only=True)
    elif method_numerical == 'median':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).median(numeric_only=True)

    # Resample categorical columns -> sub_df2
    cat_cols = df.select_dtypes(include=['object']).columns.to_list()
    cat_cols.extend(groups)
    cat_cols = list(set(cat_cols))
    groupby = df[cat_cols].groupby(groups)

    assert method_categorical in ['first', 'mode', 'last']
    if method_categorical == 'first':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).first()
    elif method_categorical == 'last':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).last()
    elif method_categorical == 'mode':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).agg(lambda x: tuple(stats.mode(x)[0]))

    # Merge sub_df1 and sub_df2
    sub_df1 = sub_df1.drop(groups, axis=1, errors='ignore')
    sub_df2 = sub_df2.drop(groups, axis=1, errors='ignore')
    final_df = sub_df1.join(sub_df2)

    # Reset the user index; user should be a column
    final_df.reset_index(groups, inplace=True)
    return final_df
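
# Illustrative usage sketch for aggregate (not part of the original module).
# Numerical columns are averaged and categorical columns take their first
# value within each (user, day) bin; the include_groups resample keyword
# used above requires pandas >= 2.2.
def _example_aggregate():
    idx = pd.to_datetime(["2023-01-01 10:00", "2023-01-01 11:00",
                          "2023-01-02 10:00"])
    df = pd.DataFrame({"user": ["a", "a", "a"],
                       "screen_time": [10.0, 20.0, 30.0],
                       "app": ["mail", "chat", "mail"]}, index=idx)
    return aggregate(df, freq="1D")  # one row per user per day
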