""" @file pca_project.py_in @namespace pca """ import plpy import time from linalg.matrix_ops import cast_dense_input_table_to_correct_columns from linalg.matrix_ops import create_temp_sparse_matrix_table_with_dims from linalg.matrix_ops import get_dims from linalg.matrix_ops import validate_dense from linalg.matrix_ops import validate_sparse from utilities.utilities import __mad_version from utilities.utilities import unique_string from utilities.utilities import _assert from utilities.utilities import _array_to_string from utilities.validate_args import columns_exist_in_table from utilities.validate_args import table_exists from utilities.utilities import add_postfix from utilities.validate_args import get_cols, get_cols_and_types from utilities.control import MinWarning version_wrapper = __mad_version() string_to_array = version_wrapper.select_vecfunc() array_to_string = version_wrapper.select_vec_return() ZERO_THRESHOLD = 1e-6 # Dense PCA project help function def pca_project_help(schema_madlib, usage_string=None, **kwargs): """ Given a usage string, give out function usage information. """ if usage_string is not None and \ usage_string.lower() in ("usage", "help", "?"): return """ ---------------------------------------------------------------- Usage ---------------------------------------------------------------- SELECT {schema_madlib}.pca_project ( 'tbl_source', -- Data table 'pc_table', -- Table with principal componenents (obtained as output from pca_train) 'tbl_result', -- Result table 'row_id', -- Name of the column containing the row_id -- Optional Parameters ---------------------------------------------------------------- 'tbl_residual', -- Residual table (Default: NULL) 'tbl_result_summary', -- Result summary table (Default : NULL) ); Note that if the principal components in pc_table were learnt using grouping_cols in {schema_madlib}.pca_train(), the tbl_source used here must also have those grouping columns. This will fail otherwise. Output Tables -------------------------------------------------------------------- The output is divided into three tables (two of which are optional) -------------------------------------------------------------------- The output table ('tbl_result' above) encodes a dense matrix with the projection onto the principal components. The matrix contains the following columns: 'row_id' INTEGER, -- Row id of the output matrix 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix grouping_col -- The grouping columns present in the 'pc_table', if any -------------------------------------------------------------------- The residual table ('tbl_residual' above) encodes a dense residual matrix which has the following columns 'row_id' INTEGER, -- Row id of the output matrix 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix grouping_col -- The grouping columns present in the 'pc_table', if any -------------------------------------------------------------------- The result summary table ('tbl_result_summary' above) has the following columns 'exec_time' INTEGER, -- Wall clock time (ms) of the function. 'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals 'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals grouping_col -- The grouping columns present in the 'pc_table', if any ---------------------------------------------------------------- """.format(schema_madlib=schema_madlib) else: if usage_string is not None and \ usage_string.lower() in ("example", "examples"): return """ ---------------------------------------------------------------- Examples ---------------------------------------------------------------- -- Run pca_project() using a model table generated without grouping_cols. -- Create input table for pca_project() DROP TABLE IF EXISTS mat_proj; CREATE TABLE mat_proj ( row_id integer, row_vec double precision[] ); COPY mat_proj (row_id, row_vec) FROM stdin DELIMITER '|'; 1|{{1,2,3}} 2|{{2,1,2}} 3|{{3,2,1}} 11|{{1,2,3}} 21|{{2,1,2}} 31|{{3,2,1}} 41|{{1,2,4}} 12|{{1,3,3}} \. -- NOTE: Use the 'result_table' created using the example shown in -- {schema_madlib}.pca_train('examples'), as the 'pc_table' parameter here. DROP TABLE IF EXISTS mat_proj_out; SELECT {schema_madlib}.pca_project( 'mat_proj', 'result_table', 'mat_proj_out', 'row_id' ); SELECT * FROM mat_proj_out; ----------------------------------------------------------------------- -- Run pca_project() using a model table generated with grouping_cols. -- Create input table for pca_project(), with grouping DROP TABLE IF EXISTS mat_proj_grouped; CREATE TABLE mat_proj_grouped ( row_id integer, row_vec double precision[], matrix_id integer ); COPY mat_proj_grouped (row_id, row_vec, matrix_id) FROM stdin DELIMITER '|'; 1|{{1,2,3}}|1 2|{{2,1,2}}|1 3|{{3,2,1}}|1 4|{{1,2,3,4,5}}|2 5|{{2,1,2,4,5}}|2 6|{{3,2,1,4,5}}|2 \. -- NOTE: Use the 'result_table_grp' created using the example shown -- in {schema_madlib}.pca_train('examples'), as the 'pc_table' parameter -- here. 'result_table_grp' was created with 'matrix_id' as the -- grouping column, and the table 'mat_proj_grouped' should also have the -- 'matrix_id' column in it. DROP TABLE IF EXISTS mat_proj_grouped_out; SELECT {schema_madlib}.pca_project( 'mat_proj_grouped', 'result_table_grp', 'mat_proj_grouped_out', 'row_id' ); SELECT * FROM mat_proj_grouped_out; """.format(schema_madlib=schema_madlib) else: return """ ---------------------------------------------------------------- Summary: PCA Projection ---------------------------------------------------------------- PCA Projection: Projects a dataset to an already trained space of principal components. -- For function usage information, run SELECT {schema_madlib}.pca_project('usage'); -- """.format(schema_madlib=schema_madlib) # Sparse PCA help function # ------------------------------------------------------------------------ def pca_sparse_project_help(schema_madlib, usage_string=None, **kwargs): """ Given a usage string, give out function usage information. """ if usage_string is not None and \ usage_string.lower() in ("usage", "help", "?"): return """ ---------------------------------------------------------------- Usage ---------------------------------------------------------------- SELECT {schema_madlib}.pca_sparse_project ( 'tbl_source', -- Data table 'pc_table', -- Table with principal componenents (obtained as output from pca_train) 'tbl_result', -- Result table 'row_id', -- Name of the column containing the row_id 'col_id', -- Name of the column containing the col_id 'val_id', -- Name of the column containing the val_id 'row_dim' -- Row dimension of the sparse matrix 'col_dim' -- Column dimension of the sparse matrix -- Optional Parameters ---------------------------------------------------------------- 'tbl_residual', -- Residual table (Default: NULL) 'tbl_result_summary', -- Result summary table (Default : NULL) ); Note that if the principal components in 'pc_table' were learnt using grouping_cols in {schema_madlib}.pca_train(), the tbl_source used here must also have those grouping columns. This will fail otherwise. Output Tables ---------------------------------------------------------------- The output is divided into three tables (two of which are optional) ----------------------------------------------------------------------------------------- The output table ('tbl_result' above) encodes a dense matrix with the projection onto the principal components. The matrix contains the following columns: 'row_id' INTEGER, -- Row id of the output matrix 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix grouping_col -- The grouping columns present in the 'pc_table', if any ----------------------------------------------------------------------------------------- The residual table ('tbl_residual' above) encodes a dense residual matrix which has the following columns 'row_id' INTEGER, -- Row id of the output matrix 'row_vec' DOUBLE PRECISION[], -- A vector containing elements in the row of the matrix grouping_col -- The grouping columns present in the 'pc_table', if any ----------------------------------------------------------------------------------------- The result summary table ('tbl_result_summary' above) has the following columns 'exec_time' INTEGER, -- Wall clock time (ms) of the function. 'residual_norm' DOUBLE PRECISION, -- Absolute error of the residuals 'relative_residual_norm' DOUBLE PRECISION, -- Relative error of the residuals grouping_col -- The grouping columns present in the 'pc_table', if any ---------------------------------------------------------------- """.format(schema_madlib=schema_madlib) else: if usage_string is not None and \ usage_string.lower() in ("example", "examples"): return """ ---------------------------------------------------------------- Examples ---------------------------------------------------------------- -- Run pca_sparse_project() using a model table generated without grouping_cols. -- Create input table for pca_sparse_project() DROP TABLE IF EXISTS sparse_proj_mat; CREATE TABLE sparse_proj_mat ( row_id integer, col_id integer, val_id integer ); COPY sparse_proj_mat (row_id, col_id, val_id) FROM stdin delimiter '|'; 1|2|4 1|5|6 3|8|4 8|1|2 8|7|2 9|3|4 9|8|2 \. -- NOTE: Use the 'result_table_sparse' created using the example shown in -- {schema_madlib}.pca_sparse_train('examples'), as the 'pc_table' parameter here. SELECT {schema_madlib}.pca_sparse_project( 'sparse_proj_mat', 'result_table_sparse', 'sparse_proj_mat_out', 'row_id', 'col_id', 'val_id', 10, 10 ); SELECT * FROM sparse_proj_mat_out; -- Run pca_sparse_project() using a model table generated with grouping_cols. -- Create input table for pca_sparse_project(), with grouping DROP TABLE IF EXISTS sparse_proj_mat_with_grouping; CREATE TABLE sparse_proj_mat_with_grouping ( row_id integer, col_id integer, val_id integer, matrix_id integer ); COPY sparse_proj_mat_with_grouping (row_id, col_id, val_id, matrix_id) FROM stdin delimiter '|'; 8|7|2|1 9|3|4|1 9|8|2|1 1|2|4|2 1|5|6|2 6|6|12|2 \. -- NOTE: Use the 'result_table_sparsed_grouped' created using the example shown -- in {schema_madlib}.pca_sparse_train('examples'), as the 'pc_table' parameter -- here. 'result_table_sparsed_grouped' was created with 'matrix_id' as the -- grouping column, and the table 'sparse_proj_mat_with_grouping' should also have -- the 'matrix_id' column in it. SELECT {schema_madlib}.pca_sparse_project( 'sparse_proj_mat_with_grouping', 'result_table_sparsed_grouped', 'sparse_proj_mat_with_grouping_out', 'row_id', 'col_id', 'val_id', 10, 10 ); SELECT * FROM sparse_proj_mat_with_grouping_out; """.format(schema_madlib=schema_madlib) else: return """ ---------------------------------------------------------------- Summary: PCA Projection ---------------------------------------------------------------- PCA Projection: Projects a dataset to an already trained space of principal components. -- For function usage information, run: SELECT {schema_madlib}.pca_sparse_project('usage'); -- """.format(schema_madlib=schema_madlib) def _validate_args(schema_madlib, source_table, pc_table, out_table, row_id, col_id=None, val_id=None, row_dim=None, col_dim=None, residual_table=None, result_summary_table=None): """ Validates all arguments passed to the PCA function Args: @param schema_madlib Name of MADlib schema @param source_table Name of the input table (containing data to project) @param pc_table Name of table with principal components (output by the training function) @param out_table Name of output table to store projection result @param row_id Name of the row_id column @param col_id Name of the col_id column (only for sparse matrices) @param val_id Name of the val_id column (only for sparse matrices) @param row_dim Number of rows in input matrix (only for sparse matrices) @param col_dim Number of columns in input matrix (only for sparse matrices) @param residual_table Name of the residual table (to store error in projection) @param result_summary_table Name of result summary table Returns: None Throws: plpy.error if any argument is invalid """ _assert(source_table is not None and table_exists(source_table), "PCA error: Source data table does not exist!") _assert(pc_table is not None and table_exists(pc_table), "PCA error: Principal comp. table does not exist!") _assert(table_exists(add_postfix(pc_table, "_mean")), "PCA error: Source data table column means does not exist!") # Make sure that the output table does not exist # Also check that the output table is not null _assert(out_table and out_table.strip(), "PCA error: Invalid output table name.") _assert(not table_exists(out_table, only_first_schema=True), "PCA error: Output table {0} already exists!".format(str(out_table))) # Check that the result summary table is not empty if result_summary_table is not None: _assert(result_summary_table.strip(), "PCA error: Invalid result summary table name!") _assert(not table_exists(result_summary_table, only_first_schema=True), "PCA error: Result summary table {0} already exists!". format(result_summary_table)) # Check that the result summary table is not empty if residual_table is not None: _assert(residual_table.strip(), "PCA error: Invalid residual table name!") _assert(not table_exists(residual_table, only_first_schema=True), "PCA error: Residual table {0} already exists!". format(residual_table)) # Check that the row_id exists _assert(columns_exist_in_table(source_table, [row_id], schema_madlib), "PCA error: {1} column does not exist in {0}!". format(source_table, "NULL" if row_id is None else row_id)) # For sparse inputs: Check that the row_id exists if col_id or val_id: _assert(col_id, "PCA error: Column ID should be provided if value ID is input!") _assert(val_id, "PCA error: Value ID should be provided if column ID is input!") _assert(columns_exist_in_table(source_table, [col_id], schema_madlib), "PCA error: {1} column does not exist in {0}!". format(source_table, col_id)) _assert(columns_exist_in_table(source_table, [val_id], schema_madlib), "PCA error: {1} column does not exist in {0}!". format(source_table, val_id)) _assert(row_dim > 0 and col_dim > 0, "PCA error: row_dim/col_dim should be positive integer") # ------------------------------------------------------------------------ def pca_sparse_project(schema_madlib, source_table, pc_table, out_table, row_id, col_id, val_id, row_dim, col_dim, residual_table, result_summary_table, **kwargs): """ PCA projection of the matrix in source_table. This function is the specific call for pca projection. It projects the input matrix into the principal components. Args: @param schema_madlib Name of MADlib schema @param source_table Name of the input table (containing data to project) @param pc_table Name of table with principal components (output by the training function) @param out_table Name of output table to store projection result @param row_id Name of the row_id column @param col_id Name of the col_id column @param val_id Name of the val_id column @param row_dim Number of rows in input matrix @param col_dim Number of columns in input matrix @param residual_table Name of the residual table (to store error in projection) @param result_summary_table Name of result summary table Returns: None Throws: plpy.error if any argument is invalid """ pca_project_wrap(schema_madlib, source_table, pc_table, out_table, row_id, residual_table, result_summary_table, True, col_id, val_id, row_dim, col_dim) # ------------------------------------------------------------------------ def pca_project(schema_madlib, source_table, pc_table, out_table, row_id, residual_table, result_summary_table, **kwargs): """ PCA projection of the matrix in source_table. This function is the specific call for pca projection. It projects the input matrix into the principal components. Args: @param schema_madlib Name of MADlib schema @param source_table Name of the input table (containing data to project) @param pc_table Name of table with principal components (output by the training function) @param out_table Name of output table to store projection result @param row_id Name of the row_id column @param residual_table Name of the residual table (to store error in projection) @param result_summary_table Name of result summary table Returns: None Throws: plpy.error if any argument is invalid """ pca_project_wrap(schema_madlib, source_table, pc_table, out_table, row_id, residual_table, result_summary_table) def pca_project_wrap(schema_madlib, source_table, pc_table, out_table, row_id, residual_table, result_summary_table, is_sparse=False, col_id=None, val_id=None, row_dim=None, col_dim=None, **kwargs): """ This wrapper was added to support grouping columns. This function does the necessary pre-processing for handling grouping_cols, if set. It then constructs a single query that includes a separate "madlib.pca_project_wrap(...)" for each group. """ # Reset the message level to avoid random messages old_msg_level = plpy.execute(""" SELECT setting FROM pg_settings WHERE name='client_min_messages' """)[0]['setting'] plpy.execute('SET client_min_messages TO warning') if is_sparse: _validate_args(schema_madlib, source_table, pc_table, out_table, row_id, col_id, val_id, row_dim, col_dim, residual_table, result_summary_table) else: _validate_args(schema_madlib, source_table, pc_table, out_table, row_id, None, None, None, None, residual_table, result_summary_table) # If we add new columns to the pca_train output table in the future, they should # be included in this list: pc_table_model_cols = ['row_id', 'principal_components', 'std_dev', 'proportion'] grouping_cols_list = [col for col in get_cols(pc_table) if col not in pc_table_model_cols] grouping_cols = '' if grouping_cols_list: grouping_cols = ', '.join(grouping_cols_list) other_columns_in_table = [col for col in get_cols(source_table) if col not in grouping_cols_list] grouping_cols_clause = '' if(grouping_cols): # validate the grouping columns. We currently only support grouping_cols # to be column names in the source_table, and not expressions! _assert(columns_exist_in_table(source_table, grouping_cols_list, schema_madlib), """PCA error: One or more grouping columns in {0} do not exist in {1}, but the model in {2} was learnt with grouping!""".format(grouping_cols, source_table, pc_table)) distinct_grouping_values = plpy.execute(""" SELECT DISTINCT {grouping_cols} FROM {source_table} """.format(grouping_cols=grouping_cols, source_table=source_table)) cols_names_types = get_cols_and_types(source_table) grouping_cols_clause = ', ' + ', '.join([c_name+" "+c_type for (c_name, c_type) in cols_names_types if c_name in grouping_cols_list]) # Create all output tables plpy.execute(""" DROP TABLE IF EXISTS {0}; CREATE TABLE {0} ( row_id INTEGER, row_vec double precision[] {1} ) """.format(out_table, grouping_cols_clause)) if result_summary_table: plpy.execute( """ DROP TABLE IF EXISTS {0}; CREATE TABLE {0} ( exec_time FLOAT8, residual_norm FLOAT8, relative_residual_norm FLOAT8 {1} ) """.format(result_summary_table, grouping_cols_clause)) else: result_summary_table = '' if residual_table: plpy.execute(""" DROP TABLE IF EXISTS {0}; CREATE TABLE {0} ( row_id INTEGER, row_vec double precision[] {1} ) """.format(residual_table, grouping_cols_clause)) if not residual_table: residual_table = '' # declare variables whose values will be different for each group, if # grouping_cols is specified grouping_where_clause = '' sparse_where_condition = '' select_grouping_cols = '' grouping_cols_values = '' result_summary_table_temp = '' other_columns_in_pc_table = [col for col in get_cols(pc_table) if col not in grouping_cols_list] temp_pc_table_columns = ', '.join(other_columns_in_pc_table) original_row_id = row_id other_columns_in_table.remove(row_id) temp_source_table_columns = ','.join(other_columns_in_table) pca_union_call_list = [] grp_id = 0 if not is_sparse: col_id = 'NULL' val_id = 'NULL' row_dim = 0 col_dim = 0 while True: if grouping_cols: grp_value_dict = distinct_grouping_values[grp_id] where_conditions = ' AND '.join([str(key)+"="+str(value) for (key, value) in grp_value_dict.items()]) sparse_where_condition = ' AND ' + where_conditions grouping_where_clause = ' WHERE ' + where_conditions select_grouping_cols = ', ' + ', '.join([str(value)+" AS "+key for (key, value) in grp_value_dict.items()]) grouping_cols_values = ', ' + ', '.join([str(value) for (key, value) in grp_value_dict.items()]) pca_union_call_list.append(""" {schema_madlib}._pca_project_union('{source_table}', '{pc_table}', '{out_table}', '{row_id}', '{original_row_id}', '{grouping_cols}', '{grouping_cols_clause}', '{residual_table}', '{result_summary_table}', {grp_id}, '{grouping_where_clause}', '{sparse_where_condition}','{select_grouping_cols}', '{grouping_cols_values}', '{temp_source_table_columns}', '{temp_pc_table_columns}', {is_sparse}, '{col_id}', '{val_id}', {row_dim}, {col_dim}) """.format(schema_madlib=schema_madlib, source_table=source_table, pc_table=pc_table, out_table=out_table, row_id=row_id, original_row_id=original_row_id, grouping_cols=grouping_cols, grouping_cols_clause=grouping_cols_clause, residual_table=residual_table, result_summary_table=result_summary_table, grp_id=grp_id, grouping_where_clause=grouping_where_clause, sparse_where_condition=sparse_where_condition, select_grouping_cols=select_grouping_cols, grouping_cols_values=grouping_cols_values, temp_source_table_columns=temp_source_table_columns, temp_pc_table_columns=temp_pc_table_columns, is_sparse=is_sparse, col_id=col_id, val_id=val_id, row_dim=row_dim, col_dim=col_dim)) grp_id += 1 if not grouping_cols_list or len(distinct_grouping_values) == grp_id: break # "SELECT , , , ..." is expected to run each # in parallel. pca_union_call = 'SELECT ' + ', '.join(pca_union_call_list) plpy.execute(pca_union_call) plpy.execute("SET client_min_messages TO %s" % old_msg_level) def _pca_project_union(schema_madlib, source_table, pc_table, out_table, row_id, original_row_id, grouping_cols, grouping_cols_clause, residual_table, result_summary_table, grp_id, grouping_where_clause, sparse_where_condition, select_grouping_cols, grouping_cols_values, temp_source_table_columns, temp_pc_table_columns, is_sparse, col_id, val_id, row_dim, col_dim, **kwargs): """ The pca_project is performed over each group, if any. Args: @param schema_madlib -- madlib schema name @param source_table -- Source table name (dense matrix) @param pc_table -- Output table name for the principal components @param out_table -- Output table name @param row_id -- Column name for the ID for each row @param original_row_id -- copy of the row_id originally passed @param grouping_cols -- Comma-separated list of grouping columns (Default: NULL) @param grouping_cols_clause -- Part of the SQL query to be used with grouping_cols @param residual_table -- Residual table name @param result_summary_table -- Table name to store summary of results (Default: NULL) @param grp_id -- a place holder id for each group @param grouping_where_clause -- WHERE clause using grouping_cols @param select_grouping_cols -- SELECT clause using grouping_cols @param grouping_cols_values -- distinct values of the grouping_cols @param temp_source_table_columns -- SELECT caluse for creating temporary copy of the source_table @param temp_pc_table_columns -- non grouping_cols of the source_table @param is_sparse -- specifies if the PCA call is for sparse or dense matrices @param col_id -- sparse representation based detail @param val_id -- sparse representation based detail @param row_dim -- sparse representation based detail @param col_dim -- sparse representation based detail Returns: None """ out_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) if grouping_cols: pc_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) plpy.execute(""" CREATE TABLE {pc_table_grouped} AS SELECT {temp_pc_table_columns} FROM {pc_table} {grouping_where_clause} """.format(pc_table_grouped=pc_table_grouped, pc_table=pc_table, grouping_where_clause=grouping_where_clause, temp_pc_table_columns=temp_pc_table_columns)) else: pc_table_grouped = pc_table t0 = time.time() # measure the starting time # Step 1: Validate the input arguments if is_sparse: # Step 1.1: Create a copy of the sparse matrix and add row_dims and col_dims # Warning: This changes the column names of the table sparse_table_copy = "pg_temp." + unique_string() + "_sparse_table_copy" create_temp_sparse_matrix_table_with_dims(source_table, sparse_table_copy, row_id, col_id, val_id, row_dim, col_dim, sparse_where_condition) validate_sparse(sparse_table_copy, {'row': row_id, 'col': col_id, 'val': val_id}, check_col=False) # Step 1.2: Densify the input matrix x_dense = "pg_temp." + unique_string() + "_dense" plpy.execute(""" SELECT {schema_madlib}.matrix_densify( '{sparse_table_copy}', 'row={row_id}, col={col_id}, val={val_id}', '{x_dense}', 'row=row_id, col=col_id,val=row_vec') """.format(schema_madlib=schema_madlib, sparse_table_copy=sparse_table_copy, row_id=row_id, col_id=col_id, val_id=val_id, x_dense=x_dense)) plpy.execute(""" DROP TABLE IF EXISTS {0}; """.format(sparse_table_copy)) source_table_grouped = x_dense else: # For Dense matrix format only: # We can now ignore the original row_id for all computations since we will # create a new table with a row_id column that has not duplicates and ranges # from 1 to number of rows in the group/table. This is to mainly support the # grouping scneario where the row_id values might not range between 1 and # number of rows in the group, for each group. Doing this also just extends # this behavior for non-grouping scenarios too. If creating a new temp table # that corrects the row_id column is not of much importance in non-grouping # cases, we can avoid creating the temp table and save some computation time. # But, at the moment, the code creates the temp table even for the non-grouping # scenario. # We don't need to do this for sparse representation because of the nature # of its definition. # Preserve the mapping between new row_id created and the original row_id. This is # required only for dense input format. temp_row_id = "original_row_id" + unique_string() row_id_map_table = "rowid" + unique_string() plpy.execute(""" CREATE TABLE {row_id_map_table} AS SELECT {source_table}.{original_row_id} AS {temp_row_id}, {select_clause} FROM {source_table} {grouping_where_clause} """.format(row_id_map_table=row_id_map_table, original_row_id=original_row_id, temp_row_id=temp_row_id, source_table=source_table, select_clause=""" ROW_NUMBER() OVER() AS row_id """, grouping_where_clause=grouping_where_clause)) # Creation of this temp table is unnecessary if the scenario does not involve # grouping, and/or, the input table had perfect values for the row_id column. # This temp table will ensure pca works even when row_id of the source_table # does not have serially increasing numbers starting from 1; source_table_grouped = "pg_temp." + unique_string() + "group_" + str(grp_id) plpy.execute(""" CREATE TABLE {source_table_grouped} AS SELECT {row_id_map_table}.row_id, {temp_source_table_columns} FROM ( SELECT * FROM {source_table} {grouping_where_clause} ) t1 INNER JOIN {row_id_map_table} ON {row_id_map_table}.{temp_row_id}=t1.{row_id} """.format(source_table_grouped=source_table_grouped, temp_row_id=temp_row_id, row_id_map_table=row_id_map_table, row_id=row_id, source_table=source_table, grouping_where_clause=grouping_where_clause, temp_source_table_columns=temp_source_table_columns)) row_id = 'row_id' # Make sure that the table has row_id and row_vec source_table_copy = "pg_temp." + unique_string() need_new_column_names = cast_dense_input_table_to_correct_columns( schema_madlib, source_table_grouped, source_table_copy, row_id) if(need_new_column_names): source_table_grouped = source_table_copy [row_dim, col_dim] = get_dims(source_table_grouped, {'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'}) validate_dense(source_table_grouped, {'row': 'row_id', 'col': 'col_id', 'val': 'row_vec'}, check_col=False, row_dim=row_dim) # Step 2: Compute the PCA Projection matrix # The R code to perform this step is # p <- princomp(mat) # low_rank_representation <- mat %*% p$loadings[,1:k] # First normalize the data (Column means) scaled_source_table = "pg_temp." + unique_string() + "_scaled_table" x_std_str = _array_to_string([1] * col_dim) pc_table_mean = add_postfix(pc_table, "_mean") plpy.execute( """ CREATE TABLE {scaled_source_table} AS SELECT row_id, ({schema_madlib}.utils_normalize_data( row_vec, (select column_mean from {pc_table_mean} {grouping_where_clause}), '{x_std_str}'::double precision[])) AS row_vec FROM {source_table_grouped} """.format(schema_madlib=schema_madlib, pc_table_mean=pc_table_mean, source_table_grouped=source_table_grouped, scaled_source_table=scaled_source_table, grouping_where_clause=grouping_where_clause, x_std_str=x_std_str)) plpy.execute( """ SELECT {schema_madlib}.matrix_mult('{scaled_source_table}', 'trans=false,row=row_id, col=col_id, val=row_vec', '{pc_table_grouped}', 'trans=TRUE, row=row_id, col=col_id, val=principal_components', '{out_table_grouped}', 'row=row_id, col=col_id,val=row_vec'); """.format(schema_madlib=schema_madlib, scaled_source_table=scaled_source_table, pc_table_grouped=pc_table_grouped, out_table_grouped=out_table_grouped)) # Step 3: Compute the Residual table (if required) # Residual table: res = mat - proj create_residual_table = False if residual_table or result_summary_table: residual_table_grouped = "pg_temp." + unique_string() + "_temp_residual" create_temp_residual_table = False if not residual_table: create_temp_residual_table = True else: create_residual_table = True approx_table = "pg_temp." + unique_string() + "_approx" # Build an approximate reconstruction of the data plpy.execute( """ SELECT {schema_madlib}.matrix_mult('{out_table_grouped}', 'row=row_id, col=col_id, val=row_vec', '{pc_table_grouped}', 'row=row_id, col=col_id, val=principal_components', '{approx_table}', 'row=row_id, col=col_id, val=row_vec'); """.format(schema_madlib=schema_madlib, out_table_grouped=out_table_grouped, pc_table_grouped=pc_table_grouped, approx_table=approx_table)) # Compute the difference between the reconstruction and real data # Note that both the approximation and source data are recentered here plpy.execute( """ SELECT {schema_madlib}.matrix_scale_and_add( '{scaled_source_table}', 'row=row_id, col=col_id, val=row_vec', '{approx_table}', 'row=row_id, col=col_id, val=row_vec', -1, '{residual_table_grouped}', 'row=row_id, col=col_id, val=row_vec'); """.format(schema_madlib=schema_madlib, scaled_source_table=scaled_source_table, approx_table=approx_table, residual_table_grouped=residual_table_grouped)) # Step 4: Compute the results summary table (if required) # If the residual table is not asked by the user, but he does ask for # result summary table, then we need to compute the residuals if result_summary_table: source_table_norm = plpy.execute( """ SELECT {schema_madlib}.matrix_norm('{source_table_grouped}', 'row=row_id, col=col_id, val=row_vec') as r """.format(schema_madlib=schema_madlib, source_table_grouped=source_table_grouped, row_id=row_id))[0]['r'] # Compute the norm of the residual table residual_norm = plpy.execute( """ SELECT {schema_madlib}.matrix_norm('{residual_table_grouped}', 'row=row_id, col=col_id, val=row_vec') as r """.format(schema_madlib=schema_madlib, residual_table_grouped=residual_table_grouped, row_id=row_id))[0]['r'] # Compute the relative error of the norm # Prevent division by zero if(source_table_norm > ZERO_THRESHOLD): relative_residual_norm = residual_norm / source_table_norm else: relative_residual_norm = 0 # Compute the time in milli-seconds t1 = time.time() dt = (t1 - t0) * 1000. plpy.execute( """ INSERT INTO {result_summary_table} VALUES ({dt}, {residual_norm}::double precision, {relative_residual_norm}::double precision {grouping_cols_values} ); """.format(dt=dt, residual_norm=residual_norm, result_summary_table=result_summary_table, relative_residual_norm=relative_residual_norm, grouping_cols_values=grouping_cols_values)) plpy.execute(""" DROP TABLE IF EXISTS {approx_table}; """.format(approx_table=approx_table)) if create_temp_residual_table: plpy.execute(""" DROP TABLE IF EXISTS {0}; """.format(residual_table_grouped)) if is_sparse: ## We don't have to join based on row_id for sparse project. if create_residual_table: plpy.execute(""" INSERT INTO {residual_table} SELECT * {select_grouping_cols} FROM {residual_table_grouped} """.format(residual_table=residual_table, select_grouping_cols=select_grouping_cols, residual_table_grouped=residual_table_grouped)) plpy.execute(""" INSERT INTO {out_table} SELECT * {select_grouping_cols} FROM {out_table_grouped} """.format(out_table=out_table, select_grouping_cols=select_grouping_cols, out_table_grouped=out_table_grouped)) else: output_table_cols = get_cols(out_table_grouped) output_table_cols.remove('row_id') output_table_select_clause = """{row_id_map_table}.{temp_row_id}, {out_table_cols} {select_grouping_cols} """.format(row_id_map_table=row_id_map_table, temp_row_id=temp_row_id, out_table_cols=', '.join(output_table_cols), select_grouping_cols=select_grouping_cols) if create_residual_table: plpy.execute(""" INSERT INTO {residual_table} SELECT {select_clause} FROM {residual_table_grouped} INNER JOIN {row_id_map_table} ON {row_id_map_table}.row_id={residual_table_grouped}.row_id """.format(residual_table=residual_table, select_clause=output_table_select_clause, residual_table_grouped=residual_table_grouped, row_id_map_table=row_id_map_table)) plpy.execute(""" INSERT INTO {out_table} SELECT {select_clause} FROM {out_table_grouped} INNER JOIN {row_id_map_table} ON {row_id_map_table}.row_id={out_table_grouped}.row_id """.format(out_table=out_table, select_clause=output_table_select_clause, out_table_grouped=out_table_grouped, row_id_map_table=row_id_map_table)) plpy.execute(""" DROP TABLE IF EXISTS {0}; """.format(row_id_map_table)) if residual_table or result_summary_table: plpy.execute(""" DROP TABLE IF EXISTS {0} """.format(residual_table_grouped)) plpy.execute(""" DROP TABLE IF EXISTS {0}; DROP TABLE IF EXISTS {1}; DROP TABLE IF EXISTS {2}; """.format(scaled_source_table, source_table_grouped, out_table_grouped)) if grouping_cols: plpy.execute(""" DROP TABLE IF EXISTS {0}; """.format(pc_table_grouped))