# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Please refer to the encode_categorical.sql_in file for the documentation

"""
@file encode_categorical.py_in
"""

from bisect import bisect
from itertools import count
import math

import plpy

from control import MinWarning
from utilities import _assert
from utilities import strip_end_quotes
from utilities import split_quoted_delimited_str
from utilities import extract_keyvalue_params
from utilities import add_postfix
from utilities import is_platform_pg
from validate_args import input_tbl_valid, output_tbl_valid, is_var_valid
from validate_args import unquote_ident, quote_ident
from validate_args import get_expr_type, get_cols_and_types

# If there are more than 1600 columns for the output table,
# it might lead to a database error.
MAX_OUTPUT_COLUMN_COUNT = 1600

# If a column name has more than 63 characters it gets trimmed automatically,
# which may cause an exception. Enable the output dictionary in this case.
MAX_COLUMN_LENGTH = 63


def _quote_sql_literal(value):
    """Render 'value' as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that a value such as O'Brien
    cannot terminate the literal early (which would break the generated
    statement or let data be interpreted as SQL).

    Args:
        @param value: Any object; it is converted to text with '%s'

    Returns:
        str. The quoted literal, e.g. 'O''Brien'
    """
    text = '%s' % (value,)
    return "'" + text.replace("'", "''") + "'"


class CategoricalEncoder(object):
    """Encoding class to encode categorical variables

    The constructor parses and validates all user arguments and determines
    the columns to encode; build_output_table() then creates the output
    table (and, if required, the dictionary table).
    """

    def __init__(self, schema_madlib, source_table, output_table,
                 categorical_cols, categorical_cols_to_exclude=None,
                 row_id=None, top=None, value_to_drop=None,
                 encode_null=False, output_type=None,
                 output_dictionary=False, distributed_by=None, **kwargs):
        """
        Args:
            @param schema_madlib: str, Name of the MADlib schema
            @param source_table: str, Name of table containing categorical variables
            @param output_table: str, Name of table to create with the encoding
            @param categorical_cols: str, Comma-separated columns to encode
                                     (empty or '*' selects all categorical columns)
            @param categorical_cols_to_exclude: str, Comma-separated columns
                                     to exclude when all columns are selected
            @param row_id: str, Comma-separated columns that index the output table
            @param top: str, Either a single value or a 'col=value' mapping;
                        each value is a fraction in (0, 1) or an integer >= 1
            @param value_to_drop: str, Either a single value or a 'col=value'
                        mapping giving the value that is not encoded
            @param encode_null: bool, If True, NULL is encoded as a value
            @param output_type: str, Prefix of 'column', 'array' or 'svec'
                        (defaults to 'column')
            @param output_dictionary: bool, If True, a dictionary table is built
            @param distributed_by: str, Comma-separated distribution columns
                        (ignored when is_platform_pg() is True)
        """
        super(CategoricalEncoder, self).__init__()
        self.schema_madlib = schema_madlib
        self.source_table = source_table
        self.output_table = output_table
        self.categorical_cols = categorical_cols
        self.categorical_cols_to_exclude = categorical_cols_to_exclude
        self.row_id = row_id
        self.top = top
        self.value_to_drop = value_to_drop
        self.encode_null = encode_null
        self.output_type = 'column' if not output_type else output_type.lower()
        self.output_dictionary = output_dictionary
        self.distributed_by = distributed_by if not is_platform_pg() else None

        self._name_others_col = "_misc__"
        self._array_out_name = "__encoded_variables__"

        # create new parameters after validating and parsing inputs
        # (order of below statements is relevant)
        self._parse_parameters()
        self._validate_parameters()
        self._output_cols, self._col_to_type = self._get_cols_to_encode()
        if not self._output_cols:
            plpy.error("Encoding categorical: No categorical columns available "
                       "to encode (or all have been excluded)")

        # _parse_dictlike_parameters uses _output_cols and has to be below
        # _get_cols_to_encode
        self._parse_dictlike_parameters()
    # ----------------------------------------------------------------------

    def build_output_table(self):
        """ Create the output table containing the encoded columns

        Also creates the dictionary table when it was requested, when the
        output type is array-like, or when a generated column name would be
        longer than MAX_COLUMN_LENGTH.
        """
        if self._top:
            distinct_values = self._get_top_values()
        else:
            distinct_values = self._get_distinct_values()

        # _build_encoding_str may switch on self._output_dictionary (long
        # column names), hence it has to run before the check below
        categorical_col_str = self._build_encoding_str(distinct_values)
        if self._output_dictionary:
            self._build_output_dictionary(distinct_values)

        if self._distributed_by:
            if self._distributed_by[0].lower() == 'randomly':
                distribution_str = 'DISTRIBUTED RANDOMLY'
            else:
                distribution_str = 'DISTRIBUTED BY ({0})'.format(
                    ', '.join(self._distributed_by))
        else:
            distribution_str = ''

        if self._row_id_cols:
            other_cols = ', '.join(self._row_id_cols)
        else:
            # carry over every source column that is not being encoded
            all_cols = [k for k, v in self._all_cols_types]
            other_cols = ', '.join(
                [c for c in all_cols
                 if (c not in self._output_cols and
                     unquote_ident(c) not in self._output_cols)])

        if self.output_type == 'array':
            categorical_col_str = ("ARRAY[{0}] AS {1}".
                                   format(categorical_col_str,
                                          self._array_out_name))
        elif self.output_type == 'svec':
            categorical_col_str = ("ARRAY[{0}]::float8[]::{1}.svec AS {2}".
                                   format(categorical_col_str,
                                          self.schema_madlib,
                                          self._array_out_name))

        # other_cols is empty when no row_id is given and every source column
        # is encoded; joining only non-empty parts avoids 'SELECT , ...'
        select_list = ',\n'.join(
            [part for part in (other_cols, categorical_col_str) if part])
        out_sql = """
            CREATE TABLE {out} AS (
                SELECT
                    {select_list}
                FROM
                    {src}
            ) {dist}
            """.format(out=self.output_table,
                       select_list=select_list,
                       src=self.source_table,
                       dist=distribution_str)
        plpy.execute(out_sql)
    # -------------------------------------------------------------------------

    def _parse_parameters(self):
        """ Parse the string arguments into their internal list/flag form

        Sets _categorical_cols, _categorical_cols_to_exclude, _row_id_cols,
        _output_dictionary and _distributed_by, and expands output_type to
        its full name.
        """
        # columns to encode
        if not self.categorical_cols or self.categorical_cols.strip() == '*':
            self._categorical_cols = []
        else:
            self._categorical_cols = split_quoted_delimited_str(self.categorical_cols)
        self._categorical_cols_to_exclude = split_quoted_delimited_str(
            self.categorical_cols_to_exclude)

        # columns that determine the index for output table
        self._row_id_cols = split_quoted_delimited_str(self.row_id)

        # output type for specific supported types
        all_output_types = sorted(['array', 'column', 'svec'])
        try:
            # allow user to specify a prefix substring of
            # supported output types. This works because the supported
            # output types have unique prefixes.
            self.output_type = next(s for s in all_output_types
                                    if s.startswith(self.output_type))
        except StopIteration:
            # next() raises a StopIteration if no element found
            plpy.error("Encoding categorical: Output type should be one of {0}".
                       format(','.join(all_output_types)))

        # flag to build a dictionary table
        # (array-like outputs have no column names, hence always need one)
        self._output_dictionary = (True if self.output_type in ('array', 'svec')
                                   else self.output_dictionary)

        # how to distribute the output table (for distributed platforms)
        if not is_platform_pg():
            if self.distributed_by:
                self._distributed_by = split_quoted_delimited_str(
                    self.distributed_by.strip())
            else:
                self._distributed_by = self._row_id_cols if self._row_id_cols else []
        else:
            self._distributed_by = []
    # -------------------------------------------------------------------------

    def _parse_dictlike_parameters(self):
        """ Parse 'top' and 'value_to_drop' into column -> value dictionaries

        Both arguments are either a 'col=value' mapping or a single value
        that is applied to every column in _output_cols. Sets _top and
        _value_to_drop (empty dictionaries if the argument is not given).
        """
        def _cast_validate_top(val_str):
            # each value of top can be either a float in (0.0, 1.0) or an integer
            try:
                val = float(val_str)
                _assert(0 < val,
                        "Encoding categorical error: top value should be positive")
                if val >= 1:
                    # if val >= 1 then it should be input as a valid integer
                    # (e.g. 2 is valid but 2.0 is not)
                    _assert(val_str.isdigit(),
                            "Encoding categorical error: top value should "
                            "be an integer if greater than or equal to 1")
                    val = int(val_str)
            except ValueError:
                plpy.error("Encoding categorical error: top value ({0})"
                           " is an invalid numeric value".format(val_str))
            return val

        if self.top:
            # top can be a mapping between col name and a value or ...
            out_param = extract_keyvalue_params(self.top,
                                                allow_duplicates=False,
                                                lower_case_names=False)
            if not out_param:
                # ... a global value (without =) that applies to all columns
                val = _cast_validate_top(self.top)
                out_param = dict([(i, val) for i in self._output_cols])
            else:
                for k in out_param:
                    out_param[k] = _cast_validate_top(out_param[k])
            self._top = out_param
        else:
            self._top = {}

        if self.value_to_drop:
            # value_to_drop can be a mapping between col name and a value or ...
            out_param = extract_keyvalue_params(self.value_to_drop,
                                                allow_duplicates=False)
            if not out_param:
                # ... a global value (without =) that applies to all columns
                out_param = dict([(i, str(self.value_to_drop))
                                  for i in self._output_cols])
            self._value_to_drop = out_param
        else:
            self._value_to_drop = {}
    # -------------------------------------------------------------------------

    def _validate_parameters(self):
        """ Validate the table names and the explicitly listed columns

        Checks the source table, the output table and the dictionary table
        name ('<output_table>_dictionary'), and that the requested
        categorical and row_id columns are valid for the source table.
        """
        input_tbl_valid(self.source_table, "Encoding categorical")
        output_tbl_valid(self.output_table, "Encoding categorical")
        output_tbl_valid(add_postfix(self.output_table, "_dictionary"),
                         "Encoding categorical")
        if self._categorical_cols:
            _assert(is_var_valid(self.source_table,
                                 ','.join(self._categorical_cols)),
                    "Encoding categorical: Not all columns from ({0}) present in source table ({1})"
                    .format(self._categorical_cols, self.source_table))
        if self._row_id_cols:
            _assert(is_var_valid(self.source_table,
                                 ','.join(self._row_id_cols)),
                    "Encoding categorical: Not all columns from ({0}) present in source table ({1})"
                    .format(self._row_id_cols, self.source_table))
    # ------------------------------------------------------------------------------

    def _is_col_name_long(self, col_to_values):
        """ Check if any generated column name exceeds MAX_COLUMN_LENGTH

        Args:
            @param col_to_values: dict, Maps each column name to the list of
                                  values to encode. A value is a string, None
                                  (NULL) or a list/tuple (grouped values).

        Returns:
            bool. True if the longest '<column>_<value>' name is longer than
            MAX_COLUMN_LENGTH
        """
        longest_name = 0
        for col, values in col_to_values.items():
            # Max col name length calculation:
            #   the name of column (col) +
            #   name of longest value in column +
            #   underscore (1)
            longest_value = 0
            for v in values:
                if v is None:
                    # NULL is encoded with the suffix 'null'
                    v_len = len('null')
                elif isinstance(v, (list, tuple)):
                    # grouped values share the 'others' suffix; an empty
                    # group does not produce a column
                    v_len = len(self._name_others_col) if v else 0
                else:
                    v_len = len(str(v))
                longest_value = max(longest_value, v_len)
            longest_name = max(longest_name, len(col) + longest_value + 1)
        # an empty mapping (or columns without values) yields False instead
        # of failing on max() of an empty sequence
        return longest_name > MAX_COLUMN_LENGTH
    # -------------------------------------------------------------------------

    def _get_quoted_unquoted(self, col_dict, col):
        """Special get function to check for quoted and unquoted col names
        in the given dictionary, since user input is not guaranteed to follow
        quote_ident rules

        The lookup order is: 'col' as given, quote_ident(col),
        unquote_ident(col). Returns None if none of them is a key.
        """
        return col_dict.get(col,
                            col_dict.get(
                                quote_ident(col),
                                col_dict.get(
                                    unquote_ident(col), None)))
    # -------------------------------------------------------------------------

    def _build_encoding_str(self, col_to_values):
        """ Build string to create categorical columns

        Side effect: switches on self._output_dictionary if a generated
        column name would be longer than MAX_COLUMN_LENGTH.

        Args:
            @param col_to_values: dict, Maps each column in _output_cols to
                                  the list of values to encode

        Returns:
            str. The string that goes into the select clause of a query
            to obtain categorical column encodings
        """
        def _build_case_stmt(col, v, seq):
            """
            Return a CASE statement that compares 'col' with the value v
            If v is a list then col is compared to be any one of the elements in
            v (with special handling for NULL value).
            'seq' is the 1-based position of v, used as the column suffix
            when a dictionary table is built.
            """
            if isinstance(v, (list, tuple)):
                # all values collected in a list are to be treated as a single
                # categorical factor
                if not v:
                    return ''
                # values are quoted as literals so that embedded single
                # quotes do not break the statement
                non_null_v_str = ','.join([_quote_sql_literal(i)
                                           for i in v if i is not None])
                if non_null_v_str:
                    value_str = "IN ({0})".format(non_null_v_str)
                    if None in v:
                        value_str += " OR {0} IS NULL".format(col)
                else:
                    value_str = "IS NULL"
                alias_name = self._name_others_col
                cast_str = ''
            elif v is None:
                value_str = "IS NULL"
                alias_name = 'null'
                cast_str = ''
            else:
                # assume v is a string if not list/tuple and not None
                value_str = "= {0}".format(_quote_sql_literal(v))
                alias_name = None   # resolved below, only if it is needed
                cast_str = '::TEXT'

            if self.output_type not in ('array', 'svec'):
                if not self._output_dictionary:
                    alias_val = (strip_end_quotes(v) if alias_name is None
                                 else alias_name)
                else:
                    alias_val = str(seq)
                col_no_quotes = strip_end_quotes(col.strip())
                alias = 'AS ' + quote_ident('{0}_{1}'.format(col_no_quotes,
                                                             alias_val))
            else:
                # if output_type is array-like then each case does not
                # require an alias
                alias = ""
            return ("(CASE WHEN ({schema_madlib}.__to_char({col}){cast_str} {value_str}) "
                    "THEN 1 ELSE 0 END)::INTEGER {alias}".
                    format(schema_madlib=self.schema_madlib,
                           col=col,
                           cast_str=cast_str,
                           value_str=value_str,
                           alias=alias))

        self._output_dictionary = (self._output_dictionary or
                                   self._is_col_name_long(col_to_values))
        col_switch_list = []
        for col in self._output_cols:
            value_switch_list = [_build_case_stmt(col, v, i + 1)
                                 for i, v in enumerate(col_to_values[col])]
            # value_switch_list could have '' strings or empty lists which
            # need to be filtered out before adding to the case switch list
            col_switch_list.append(','.join([i for i in value_switch_list if i]))
        return ',\n'.join(col_switch_list)
    # ----------------------------------------------------------------------

    def _build_output_dictionary(self, col_to_values):
        """ Create a mapping between column names in output table and
        their corresponding meaning

        Creates the table '<output_table>_dictionary' with one row per
        encoded value. For 'column' output the index restarts at 1 for every
        variable; for array-like output it runs across all variables.

        Args:
            @param col_to_values: dict, Maps each column in _output_cols to
                                  the list of values to encode
        """
        def _get_value_name(v):
            if v is None:
                return "NULL"
            elif isinstance(v, (list, tuple)):
                return ",".join([_get_value_name(i) for i in v])
            else:
                return str(v)

        dict_tbl_name = add_postfix(self.output_table, "_dictionary")
        plpy.execute("""
            CREATE TABLE {tbl} (
                encoded_column_name TEXT,
                index INTEGER,
                variable TEXT,
                value TEXT
            )
            """.format(tbl=dict_tbl_name))

        is_column_output = (self.output_type == 'column')
        global_seq = count(1)
        for col in self._output_cols:
            col_no_quotes = strip_end_quotes(col)
            seq = count(1) if is_column_output else global_seq
            insert_values = []
            for v in col_to_values[col]:
                index = next(seq)
                if is_column_output:
                    encoded_col_name = '"{0}_{1}"'.format(col_no_quotes, index)
                else:
                    encoded_col_name = self._array_out_name
                # every text field is quoted as a literal so that single
                # quotes in names or values do not break the INSERT
                insert_values.append(
                    "({name}, {index}, {col}, {value}::TEXT)".format(
                        name=_quote_sql_literal(encoded_col_name),
                        index=index,
                        col=_quote_sql_literal(col),
                        value=_quote_sql_literal(_get_value_name(v))))
            if not insert_values:
                # nothing to record for this column; an empty VALUES list
                # would be a syntax error
                continue
            plpy.execute("""INSERT INTO {tbl} VALUES
                         {insert_str}
                         """.format(tbl=dict_tbl_name,
                                    insert_str=',\n'.join(insert_values)))
    # ----------------------------------------------------------------------

    def _find_cat_features(self):
        """ Collect the source table columns whose type is categorical

        Sets _all_cols_types (as returned by get_cols_and_types), _cat_types
        (set of type names treated as categorical) and _cat_features.
        """
        self._all_cols_types = get_cols_and_types(self.source_table)

        # any column belonging to the following types are considered categorical
        int_types = ['integer', 'smallint']
        text_types = ['text', 'varchar', 'character varying', 'char', 'character']
        boolean_types = ['boolean']
        self._cat_types = set(int_types + text_types + boolean_types)
        self._cat_features = [c for (c, t) in self._all_cols_types
                              if t in self._cat_types]
    # -------------------------------------------------------------------------

    def _get_cols_to_encode(self):
        """ Expand '*' syntax and exclude some categorical columns

        When all columns are selected, the columns to exclude and the row_id
        columns are removed from the categorical features. When columns are
        listed explicitly, non-categorical ones are ignored with a warning.

        Returns:
            tuple. (list of columns to encode, dict of column -> type)
        """
        self._find_cat_features()

        # include the quoted name and the unquoted name in exclusion set
        # to allow user to provide either form
        cols_to_skip = self._categorical_cols_to_exclude + self._row_id_cols
        exclude_set = set(cols_to_skip)
        exclude_set |= set(quote_ident(i) for i in cols_to_skip)

        if not self._categorical_cols:
            features = [f for f in self._cat_features if f not in exclude_set]
            col_to_type = dict([(c, get_expr_type(c, self.source_table))
                                for c in features])
        else:
            col_to_type = {}
            features = []
            ignored_cols = []
            for col in self._categorical_cols:
                col_type = get_expr_type(col, self.source_table).lower()
                if col_type in self._cat_types:
                    features.append(col)
                    col_to_type[col] = col_type
                else:
                    ignored_cols.append(col)
            if ignored_cols:
                plpy.warning("Encoding categorical: Ignoring non-categorical columns ({0})".
                             format(','.join(ignored_cols)))
        return features, col_to_type
    # -------------------------------------------------------------------------

    def _get_top_values(self):
        """ Get the top values for each column.

        Note: this function computes the frequencies for values of each column
        even if that column is not part of the 'top' argument.
        An improvement here is to compute top values only for requested columns
        and use distinct values for others.

        Returns:
            dict. Maps each column to its list of values ordered by
            decreasing frequency. If 'top' applies to the column, the values
            beyond the cut-off are collected in a single nested list at the
            end (encoded together as one column).
        """
        def _cum_sum(values, start=0):
            for v in values:
                start += v
                yield start

        top_val_sql_list = []
        for col in self._output_cols:
            if not self.encode_null:
                filter_str = 'WHERE ({col}) IS NOT NULL'.format(col=col)
            else:
                filter_str = ''
            # get value distribution for each column independently
            top_val_sql_list.append("""
                SELECT
                    {col_literal} as col_name,
                    array_agg(f order by c desc) as value,
                    array_agg(c order by c desc) as freq
                FROM (
                    SELECT
                        {schema_madlib}.__to_char({col})::text as f,
                        count(*)::integer as c
                    FROM {tbl}
                    {filter_str}
                    GROUP BY {col}
                ) q
                """.format(schema_madlib=self.schema_madlib,
                           col_literal=_quote_sql_literal(col),
                           col=col,
                           tbl=self.source_table,
                           filter_str=filter_str))
        top_values = plpy.execute('\n UNION ALL \n'.join(top_val_sql_list))

        # top_values is now a list of dictionary, each element
        # giving the frequency of the values in a column
        top_distinct_values = {}
        for each_col_data in top_values:
            col = each_col_data['col_name']
            # array_agg is NULL (None) when no row is aggregated; treat that
            # as 'no values' so that the error below is raised
            ordered_values = list(each_col_data['value'] or [])
            ordered_freq = list(each_col_data['freq'] or [])

            # drop reference from distinct values
            if (col in self._value_to_drop and
                    self._value_to_drop[col] in ordered_values):
                drop_index = ordered_values.index(self._value_to_drop[col])
                ordered_values.pop(drop_index)
                ordered_freq.pop(drop_index)

            if not ordered_values:
                plpy.error("Encoding categorical error: "
                           "No top values found for {0} or "
                           "all values dropped as per function arguments"
                           "(value_to_drop, encode_null)".format(col))

            if col in self._top:
                # for each column find at which point does the top 'k' values occur
                top_arg = self._top[col]
                if top_arg >= 1:
                    # top >= 1 is considered a integer count of values to pick ...
                    k = int(top_arg)
                else:
                    # ..., top < 1 is considered a percent of total count
                    threshold = int(math.ceil(top_arg * sum(ordered_freq)))
                    cum_freq = list(_cum_sum(ordered_freq))
                    k = bisect(cum_freq, threshold, 0, len(ordered_values)-2) + 1
                top_distinct_values[col] = ordered_values[:k]
                if k < len(ordered_values):
                    # putting this into an if check avoids empty list
                    # at the end when k >= actual number of values
                    top_distinct_values[col] += [list(ordered_values[k:])]
            else:
                # need all values for this column since no top provided
                top_distinct_values[col] = ordered_values
        return top_distinct_values
    # -------------------------------------------------------------------------

    def _get_distinct_values(self):
        """ Find distinct values of each categorical column

        Returns:
            dict. Maps each column in _output_cols to the list of its
            distinct values as text (None stands for NULL and is only
            present if encode_null is True)
        """
        # Boolean variables when passed to Python will refer to the values as
        # 'True', 'False' with the first letter as capital, which will cause the
        # generated column name as <boolean column name>_True/False that needs
        # double quoting. To ensure the boolean values remain lower case, cast
        # the column to text format before copying to Python. The same logic
        # is applied to the generated column names with _null and _misc
        array_agg_str = ',\n'.join(
            'array_agg(DISTINCT ({c})::TEXT) AS {c_quoted}'.
            format(c=c, c_quoted=quote_ident(c))
            for c in self._output_cols)

        if self.encode_null:
            # Some platforms don't include NULL values as part of the
            # array_agg(DISTINCT ...). Below checks explicitly for NULL values
            null_str = ', ' + ',\n'.join(
                'bool_or(CASE WHEN {c} IS NULL THEN True ELSE False END)'
                ' AS "{c_}_isnull"'.format(c=c, c_=strip_end_quotes(c.strip()))
                for c in self._output_cols)
        else:
            null_str = ''

        col_values_data = plpy.execute("SELECT {0} {1} FROM {2}".
                                       format(array_agg_str, null_str,
                                              self.source_table))[0]

        # Collect the distinct values (possibly including null) for every column
        distinct_values = {}
        for col in self._output_cols:
            # array_agg is NULL (None) for an empty source table; treat that
            # as 'no values' so that the error below is raised
            dv = [str(i) if i is not None else i
                  for i in (col_values_data[col] or [])]

            # drop reference from distinct values
            if (col in self._value_to_drop and
                    self._value_to_drop[col] in dv):
                dv.remove(self._value_to_drop[col])

            # check for NULL values
            if not self.encode_null:
                if None in dv:
                    # ignore NULL if encode_null = False
                    dv.remove(None)
            else:
                null_col_name = '"{c}_isnull"'.format(
                    c=strip_end_quotes(col.strip()))
                col_contains_null = self._get_quoted_unquoted(col_values_data,
                                                              null_col_name)
                if col_contains_null and None not in dv:
                    dv.append(None)
            if not dv:
                plpy.error("Encoding categorical error: "
                           "No distinct values found for {0} or "
                           "all distinct values dropped as per function arguments"
                           "(value_to_drop, encode_null)".format(col))
            distinct_values[col] = dv
        return distinct_values
# ------------------------------------------------------------------------------


def encode_categorical_variables(
        schema_madlib, source_table, output_table, categorical_cols,
        categorical_cols_to_exclude=None, row_id=None, top=None,
        value_to_drop=None, encode_null=False, output_type='column',
        output_dictionary=False, distributed_by=None, **kwargs):
    """ Main function to encode categorical variables

    Args:
        @param schema_madlib: str, Name of the MADlib schema
        @param source_table:str, Name of table containing categorical variable
        @param output_table:str, Name of table to output dummy variables
        @param categorical_cols:str, Comma-separated list of column names to
                                     dummy code (can be '*')
        @param categorical_cols_to_exclude:str, Comma-separated list of column names
                                        to exclude (if categorical_cols = '*')
        @param row_id: str, Columns from source table to index output table
        @param top: str, Parameter to include only top values of a categorical variable
        @param value_to_drop: str, Parameter to set reference column in dummy coding
        @param encode_null: bool, If True, NULL is treated as a categorical value
        @param output_type: str, Parameter to determine if output should be
                                 an array, svec or individual columns
                                 Can take values ('column', 'array', 'svec')
        @param output_dictionary: bool, If True columns names are simplified and
                                        a separate mapping table is created to
                                        understand the names
        @param distributed_by: str, Comma-separated list of column names to use
                                    for distribution of output

    Returns:
        None
    """
    with MinWarning('warning'):
        encoder = CategoricalEncoder(schema_madlib, source_table, output_table,
                                     categorical_cols,
                                     categorical_cols_to_exclude,
                                     row_id, top, value_to_drop, encode_null,
                                     output_type, output_dictionary,
                                     distributed_by)
        encoder.build_output_table()
    return None
# ---------------------------------------------------------------


def encode_categorical_help(schema_madlib, message, **kwargs):
    """
    Help function for encode_categorical_variables

    Args:
        @param schema_madlib: str, Name of the MADlib schema (substituted
                              into the example queries)
        @param message: string, Help message string. Empty/None gives the
                        summary; 'usage', 'help' or '?' (any letter case,
                        surrounding whitespace ignored) gives the usage
        @param kwargs

    Returns:
        String. Help/usage information
    """
    # normalise so that e.g. 'USAGE' or ' help ' select the usage text
    message = message.strip().lower() if message else ''
    if not message:
        help_string = """
-----------------------------------------------------------------------
                            SUMMARY
-----------------------------------------------------------------------
Categorical variables [1] require special attention in regression analysis
because, unlike dichotomous or continuous variables, they cannot be entered
into the regression equation just as they are. For example, if you have a
variable called race that is coded with 1=Hispanic, 2=Asian, 3=Black, 4=White,
then entering race in your regression will look at the linear effect of the race
variable, which is probably not what you intended. Instead, categorical variables
like this need to be coded into a series of indicator variables which can then be
entered into the regression model. There are a variety of coding systems that can be
used for coding categorical variables, including one-hot, dummy, effects, orthogonal, and
Helmert. We currently support one-hot and dummy coding techniques.

Dummy coding is used when a researcher wants to compare other groups of the
predictor variable with one specific group of the predictor variable. Often, the
specific group to compare with is called the reference group.

One-hot encoding is similar to dummy coding except it builds indicator (0/1)
columns (cast as numeric) for each value of each category. Only one of these
columns could take on the value 1 for each row (data point). There is no reference
category for this function.

For more details on function usage:
    SELECT {madlib}.encode_categorical_variables('usage')
        """
    elif message in ['usage', 'help', '?']:
        help_string = """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
SELECT {madlib}.encode_categorical_variables (
    source_table,        -- Name of source table
    output_table,        -- Name of table to output encoded data
    categorical_cols,    -- Comma-separated list of columns to encode
                         --   (can be *)
    categorical_cols_to_exclude,    -- (Optional) Columns to exclude if using '*' above
    row_id,              -- (Optional) Columns corresponding to
                         --   primary keys of source table
    top,                 -- (Optional) Parameter to encode only top values
    value_to_drop,       -- (Optional) Reference value to drop for each column
    encode_null,         -- (Optional) Whether NULL should be treated as one of the
                         --   values of the categorical variable.
    output_type,         -- (Optional) Get encoded variables in individual columns
                         --   or as an array (Can be 'column', 'array', or 'svec')
    output_dictionary,   -- (Optional) Simplify output column naming and provide
                         --   a mapping between simple names and meaning
    distributed_by       -- (Optional) Columns to use for the distribution policy of
                         --   the output table (does not apply for Postgresql)
)

Refer to online documentation for details on above parameters.

-----------------------------------------------------------------------
                            OUTPUT
-----------------------------------------------------------------------
If there are index columns in the 'source_table' specified by the parameter
'row_id' (see above), then the output table will contain only the index
columns 'row_id' and the encoded columns. If the parameter 'row_id' is not
specified, then all columns from the 'source_table', with the exception of the
original categorical columns, will be included in the 'output_table'.
        """
    else:
        help_string = """No such option.
For more details on function usage:
    SELECT {madlib}.encode_categorical_variables('usage')
        """
    return help_string.format(madlib=schema_madlib)
# ---------------------------------------------------------------------