# coding=utf-8 # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ @file minibatch_preprocessing.py_in """ from math import ceil import plpy from control import MinWarning from utilities import add_postfix from utilities import _assert from utilities import get_seg_number from utilities import is_platform_pg from utilities import is_psql_boolean_type from utilities import is_psql_char_type from utilities import is_psql_numeric_type from utilities import is_psql_int_type from utilities import is_string_formatted_as_array_expression from utilities import py_list_to_sql_string from utilities import split_quoted_delimited_str from utilities import unique_string from utilities import _string_to_array from utilities import validate_module_input_params from mean_std_dev_calculator import MeanStdDevCalculator from validate_args import get_expr_type from validate_args import output_tbl_valid from validate_args import _tbl_dimension_rownum m4_changequote(`') # These are readonly variables, do not modify MINIBATCH_OUTPUT_DEPENDENT_COLNAME = "dependent_varname" MINIBATCH_OUTPUT_INDEPENDENT_COLNAME = "independent_varname" class MiniBatchPreProcessor: """ This class is responsible for executing the main logic of mini batch preprocessing, which packs multiple rows of selected columns from the source table into one row based on the buffer size """ @MinWarning("error") def __init__(self, schema_madlib, source_table, output_table, dependent_varname, independent_varname, grouping_cols, buffer_size, one_hot_encode_int_dep_var=False, **kwargs): self.schema_madlib = schema_madlib self.source_table = source_table self.output_table = output_table self.dependent_varname = dependent_varname self.independent_varname = independent_varname self.buffer_size = buffer_size self.grouping_cols = grouping_cols self.one_hot_encode_int_dep_var = one_hot_encode_int_dep_var self.module_name = "minibatch_preprocessor" self.output_standardization_table = add_postfix(self.output_table, "_standardization") self.output_summary_table = add_postfix(self.output_table, "_summary") self._validate_minibatch_preprocessor_params() @MinWarning("error") def minibatch_preprocessor(self): # Get array expressions for both dep and indep variables from the # MiniBatchQueryFormatter class qry_formatter = MiniBatchQueryFormatter(self.source_table) dependent_var_dbtype = get_expr_type(self.dependent_varname, self.source_table) dep_var_array_str, dep_var_classes_str = qry_formatter.\ get_dep_var_array_and_classes(self.dependent_varname, dependent_var_dbtype, self.one_hot_encode_int_dep_var) indep_var_array_str = qry_formatter.get_indep_var_array_str( self.independent_varname) standardizer = MiniBatchStandardizer(self.schema_madlib, self.source_table, dep_var_array_str, indep_var_array_str, self.grouping_cols, self.output_standardization_table) total_num_rows_processed, avg_num_rows_processed, \ num_missing_rows_skipped = self._get_skipped_rows_processed_count( dep_var_array_str, indep_var_array_str) calculated_buffer_size = MiniBatchBufferSizeCalculator.\ calculate_default_buffer_size(self.buffer_size, avg_num_rows_processed, standardizer.independent_var_dimension) """ This query does the following: 1. Standardize the independent variables in the input table (see MiniBatchStandardizer for more details) 2. Filter out rows with null values either in dependent/independent variables 3. Converts the input dependent/independent variables into arrays (see MiniBatchQueryFormatter for more details) 4. Based on the buffer size, pack the dependent/independent arrays into matrices Notes 1. we are ignoring null in x because a. matrix_agg does not support null b. __utils_normalize_data returns null if any element of the array contains NULL 2. Please keep the null checking where clause of this query in sync with the query in _get_skipped_rows_processed_count. We are doing this null check in two places to prevent another pass of the entire dataset. """ # This ID is the unique row id that get assigned to each row after # preprocessing unique_row_id = "__id__" standardize_query = standardizer.get_query_for_standardizing() if self.grouping_cols: partition_by = 'PARTITION BY {0}'.format(self.grouping_cols) grouping_cols_select_col = self.grouping_cols + ',' grouping_cols_group_by = ',' + self.grouping_cols else: partition_by = '' grouping_cols_select_col = '' grouping_cols_group_by = '' sql = """ CREATE TABLE {self.output_table} AS SELECT {row_id}, {grouping_cols_select_col} {self.schema_madlib}.matrix_agg({dep_colname}) as {dep_colname}, {self.schema_madlib}.matrix_agg({ind_colname}) as {ind_colname} FROM ( SELECT (row_number() OVER ({partition_by} ORDER BY random()) - 1) / {buffer_size} as {row_id}, * FROM ( {standardize_query} ) sub_query_1 WHERE NOT {self.schema_madlib}.array_contains_null({dep_colname}) AND NOT {self.schema_madlib}.array_contains_null({ind_colname}) ) sub_query_2 GROUP BY {row_id} {grouping_cols_group_by} {distributed_by_clause} """.format(buffer_size=calculated_buffer_size, dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME, row_id=unique_row_id, distributed_by_clause='' if is_platform_pg() else 'DISTRIBUTED RANDOMLY', **locals()) plpy.execute(sql) standardizer.create_output_standardization_table() standardizer.drop_standardized_table() MiniBatchSummarizer.create_output_summary_table( self.output_summary_table, self.source_table, self.output_table, self.dependent_varname, self.independent_varname, dependent_var_dbtype, calculated_buffer_size, dep_var_classes_str, total_num_rows_processed, num_missing_rows_skipped, self.grouping_cols) def _validate_minibatch_preprocessor_params(self): # Test if the independent variable can be typecasted to a double # precision array and let postgres validate the expression # Note that this will not fail for 2d arrays but the standardizer will # fail because utils_normalize_data will throw an error typecasted_ind_varname = "{0}::double precision[]".format( self.independent_varname) validate_module_input_params(self.source_table, self.output_table, typecasted_ind_varname, self.dependent_varname, self.module_name, self.grouping_cols, [self.output_summary_table, self.output_standardization_table]) num_of_dependent_cols = split_quoted_delimited_str( self.dependent_varname) _assert(len(num_of_dependent_cols) == 1, "Invalid dependent_varname: only one column name is allowed " "as input.") if self.buffer_size is not None: _assert(self.buffer_size > 0, """minibatch_preprocessor: The buffer size has to be a positive integer or NULL.""") def _get_skipped_rows_processed_count(self, dep_var_array, indep_var_array): # Note: Keep the null checking where clause of this query in sync with # the main create output table query. query = """ SELECT SUM(source_table_row_count_by_group) AS source_table_row_count, SUM(num_rows_processed_by_group) AS total_num_rows_processed, AVG(num_rows_processed_by_group) AS avg_num_rows_processed FROM ( SELECT COUNT(*) AS source_table_row_count_by_group, SUM(CASE WHEN NOT {sm}.array_contains_null({dep_array}) AND NOT {sm}.array_contains_null({indep_array}) THEN 1 ELSE 0 END) AS num_rows_processed_by_group FROM {source_table} {group_by_clause} ) AS s """.format(sm=self.schema_madlib, source_table=self.source_table, dep_array=dep_var_array, indep_array=indep_var_array, group_by_clause="GROUP BY {0}".format(self.grouping_cols) if self.grouping_cols else '') result = plpy.execute(query) ## SUM and AVG both return float, and we have to cast them into int fo ## summary table. For avg_num_rows_processed we need to ceil first so ## that the minimum won't be 0 source_table_row_count = int(result[0]['source_table_row_count']) total_num_rows_processed = int(result[0]['total_num_rows_processed']) avg_num_rows_processed = int(ceil(result[0]['avg_num_rows_processed'])) if (not source_table_row_count or not total_num_rows_processed or not avg_num_rows_processed): plpy.error("Error while getting the row count of the source table " "({0})".format(self.source_table)) num_missing_rows_skipped = source_table_row_count - total_num_rows_processed return (total_num_rows_processed, avg_num_rows_processed, num_missing_rows_skipped) class MiniBatchQueryFormatter: """ This class is responsible for formatting the independent and dependent variables into arrays so that they can be matrix agged by the preprocessor class. """ def __init__(self, source_table): self.source_table = source_table def get_dep_var_array_and_classes(self, dependent_varname, dependent_var_dbtype, to_one_hot_encode_int=False): """ :param dependent_varname: Name of the dependent variable :param to_one_hot_encode_int: Boolean to determine if dependent variable needs to be one hot encoded (independent of type) :return: This function returns a tuple of 1. A string with transformed dependent varname depending on it's type 2. All the distinct dependent class levels encoded as a string If dep_type == numeric , do not encode 1. dependent_varname = rings transformed_value = ARRAY[[rings1], [rings2], []] class_level_str = ARRAY[rings = 'rings1', rings = 'rings2']::integer[] 2. dependent_varname = ARRAY[a, b, c] transformed_value = ARRAY[[a1, b1, c1], [a2, b2, c2], []] class_level_str = 'NULL::TEXT' else if dep_type in ("text", "boolean"), encode: 3. dependent_varname = rings (encoding) transformed_value = ARRAY[[rings1=1, rings1=2], [rings2=1, rings2=2], []] class_level_str = 'NULL::TEXT' """ dep_var_class_value_str = 'NULL::TEXT' is_dep_var_int_type = is_psql_int_type(dependent_var_dbtype) to_one_hot_encode = (is_psql_char_type(dependent_var_dbtype) or is_psql_boolean_type(dependent_var_dbtype) or (to_one_hot_encode_int and is_dep_var_int_type)) if to_one_hot_encode: # for encoding, since dependent_varname can also be a logical # expression, there is a () around it dep_level_sql = """ SELECT DISTINCT ({dependent_varname}) AS class FROM {source_table} WHERE ({dependent_varname}) is NOT NULL """.format(dependent_varname=dependent_varname, source_table=self.source_table) dep_levels = plpy.execute(dep_level_sql) dep_var_classes = sorted(l["class"] for l in dep_levels) dep_var_array_str = \ self._get_one_hot_encoded_str(dependent_varname, dep_var_classes, to_quote=not is_dep_var_int_type) dep_var_class_value_str = \ py_list_to_sql_string(dep_var_classes, array_type=dependent_var_dbtype) elif "[]" in dependent_var_dbtype: dep_var_array_str = dependent_varname elif is_psql_numeric_type(dependent_var_dbtype): dep_var_array_str = 'ARRAY[{0}]'.format(dependent_varname) else: plpy.error("""Invalid dependent variable type. It should be character, boolean, numeric, or array.""") return dep_var_array_str, dep_var_class_value_str def _get_one_hot_encoded_str(self, var_name, var_classes, to_quote=True): def add_quote(c): return "'{0}'".format(c) if to_quote else c one_hot_list = ["({0}) = {1}".format(var_name, add_quote(c)) for c in var_classes] return 'ARRAY[{0}]::INTEGER[]'.format(', '.join(one_hot_list)) def get_indep_var_array_str(self, independent_varname): """ we assume that all the independent features are either numeric or already encoded by the user. Supported formats 1. ‘ARRAY[x1,x2,x3]’ , where x1,x2,x3 are columns in source table with scalar values 2. ‘x1’, where x1 is a single column in source table, with value as an array, like ARRAY[1,2,3] or {1,2,3} we don't deal with a mixture of scalar and array independent variables """ return "({0})::DOUBLE PRECISION[]".format(independent_varname) class MiniBatchStandardizer: """ This class is responsible for 1. Calculating the mean and std dev for independent variables 2. Format the query to standardize the input table based on the calculated mean/std dev 3. Creating the output standardization table """ def __init__(self, schema_madlib, source_table, dep_var_array_str, indep_var_array_str, grouping_cols, output_standardization_table): self.schema_madlib = schema_madlib self.source_table = source_table self.dep_var_array_str = dep_var_array_str self.indep_var_array_str = indep_var_array_str self.grouping_cols = grouping_cols self.output_standardization_table = output_standardization_table self.x_mean_table = unique_string(desp='x_mean_table') self.x_mean_str = None self.x_std_dev_str = None self.standardized_table = unique_string(desp='std_table') self._calculate_mean_and_std_dev_str() def _calculate_mean_and_std_dev_str(self): self.independent_var_dimension, _ = _tbl_dimension_rownum( self.schema_madlib, self.source_table, self.indep_var_array_str, skip_row_count=True) calculator = MeanStdDevCalculator(self.schema_madlib, self.source_table, self.indep_var_array_str, self.independent_var_dimension) """ For grouping, we have to create a temp mean table because we have to join the mean table and the source table by grouping cols. It's easier to call utils_normalize_data with a table instead of storing this information in memory in a data structure. When if there is no grouping, a simple python string is enough to store the mean and std_dev. """ if self.grouping_cols: calculator.create_mean_std_table_for_ind_var_grouping( self.x_mean_table, self.grouping_cols) else: self.x_mean_str, self.x_std_dev_str = calculator.\ get_mean_and_std_dev_for_ind_var() def get_query_for_standardizing(self): if self.grouping_cols: query = self._get_query_for_standardizing_with_grouping() else: query = self._get_query_for_standardizing_without_grouping() plpy.execute(query) return "select * from {0}".format(self.standardized_table) def _get_query_for_standardizing_without_grouping(self): return """ CREATE TEMP TABLE {self.standardized_table} AS SELECT {self.dep_var_array_str} AS {dep_colname}, {self.schema_madlib}.utils_normalize_data( {self.indep_var_array_str}, '{self.x_mean_str}'::double precision[], '{self.x_std_dev_str}'::double precision[] ) AS {ind_colname} FROM {self.source_table} """.format(dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME, self=self) def _get_query_for_standardizing_with_grouping(self): return """ CREATE TEMP TABLE {self.standardized_table} AS SELECT {self.dep_var_array_str} AS {dep_colname}, {self.schema_madlib}.utils_normalize_data( {self.indep_var_array_str}, __x__.mean::double precision[], __x__.std::double precision[] ) AS {ind_colname}, {self.source_table}.{self.grouping_cols} FROM {self.source_table} INNER JOIN {self.x_mean_table} AS __x__ ON {self.source_table}.{self.grouping_cols} = __x__.{self.grouping_cols} """.format( self=self, dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME, ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME) def create_output_standardization_table(self): if self.grouping_cols: query = """ ALTER TABLE {self.x_mean_table} RENAME TO {self.output_standardization_table} """.format(self=self) else: query = """ CREATE TABLE {self.output_standardization_table} AS SELECT '{self.x_mean_str}'::double precision[] AS mean, '{self.x_std_dev_str}'::double precision[] AS std """.format(self=self) plpy.execute(query) def drop_standardized_table(self): plpy.execute("DROP TABLE IF EXISTS {0}".format(self.standardized_table)) class MiniBatchSummarizer: @staticmethod def create_output_summary_table(output_summary_table, source_table, output_table, dep_var_array_str, indep_var_array_str, dependent_var_dbtype, buffer_size, class_values, total_num_rows_processed, num_missing_rows_skipped, grouping_cols): # 1. All the string columns are surrounded by "$$" to take care of # special characters in the column name. # 2. We have to typecast all the string column names to ::TEXT because # otherwise there is a warning from psql # WARNING: column "independent_varname" has type "unknown" query = """ CREATE TABLE {output_summary_table} AS SELECT $${source_table}$$::TEXT AS source_table, $${output_table}$$::TEXT AS output_table, $${dependent_varname}$$::TEXT AS dependent_varname, $${independent_varname}$$::TEXT AS independent_varname, $${dependent_var_dbtype}$$::TEXT AS dependent_vartype, {buffer_size} AS buffer_size, {class_values} AS class_values, {total_num_rows_processed} AS num_rows_processed, {num_missing_rows_skipped} AS num_missing_rows_skipped, {grouping_cols_str}::TEXT AS grouping_cols """.format(dependent_varname=dep_var_array_str, independent_varname=indep_var_array_str, grouping_cols_str="$$" + grouping_cols + "$$" if grouping_cols else "NULL", **locals()) plpy.execute(query) class MiniBatchBufferSizeCalculator: """ This class is responsible for calculating the buffer size. This is a work in progress, final formula might change. """ @staticmethod def calculate_default_buffer_size(buffer_size, avg_num_rows_processed, independent_var_dimension): if buffer_size is not None: return buffer_size num_of_segments = get_seg_number() default_buffer_size = min(75000000.0/independent_var_dimension, float(avg_num_rows_processed)/num_of_segments) """ 1. For float number, we need at least one more buffer for the fraction part, e.g. if default_buffer_size = 0.25, we need to round it to 1. 2. Ceiling returns a float in python2. So after ceiling, we cast default_buffer_size to int, because it will be used to calculate the row id of the packed input. The query looks like this SELECT (row_number() OVER (ORDER BY random()) - 1) / {buffer_size} This calculation has to return an int for which buffer_size has to be an int """ return int(ceil(default_buffer_size)) class MiniBatchDocumentation: @staticmethod def minibatch_preprocessor_help(schema_madlib, message): method = "minibatch_preprocessor" summary = """ ---------------------------------------------------------------- SUMMARY ---------------------------------------------------------------- MiniBatch Preprocessor is a utility function to pre process the input data for use with models that support mini-batching as an optimization #TODO add more here For more details on function usage: SELECT {schema_madlib}.{method}('usage') For a small example on using the function: SELECT {schema_madlib}.{method}('example') """.format(**locals()) usage = """ --------------------------------------------------------------------------- USAGE --------------------------------------------------------------------------- SELECT {schema_madlib}.{method}( source_table, -- TEXT. Name of the table containing input data. Can also be a view output_table , -- TEXT. Name of the output table for mini-batching dependent_varname, -- TEXT. Name of the dependent variable column independent_varname, -- TEXT. Name of the independent variable column buffer_size -- INTEGER. Number of source input rows to pack into batch ); --------------------------------------------------------------------------- OUTPUT --------------------------------------------------------------------------- The output table produced by MiniBatch Preprocessor contains the following columns: id -- INTEGER. Unique id for packed table. dependent_varname -- FLOAT8[]. Packed array of dependent variables. independent_varname -- FLOAT8[]. Packed array of independent variables. --------------------------------------------------------------------------- The algorithm also creates a summary table named _summary that has the following columns: source_table -- Source table name. output_table -- Output table name from preprocessor. dependent_varname -- Dependent variable from the original table. independent_varname -- Independent variables from the original table. buffer_size -- Buffer size used in preprocessing step. class_values -- Class values of the dependent variable (‘NULL’(as TEXT type) for non categorical vars). num_rows_processed -- The total number of rows that were used in the computation. num_missing_rows_skipped -- The total number of rows that were skipped because of NULL values in them. grouping_cols -- NULL if no grouping_col was specified during training, and a comma separated list of grouping column names if not. --------------------------------------------------------------------------- The algorithm also creates a standardization table that stores some metadata used during the model training and prediction, and is named _standardization. It has the following columns: grouping_cols -- If grouping_col is specified during training, a column for each grouping column is created. mean -- The mean for all input features (used for normalization). std -- The standard deviation for all input features (used for normalization). """.format(**locals()) example = """ -- Create input table CREATE TABLE iris_data( id INTEGER, attributes NUMERIC[], class_text text, class INTEGER, state VARCHAR ); COPY iris_data (attributes, class_text, class, state) FROM STDIN NULL '?' DELIMITER '|'; {4.4,3.2,1.3,0.2}|Iris_setosa|1|Alaska {5.0,3.5,1.6,0.6}|Iris_setosa|1|Alaska {5.1,3.8,1.9,0.4}|Iris_setosa|1|Alaska {4.8,3.0,1.4,0.3}|Iris_setosa|1|Alaska {5.1,3.8,1.6,0.2}|Iris_setosa|1|Alaska {5.7,2.8,4.5,1.3}|Iris_versicolor|2|Alaska {6.3,3.3,4.7,1.6}|Iris_versicolor|2|Alaska {4.9,2.4,3.3,1.0}|Iris_versicolor|2|Alaska {6.6,2.9,4.6,1.3}|Iris_versicolor|2|Alaska {5.2,2.7,3.9,1.4}|Iris_versicolor|2|Alaska {5.0,2.0,3.5,1.0}|Iris_versicolor|2|Alaska {4.8,3.0,1.4,0.1}|Iris_setosa|1|Tennessee {4.3,3.0,1.1,0.1}|Iris_setosa|1|Tennessee {5.8,4.0,1.2,0.2}|Iris_setosa|1|Tennessee {5.7,4.4,1.5,0.4}|Iris_setosa|1|Tennessee {5.4,3.9,1.3,0.4}|Iris_setosa|1|Tennessee {6.0,2.9,4.5,1.5}|Iris_versicolor|2|Tennessee {5.7,2.6,3.5,1.0}|Iris_versicolor|2|Tennessee {5.5,2.4,3.8,1.1}|Iris_versicolor|2|Tennessee {5.5,2.4,3.7,1.0}|Iris_versicolor|2|Tennessee {5.8,2.7,3.9,1.2}|Iris_versicolor|2|Tennessee {6.0,2.7,5.1,1.6}|Iris_versicolor|2|Tennessee \. -- #TODO add description here DROP TABLE IF EXISTS iris_data_batch, iris_data_batch_standardization, iris_data_batch_summary; SELECT madlib.minibatch_preprocessor('iris_data', 'iris_data_batch', 'class_text', 'attributes', 3); -- #TODO add description here NULL buffer size DROP TABLE IF EXISTS iris_data_batch, iris_data_batch_standardization, iris_data_batch_summary; SELECT madlib.minibatch_preprocessor('iris_data', 'iris_data_batch', 'class_text', 'attributes'); """ if not message: return summary elif message.lower() in ('usage', 'help', '?'): return usage elif message.lower() == 'example': return example return """ No such option. Use "SELECT {schema_madlib}.minibatch_preprocessor()" for help. """.format(**locals()) # ---------------------------------------------------------------------