# coding=utf-8 # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # Prediction Metrics # This module provides a set of prediction accuracy metrics. It is a support # module for several machine learning algorithms that require metrics to # validate their models. A typical function will take a set of "prediction" and # "observation" values to calculate the desired metric, unless noted otherwise. # Grouping is supported by all of these functions (except confusion matrix). 
# Please refer to the pred_metrics.sql_in file for the documentation

import plpy
from utilities.validate_args import input_tbl_valid, output_tbl_valid
from utilities.validate_args import is_var_valid, cols_in_tbl_valid
from utilities.utilities import _assert
from utilities.utilities import split_quoted_delimited_str


def _validate_args(table_in, table_out, validate_cols):
    """ Run the shared input/output table and column checks

    Args:
        @param table_in: str, Input table name
        @param table_out: str, Output table name
        @param validate_cols: list of str, Column names/expressions that are
                              joined with ', ' and handed to is_var_valid
    """
    input_tbl_valid(table_in, "Prediction Metrics")
    output_tbl_valid(table_out, "Prediction Metrics")
    # NOTE(review): whatever is_var_valid returns is discarded here. If the
    # helper reports an invalid expression through its return value instead of
    # raising, this call validates nothing -- confirm against the helper.
    is_var_valid(table_in, ', '.join(validate_cols))


def _parse_grp_col_str(grp_col_str):
    """ Split a grouping-column string into a de-duplicated list

    The result passes through a set, so the order of the returned names is
    not guaranteed to match the order in 'grp_col_str'.

    Args:
        @param grp_col_str: str, Comma-separated list of column names
                            (presumably None/empty is accepted by
                            split_quoted_delimited_str -- callers rely on it)
    Returns:
        list of str, unique entries produced by split_quoted_delimited_str
    """
    group_set = set(split_quoted_delimited_str(grp_col_str))
    return list(group_set)
# ----------------------------------------------------------------------


def _create_output_table(table_in, table_out, agg_fun, agg_name, grp_col_str=None):
    """ Create an output table with optional groups

    General template function that builds an output table with grouping while
    applying an aggregate function.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Name of the table created by this function
        @param agg_fun: str, SQL aggregate to be executed
        @param agg_name: str, Name given to the aggregate column in table_out
        @param grp_col_str: str, Comma-separated list of column names to
                            group by (None or empty for no grouping)
    """
    grp_cols = _parse_grp_col_str(grp_col_str)
    _validate_args(table_in, table_out, grp_cols)
    if grp_cols:
        # The raw string (not the parsed list) is spliced into the SQL so the
        # output keeps the column order given by the caller.
        grp_by_str = "GROUP BY " + grp_col_str
        grp_out_str = grp_col_str + ", "
    else:
        grp_by_str = grp_out_str = ""
    plpy.execute("""
        CREATE TABLE {table_out} AS
        SELECT
            {grp_out_str}
            {agg_fun} AS {agg_name}
        FROM {table_in}
        {grp_by_str}
        """.format(table_out=table_out,
                   grp_out_str=grp_out_str,
                   agg_fun=agg_fun,
                   agg_name=agg_name,
                   table_in=table_in,
                   grp_by_str=grp_by_str))


# Mean Absolute Error.
def mean_abs_error(table_in, table_out, pred_col, obs_col, grp_cols=None):
    """ Create table_out holding AVG(ABS(pred_col - obs_col)), per group

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_cols: str, Comma-separated list of columns to group by
    """
    cols_in_tbl_valid(table_in, [pred_col, obs_col], "pred_metrics")
    mean_abs_agg = "AVG(ABS({0} - {1}))".format(pred_col, obs_col)
    _create_output_table(table_in, table_out, mean_abs_agg, "mean_abs_error", grp_cols)


# Mean Absolute Percentage Error.
def mean_abs_perc_error(table_in, table_out, pred_col, obs_col, grp_cols=None):
    """ Create table_out holding the mean absolute percentage error, per group

    The aggregate is AVG(ABS(pred - obs) / ABS(obs)). The denominator is taken
    in absolute value so that negative observations do not contribute negative
    terms. Rows whose observation is 0 yield NULL (via NULLIF) and are
    therefore left out of the average instead of raising a division error.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_cols: str, Comma-separated list of columns to group by
    """
    cols_in_tbl_valid(table_in, [pred_col, obs_col], "pred_metrics")
    mean_abs_perc_agg = ("AVG(ABS({0} - {1})/NULLIF(ABS({1}), 0))".
                         format(pred_col, obs_col))
    _create_output_table(table_in, table_out, mean_abs_perc_agg,
                         "mean_abs_perc_error", grp_cols)


# Mean Percentage Error.
def mean_perc_error(table_in, table_out, pred_col, obs_col, grp_cols=None):
    """ Create table_out holding AVG((pred - obs) / obs), per group

    This metric is signed by design. Rows whose observation is 0 yield NULL
    (via NULLIF) and are left out of the average.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_cols: str, Comma-separated list of columns to group by
    """
    cols_in_tbl_valid(table_in, [pred_col, obs_col], "pred_metrics")
    mean_perc_agg = "AVG(({0} - {1})/NULLIF({1}, 0))".format(pred_col, obs_col)
    _create_output_table(table_in, table_out, mean_perc_agg,
                         "mean_perc_error", grp_cols)


# Mean Squared Error.
def mean_squared_error(table_in, table_out, pred_col, obs_col, grp_cols=None):
    """ Create table_out holding AVG((pred - obs)^2), per group

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_cols: str, Comma-separated list of columns to group by
    """
    cols_in_tbl_valid(table_in, [pred_col, obs_col], "pred_metrics")
    mean_sq_agg = "AVG(({0} - {1})^2)".format(pred_col, obs_col)
    _create_output_table(table_in, table_out, mean_sq_agg,
                         "mean_squared_error", grp_cols)


def metric_agg_help_msg(schema_madlib, message, agg_name, **kwargs):
    """ Return the help text shared by the simple aggregate metrics

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' (case-insensitive, surrounding blanks ignored)
                        for the usage text; anything else for the
                        'No such option' message
        @param agg_name: str, Name of the metric function being described
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Evaluate prediction results using metric functions.

This module provides a set of prediction accuracy metrics. It is a support
module for several machine learning algorithms that require metrics to validate
their models. The function will take "prediction" and "observation" values to
calculate the desired metric. Grouping is supported by all of these functions.
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.{agg_name}(
    'table_in',     -- Name of the input table
    'table_out',    -- Table name to store the metric results
    'pred_col',     -- Column name containing prediction results
    'obs_col',      -- Column name containing observed (actual) values
    'grouping_cols' -- Comma-separated list of columns to use as group-by
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.{agg_name}('usage')"
    return help_string.format(schema_madlib=schema_madlib, agg_name=agg_name)


def _get_r2_score_sql(table_in, pred_col, obs_col, grp_col_str=None):
    """ Generate the SQL query to compute r2 score.

    This function abstracts the SQL to calculate r2 score from actually
    building the output table. This allows reusing the query for the adjusted
    r2 function.

    Args:
        @param table_in: str, Input table name containing the data
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_col_str: str, Comma-separated list of columns to group by

    Returns:
        str, a SELECT statement producing the grouping columns (if any) and
        a column named r2_score

    Definition:
        r2 = 1 - SS_res / SS_tot
        where SS_res = sum (pred - obs)^2
              SS_tot = sum (obs - mean)^2
        Both sums are divided by the row count in the query below
        (avg(...) and var_pop(...)), which leaves the ratio unchanged.
    """
    if grp_col_str:
        grp_out_str = grp_col_str + ","
        grp_by_str = "GROUP BY " + grp_col_str
    else:
        grp_out_str = grp_by_str = ""
    # NULLIF: a group whose observations are all equal has zero variance; the
    # score is reported as NULL for it instead of failing the whole statement
    # with a division-by-zero error.
    return """
        SELECT {grp_out_str}
               1 - avg(({pred_col} - {obs_col})^2) /
                   NULLIF(var_pop({obs_col}), 0) AS r2_score
        FROM {table_in}
        {grp_by_str}
    """.format(grp_out_str=grp_out_str,
               pred_col=pred_col,
               obs_col=obs_col,
               table_in=table_in,
               grp_by_str=grp_by_str)


def r2_score(table_in, table_out, pred_col, obs_col, grp_col_str=None):
    """ Create table_out holding the r2 score, per group

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param grp_col_str: str, Comma-separated list of columns to group by
    """
    grp_cols = _parse_grp_col_str(grp_col_str)
    _validate_args(table_in, table_out, grp_cols + [pred_col, obs_col])
    r2_score_sql = _get_r2_score_sql(table_in, pred_col, obs_col, grp_col_str)
    plpy.execute("""
        CREATE TABLE {table_out} AS
        {r2_score_sql}
    """.format(table_out=table_out, r2_score_sql=r2_score_sql))


def r2_score_help(schema_madlib, message, **kwargs):
    """ Return the help text for r2_score

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' for the usage text; anything else for the
                        'No such option' message
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Compute coefficient of determination

This function returns the coefficient of determination (R2) between the
predicted and observed values. An R2 of 1 indicates that the regression line
perfectly fits the data, while an R2 of 0 indicates that the line does not fit
the data at all. Negative values of R2 may occur when fitting non-linear
functions to data.
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.r2_score(
    'table_in',     -- Name of the input table
    'table_out',    -- Table name to store the metric results
    'pred_col',     -- Column name containing prediction results
    'obs_col',      -- Column name containing observed (actual) values
    'grouping_cols' -- Comma-separated list of columns to use as group-by
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.r2_score('usage')"
    return help_string.format(schema_madlib=schema_madlib)


def adjusted_r2_score(table_in, table_out, pred_col, obs_col,
                      n_predictors, train_size, grp_col_str=None):
    """ Compute the adjusted r2 score

    The output table contains the columns of the r2 query (grouping columns,
    if any, and r2_score) plus adjusted_r2_score.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed values
        @param n_predictors: Number of predictors, spliced into the SQL as-is
        @param train_size: Size of the training set, spliced into the SQL
                           as-is
        @param grp_col_str: str, Comma-separated list of columns to group by

    Definition:
        adj_r2 = 1 - (1 - r2) * (n - 1) / (n - p - 1)
        where n = train_size
              p = n_predictors
    """
    grp_cols = _parse_grp_col_str(grp_col_str)
    _validate_args(table_in, table_out, grp_cols + [pred_col, obs_col])
    r2_score_sql = _get_r2_score_sql(table_in, pred_col, obs_col, grp_col_str)
    plpy.execute("""
        CREATE TABLE {table_out} AS
        SELECT *,
               1 - ( ((1- r2_score) * ({train_size} -1)) /
                     ({train_size} - {n_predictors} - 1)
                   ) AS adjusted_r2_score
        FROM (
            {r2_score_sql}
        )z
    """.format(table_out=table_out,
               train_size=train_size,
               n_predictors=n_predictors,
               r2_score_sql=r2_score_sql))


def adjusted_r2_score_help(schema_madlib, message, **kwargs):
    """ Return the help text for adjusted_r2_score

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' for the usage text; anything else for the
                        'No such option' message
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Compute coefficient of determination (with adjustment for number of predictors)

This function returns the adjusted R2 score. Adjusted R2 score is used to
counter the problem of the R2 automatically increasing when extra explanatory
variables are added to the model. It takes additional two integers describing
the degrees of freedom of the model and the size of the training set over which
it was developed, and returning the adjusted R-squared prediction accuracy
metric.
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.adjusted_r2_score(
    'table_in',       -- Name of the input table
    'table_out',      -- Table name to store the metric results
    'pred_col',       -- Column name containing prediction results
    'obs_col',        -- Column name containing observed (actual) values
    'num_predictors', -- Number of predictors (features) used in the model
    'training_size',  -- Size of the training data
    'grouping_cols'   -- Comma-separated list of columns to use as group-by
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.adjusted_r2_score('usage')"
    return help_string.format(schema_madlib=schema_madlib)


def binary_classifier(table_in, table_out, pred_col, obs_col, grp_col_str=None):
    """ Get multiple metrics useful for evaluating binary classifiers.

    The 'obs_col' column in 'table_in' is assumed to be a int/float column
    with two levels: 0 and 1, where 1 is considered positive and 0 negative.
    The 'pred_col' is assumed to be a float column that gives the probability
    of the prediction being positive.

    Every distinct value of 'pred_col' (within a group) is used as a
    threshold. Counts are accumulated over thresholds in descending order, so
    for a given threshold tp/fp count the positives/negatives whose prediction
    is at or above it, and fn/tn count the remaining ones.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed class
        @param grp_col_str: str, Comma-separated list of columns to group by
    """
    grp_cols = _parse_grp_col_str(grp_col_str)
    _validate_args(table_in, table_out, grp_cols + [pred_col, obs_col])

    # Distinct observed values over the whole table, in ascending order
    level_rows = plpy.execute(
        "SELECT {0} as c FROM {1} GROUP BY {0} ORDER BY {0}".
        format(obs_col, table_in))
    obs_levels = [i['c'] for i in level_rows]
    _assert(obs_levels == [0, 1],
            "Prediction metrics: Observed levels should be 0 and 1")

    if grp_col_str:
        grp_str = grp_col_str + ","
        partition_str = "PARTITION BY " + grp_col_str
    else:
        grp_str = partition_str = ""

    # NULLIF guards keep a ratio NULL when its denominator is 0
    sql_st = """
        CREATE TABLE {table_out} AS
        SELECT *,
               tp::float8 / NULLIF(tp + fn, 0) AS tpr,
               tn::float8 / NULLIF(fp + tn, 0) AS tnr,
               tp::float8 / NULLIF(tp + fp, 0) AS ppv,
               tn::float8 / NULLIF(tn + fn, 0) AS npv,
               fp::float8 / NULLIF(fp + tn, 0) AS fpr,
               fp::float8 / NULLIF(fp + tp, 0) AS fdr,
               fn::float8 / NULLIF(fn + tp, 0) AS fnr,
               (tp + tn)::float8 / NULLIF(tp + tn + fp + fn, 0) AS acc,
               tp * 2.0 / NULLIF(2.0 * tp + fp + fn, 0) AS f1
        FROM (
            SELECT {grp_str}
                   threshold,
                   sum(t) OVER (w) AS tp,
                   sum(f) OVER (w) AS fp,
                   sum(t) OVER ({partition_str}) - sum(t) OVER (w) AS fn,
                   sum(f) OVER ({partition_str}) - sum(f) OVER (w) AS tn
            FROM (
                SELECT {grp_str}
                       {pred_col} AS threshold,
                       sum({obs_col}::int) AS t,
                       count(*) - sum({obs_col}::int) AS f
                FROM {table_in}
                GROUP BY {grp_str} threshold
            ) x
            WINDOW w AS ({partition_str} ORDER BY threshold DESC)
        ) y
    """.format(table_out=table_out,
               grp_str=grp_str,
               partition_str=partition_str,
               pred_col=pred_col,
               obs_col=obs_col,
               table_in=table_in)
    plpy.execute(sql_st)


def binary_classifier_help(schema_madlib, message, **kwargs):
    """ Return the help text for binary_classifier

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' for the usage text; anything else for the
                        'No such option' message
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Metrics for binary classification

This function returns an output table with a number of metrics commonly used to
evaluated binary classification.

List of the various metrics output by the function:

    - tp is the count of correctly-classified positives
    - tn is the count of correctly-classified negatives
    - fp is the count of misclassified negatives
    - fn is the count of misclassified positives
    - tpr = tp / (tp + fn)
    - tnr = tn / (fp + tn)
    - ppv = tp / (tp + fp)
    - npv = tn / (tn + fn)
    - fpr = fp / (fp + tn)
    - fdr = 1 - ppv
    - fnr = fn / (fn + tp).
    - acc = (tp + tn) / (tp + tn + fp + fn).
    - f1 = 2* tp / (2 * tp + fp + fn).
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.binary_classifier(
    'table_in',     -- Name of the input table
    'table_out',    -- Table name to store the metric results
    'pred_col',     -- Column name containing prediction results
    'obs_col',      -- Column name containing observed (actual) values
    'grouping_cols' -- Comma-separated list of columns to use as group-by
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.binary_classifier('usage')"
    return help_string.format(schema_madlib=schema_madlib)


def area_under_roc(table_in, table_out, pred_col, obs_col, grp_col_str=None):
    """ Get area under ROC curve for a binary classifier.

    The 'obs_col' column in 'table_in' is assumed to be a int/float column
    with two levels: 0 and 1, where 1 is considered positive and 0 negative.
    The 'pred_col' is assumed to be a float column that gives the probability
    of the prediction being positive.

    The area is accumulated with the trapezoid rule over the (fpr, tpr)
    points obtained by walking the distinct prediction values in descending
    order, starting from (0, 0).

    NOTE(review): unlike binary_classifier, the observed levels are not
    checked to be exactly 0 and 1 here -- confirm whether that is intended.

    Args:
        @param table_in: str, Input table name containing the data
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the observed class
        @param grp_col_str: str, Comma-separated list of columns to group by
    """
    grp_cols = _parse_grp_col_str(grp_col_str)
    _validate_args(table_in, table_out, grp_cols + [pred_col, obs_col])

    if grp_col_str:
        grp_str = grp_col_str + ","
        grp_by_str = "GROUP BY " + grp_col_str
        partition_str = "PARTITION BY " + grp_col_str
    else:
        grp_str = grp_by_str = partition_str = ""

    sql_st = """
        CREATE TABLE {table_out} AS
        SELECT {grp_str}
               sum((tpr + prev_tpr) * (fpr - prev_fpr) * 0.5) AS area_under_roc
        FROM (
            SELECT {grp_str}
                   tpr, fpr,
                   coalesce(lag(tpr) OVER ({partition_str} ORDER BY threshold DESC),
                            0) AS prev_tpr,
                   coalesce(lag(fpr) OVER ({partition_str} ORDER BY threshold DESC),
                            0) AS prev_fpr
            FROM(
                SELECT {grp_str}
                       threshold,
                       sum(t) OVER ({partition_str} ORDER BY threshold DESC) * 1.0/
                            NULLIF(sum(t) OVER ({partition_str}), 0) AS tpr,
                       sum(f) OVER ({partition_str} ORDER BY threshold DESC) * 1.0/
                            NULLIF(sum(f) OVER ({partition_str}),0) AS fpr
                FROM (
                    SELECT {grp_str}
                           {pred_col} AS threshold,
                           sum({obs_col}::int) AS t,
                           count(*) - sum({obs_col}::int) AS f
                    FROM {table_in}
                    GROUP BY {grp_str} threshold
                ) x
            ) y
        ) z
        {grp_by_str}
    """.format(table_out=table_out,
               grp_str=grp_str,
               partition_str=partition_str,
               pred_col=pred_col,
               obs_col=obs_col,
               table_in=table_in,
               grp_by_str=grp_by_str)
    plpy.execute(sql_st)


def area_under_roc_help(schema_madlib, message, **kwargs):
    """ Return the help text for area_under_roc

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' for the usage text; anything else for the
                        'No such option' message
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Area under the ROC curve for binary classification

This function returns the area under the Receiver Operating Characteristic curve
for binary classification (the AUC). The ROC curve is the curve relating the
classifier's TPR and FPR metrics.

Note that the binary classification function ({schema_madlib}.binary_classifier)
can be used to obtain the data (tpr and fpr values) required for drawing the
ROC curve.
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.area_under_roc(
    'table_in',     -- Name of the input table
    'table_out',    -- Table name to store the metric results
    'pred_col',     -- Column name containing prediction results
    'obs_col',      -- Column name containing observed (actual) values
    'grouping_cols' -- Comma-separated list of columns to use as group-by
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.area_under_roc('usage')"
    return help_string.format(schema_madlib=schema_madlib)


def confusion_matrix(table_in, table_out, pred_col, obs_col):
    """ Get the confusion matrix for a multi-class classifier

    Args:
        @param table_in: str, Input table name
        @param table_out: str, Output table name
        @param pred_col: str, Column name containing the predictions
        @param obs_col: str, Column name containing the actual classes
                        (observations)

    'pred_col' and 'obs_col' columns can have any number of levels (which
    could be different between the columns). The output confusion matrix will
    be N x N, where N is the combined number of unique levels. For level
    combinations that are not present in the input table, a 0 will be output
    as the frequency count.

    The output format is the matrix format as described in matrix_ops.sql_in.
    The rows are indexed with column 'row_id' with each corresponding observed
    class. The 'row_id' column gives the order of the observed classes. The
    predicted classes are in an array in the same order as described by
    'row_id'.
    """
    _validate_args(table_in, table_out, [pred_col, obs_col])
    sql_st = """
        CREATE TABLE {table_out} AS
        WITH all_levels AS (
            SELECT {obs_col} AS a
            FROM {table_in}
            GROUP BY {obs_col}
            UNION
            SELECT {pred_col} as a
            FROM {table_in}
            GROUP BY {pred_col}
        )
        SELECT
            ROW_NUMBER() over (ORDER BY class) as row_id,
            class,
            confusion_arr
        FROM (
            SELECT class,
                   array_agg(cnt ORDER BY pred) AS confusion_arr
            FROM (
                SELECT obs as class, pred, sum(cnt) AS cnt
                FROM (
                    SELECT {obs_col} AS obs,
                           {pred_col} AS pred,
                           count(*) AS cnt
                    FROM {table_in}
                    GROUP BY obs, pred
                    UNION
                    -- create 0 entries in matrix as defaults if all combinations
                    -- are not available in the input.
                    SELECT r.a, s.a, 0
                    FROM all_levels r, all_levels s
                ) x
                GROUP BY class, pred
            ) y
            GROUP BY class
        ) z
    """.format(table_out=table_out,
               obs_col=obs_col,
               table_in=table_in,
               pred_col=pred_col)
    plpy.execute(sql_st)


def confusion_matrix_help(schema_madlib, message, **kwargs):
    """ Return the help text for confusion_matrix

    Args:
        @param schema_madlib: str, Schema name substituted into the text
        @param message: str, Empty/None for the summary; 'usage', 'help' or
                        '?' for the usage text; anything else for the
                        'No such option' message
    Returns:
        str, the formatted help text
    """
    if not message:
        help_string = """
------------------------------------------------------------
                        SUMMARY
------------------------------------------------------------
Functionality: Confusion matrix for multi-class classifier

This function returns the confusion matrix of a multi-class classification. Each
column of the matrix represents the instances in a predicted class while each
row represents the instances in an actual class. This allows more detailed
analysis than mere proportion of correct guesses (accuracy).
        """
    elif message.lower().strip() in ['usage', 'help', '?']:
        help_string = """
------------------------------------------------------------
                        USAGE
------------------------------------------------------------
SELECT {schema_madlib}.confusion_matrix(
    'table_in',     -- Name of the input table
    'table_out',    -- Table name to store the metric results
    'pred_col',     -- Column name containing prediction results
    'obs_col'       -- Column name containing observed (actual) values
);
        """
    else:
        help_string = "No such option. Use {schema_madlib}.confusion_matrix('usage')"
    return help_string.format(schema_madlib=schema_madlib)