Source code for niimpy.preprocessing.util

import contextlib
from dateutil.tz import tzlocal
import numpy as np
import os
import pandas as pd
import re
import sys
import warnings

from scipy import stats


def date_range(df, start, end):
    """Extract a certain date range from a DataFrame.

    The index must contain the dates, and the index must be sorted.
    """
    # TODO: is this needed?  Do normal pandas operations; timestamp
    # checking is not really needed (and limits the formats that can
    # be used - pandas can take more than pd.Timestamp).
    # Move this function to utils.
    # Deal with pandas timestamp compatibility
    if start is not None:
        assert isinstance(start, pd.Timestamp), "start not given in timestamp format"
    else:
        start = df.index[0]
    if end is not None:
        assert isinstance(end, pd.Timestamp), "end not given in timestamp format"
    else:
        end = df.index[-1]
    df_new = df.loc[start:end]
    return df_new
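# Illustrative usage of date_range (a sketch, not part of the module; the
# dataframe and dates below are hypothetical):
#
#     idx = pd.date_range("2022-01-01", periods=5, freq="D", tz=TZ)
#     df = pd.DataFrame({"x": range(5)}, index=idx)
#     date_range(df, pd.Timestamp("2022-01-02", tz=TZ),
#                pd.Timestamp("2022-01-04", tz=TZ))  # rows for Jan 2-4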
#SYSTEM_TZ = tzlocal()  # the operating system timezone - for sqlite output compat
SYSTEM_TZ = 'Europe/Helsinki'
#TZ = tzlocal()  # dead assignment: immediately overridden below
TZ = 'Europe/Helsinki'
def set_tz(tz):
    """Globally set the preferred local timezone."""
    global TZ
    TZ = tz
@contextlib.contextmanager
def tmp_timezone(new_tz):
    """Temporarily override the global timezone for a block.

    This is used as a context manager::

        with tmp_timezone('Europe/Berlin'):
            ....

    Note: this overrides the global timezone.  In the future, there will
    be a way to handle timezones as non-global variables, which should be
    preferred.
    """
    global TZ
    old_tz = TZ
    TZ = new_tz
    try:
        yield
    finally:
        # Restore the previous timezone even if the block raises.
        TZ = old_tz
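# Illustrative usage of set_tz/tmp_timezone (a sketch; any IANA timezone
# name works):
#
#     set_tz('Europe/Helsinki')            # set the global default
#     with tmp_timezone('Europe/Berlin'):
#         ...                              # conversions here use Berlin time
#     # the previous global timezone is restored on exit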
SQLITE3_EXTENSIONS_BASENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.c')
SQLITE3_EXTENSIONS_FILENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.so')
def install_extensions():
    """Automatically install sqlite extension functions.

    Only works on Linux for now, improvements welcome.
    """
    import hashlib
    if not os.path.exists(SQLITE3_EXTENSIONS_BASENAME):
        import urllib.request
        extension_url = 'https://sqlite.org/contrib/download/extension-functions.c?get=25'
        urllib.request.urlretrieve(extension_url, SQLITE3_EXTENSIONS_BASENAME)
    expected_digest = '991b40fe8b2799edc215f7260b890f14a833512c9d9896aa080891330ffe4052'
    with open(SQLITE3_EXTENSIONS_BASENAME, 'rb') as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    if digest != expected_digest:
        print("sqlite-extension-functions.c has wrong sha256 hash", file=sys.stderr)
    os.system('cd %s; gcc -lm -shared -fPIC sqlite-extension-functions.c '
              '-o sqlite-extension-functions.so' % os.path.dirname(__file__))
    print("Sqlite extension successfully compiled.")
def uninstall_extensions():
    """Uninstall any installed extensions."""
    def unlink_if_exists(x):
        if os.path.exists(x):
            os.unlink(x)
    unlink_if_exists(SQLITE3_EXTENSIONS_FILENAME)
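# Illustrative usage (a sketch; install_extensions needs gcc, network
# access, and a Linux system):
#
#     install_extensions()    # download, verify, and compile the extension
#     uninstall_extensions()  # remove the compiled .so again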
#TODO: rename to data.py
def df_normalize(df, tz=None, old_tz=None):
    """Normalize a df (from sql) before presenting it to the user.

    This sets the dataframe index to the time values and converts times
    to pandas.Timestamp.  Modifies the dataframe in place.
    """
    if tz is None:
        warnings.warn(DeprecationWarning(
            "From now on, you should explicitly specify timezone with e.g. "
            "tz='Europe/Helsinki'.  Specify as part of the reading function."))
        tz = TZ
    if 'time' in df:
        df.index = to_datetime(df['time'])
        df.index.name = None
        df['datetime'] = df.index
    elif 'day' in df and 'hour' in df:
        index = df[['day', 'hour']].apply(
            lambda row: pd.Timestamp('%s %s:00' % (row['day'], row['hour'])),
            axis=1)
        if old_tz is not None:
            # old_tz is given - e.g. sqlite already converts it to localtime
            index = index.dt.tz_localize(old_tz).dt.tz_convert(tz)
        else:
            index = index.dt.tz_localize(tz)
        df.index = index
        df.index.name = None
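# Illustrative usage of df_normalize (a sketch; the 'time' column holds
# hypothetical seconds-since-epoch values, as read from sql):
#
#     df = pd.DataFrame({'time': [1609459200, 1609462800], 'value': [1, 2]})
#     df_normalize(df, tz='Europe/Helsinki')
#     # df now has a timezone-aware DatetimeIndex and a 'datetime' column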
def to_datetime(value):
    """Convert seconds-since-epoch values to timezone-aware timestamps in TZ."""
    times = pd.to_datetime(value, unit='s', utc=True)
    if isinstance(times, pd.Series):
        return times.dt.tz_convert(TZ)
    else:
        return times.tz_convert(TZ)
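# Illustrative usage of to_datetime (a sketch; the epoch values are
# hypothetical):
#
#     to_datetime(1609459200)                           # scalar -> pd.Timestamp
#     to_datetime(pd.Series([1609459200, 1609459260]))  # Series -> tz-aware Series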
def format_column_names(df):
    # Replace special characters, including space and ., with _
    # (keeping parentheses and /, which are used in units, e.g.
    # "temperature (C)"), and convert to lower case.
    column_map = {}
    for column in df.columns:
        formatted_name = column.replace(" ", "_").lower()
        formatted_name = re.sub(r'[^a-zA-Z0-9_()/]+', '_', formatted_name)
        column_map[column] = formatted_name
    df.rename(columns=column_map, inplace=True)
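# Illustrative usage of format_column_names (a sketch; the column names
# are hypothetical):
#
#     df = pd.DataFrame(columns=['Battery Level', 'temperature (C)'])
#     format_column_names(df)
#     list(df.columns)  # ['battery_level', 'temperature_(c)']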
def set_encoding(df, to_encoding='utf-8', from_encoding='iso-8859-1'):
    """Recode the dataframe to a different encoding.

    This is useful when the encoding in a data file is set incorrectly
    and utf characters are garbled.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to recode.
    to_encoding : str
        Encoding to convert to.  Default is 'utf-8'.
    from_encoding : str
        Encoding to convert from.  Default is 'iso-8859-1'.

    Returns
    -------
    pandas.DataFrame
        Recoded dataframe.
    """
    for column in df.columns:
        if df[column].dtype == 'object':
            df[column] = df[column].str.encode(from_encoding).str.decode(to_encoding)
    return df
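# Illustrative usage of set_encoding (a sketch; the garbled string below
# is a typical utf-8-read-as-latin-1 mixup):
#
#     df = pd.DataFrame({'name': ['MÃ¤kelÃ¤']})
#     set_encoding(df)['name'][0]  # 'Mäkelä'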
def occurrence(series, bins=5, interval="1h"):
    """Resample into intervals and count the number of bins with data.

    Each time interval is divided into `bins` bins; the result counts, per
    interval, how many of those bins contain at least one event.  With
    default options, this reproduces the logic of the "occurrence"
    database function, without needing the database.

    Parameters
    ----------
    series : pandas.Series
        A pandas series of pandas.Timestamps.
    bins : int
        The number of bins each time interval is divided into.
    interval : str
        Length of the time interval.  Default is "1h".

    Returns
    -------
    pandas.DataFrame
        Dataframe with timestamp index and 'occurrence' column.
    """
    if not isinstance(series, (pd.Series, pd.Index)):
        raise ValueError("The input to niimpy.util.occurrence must be a "
                         "pandas Series or Index, not a DataFrame. "
                         "(your input type was: %s)" % type(series))
    if not np.issubdtype(series.dtype.base, np.datetime64):
        series = pd.to_datetime(series, unit='s')
    dt = pd.to_timedelta(interval)
    bin_width = dt / bins
    df = pd.DataFrame({"time": series})
    df.set_index('time', inplace=True)
    df["occurrence"] = 1
    df = df.resample(bin_width).count()
    df = df[df['occurrence'] > 0]
    df = df.resample(interval).count()
    return df
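# Illustrative usage of occurrence (a sketch; timestamps are hypothetical).
# With bins=5 and interval="1h", each hour is split into 12-minute bins and
# the result counts how many bins contain at least one event:
#
#     times = pd.Series(pd.to_datetime(
#         ['2022-01-01 10:01', '2022-01-01 10:02', '2022-01-01 10:40']))
#     occurrence(times)  # hour 10:00 -> occurrence 2 (two distinct bins hit)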
def aggregate(df, freq, method_numerical='mean', method_categorical='first',
              groups=['user'], **resample_kwargs):
    """Group and resample the data.

    This function performs separate resampling for numerical and
    categorical columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to resample.
    freq : str
        Frequency to resample the data.  Requires the dataframe to have a
        datetime-like index.
    method_numerical : str
        Resampling method for numerical columns.  Possible values:
        'sum', 'mean', 'median'.  Default is 'mean'.
    method_categorical : str
        Resampling method for categorical columns.  Possible values:
        'first', 'mode', 'last'.  Default is 'first'.
    groups : list
        Columns used for the groupby operation.
    resample_kwargs : dict
        Keywords to pass to the pandas resampling function.

    Returns
    -------
    pandas.DataFrame
        An aggregated and resampled multi-index dataframe.
    """
    # Group by user (or the given group columns)
    groupby = df.groupby(groups)

    # Resample numerical columns -> sub_df1
    assert method_numerical in ['mean', 'sum', 'median'], \
        'Cannot recognize sampling method. Possible values: "mean", "sum", "median".'
    if method_numerical == 'sum':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).sum(numeric_only=True)
    elif method_numerical == 'mean':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).mean(numeric_only=True)
    elif method_numerical == 'median':
        sub_df1 = groupby.resample(freq, **resample_kwargs, include_groups=False).median(numeric_only=True)

    # Resample categorical columns -> sub_df2
    cat_cols = df.select_dtypes(include=['object']).columns.to_list()
    cat_cols.extend(groups)
    cat_cols = list(set(cat_cols))
    groupby = df[cat_cols].groupby(groups)

    assert method_categorical in ['first', 'mode', 'last']
    if method_categorical == 'first':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).first()
    elif method_categorical == 'last':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).last()
    elif method_categorical == 'mode':
        sub_df2 = groupby.resample(freq, **resample_kwargs, include_groups=False).agg(
            lambda x: tuple(stats.mode(x)[0]))

    # Merge sub_df1 and sub_df2
    sub_df1 = sub_df1.drop(groups, axis=1, errors='ignore')
    sub_df2 = sub_df2.drop(groups, axis=1, errors='ignore')
    final_df = sub_df1.join(sub_df2)

    # Reset the group index; user should be a column
    final_df.reset_index(groups, inplace=True)
    return final_df
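# Illustrative usage of aggregate (a sketch; the dataframe is hypothetical):
#
#     idx = pd.to_datetime(['2022-01-01 10:00', '2022-01-01 10:30',
#                           '2022-01-01 11:00'])
#     df = pd.DataFrame({'user': ['u1', 'u1', 'u1'],
#                        'steps': [10, 20, 30],
#                        'activity': ['walk', 'walk', 'run']}, index=idx)
#     aggregate(df, '1h')  # hourly mean of 'steps', first 'activity', per user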