# coding=utf-8

"""
@file assoc_rules.py_in

@brief Association Rules - Apriori Algorithm Implementation.

@namespace assoc_rules
"""

import time
import plpy
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists


def __assert(condition, msg):
    """
    @brief if the given condition is false, then raise an error with the message

    @param condition    the condition to be asserted
    @param msg          the error message to be reported through plpy.error
    """
    if not condition:
        plpy.error(msg)


def __float_gt(val1, val2):
    """
    @brief test if the 1st value is greater than the 2nd value,
           using an absolute tolerance of 1E-14

    @param val1    a float number
    @param val2    a float number

    @return true if val1 exceeds val2 by more than the tolerance.
            Otherwise return false
    """
    return val1 - val2 > 1E-14


def __float_le(val1, val2):
    """
    @brief test if the 1st value is less than or equal to the 2nd value,
           using an absolute tolerance of 1E-14

    @param val1    a float number
    @param val2    a float number

    @return true if val1 does not exceed val2 by more than the tolerance.
            Otherwise return false
    """
    # Written as a single comparison so that it is the exact complement of
    # __float_gt for ordinary numbers. The previous two-sided form returned
    # false when the difference was exactly -1E-14 or exactly 1E-14.
    return val1 - val2 <= 1E-14


def assoc_rules(madlib_schema, support, confidence, tid_col,
                item_col, input_table, output_schema, verbose,
                max_itemset_size):
    """
    @brief The entry function for the Apriori.

    The table {output_schema}.assoc_rules is dropped and re-created on every
    call. A number of temporary tables are used for the intermediate results;
    they are dropped at the end unless verbose is true.

    @param madlib_schema    name of the schema used to qualify the svec type
                            and the helper functions called in the queries
    @param support          minimum level of support needed for each itemset
                            to be included in result, in the range (0, 1]
    @param confidence       minimum level of confidence needed for each rule
                            to be included in result, in the range (0, 1]
    @param tid_col          name of the column storing the transaction ids
    @param item_col         name of the column storing the products
    @param input_table      name of the table where the data is stored
    @param output_schema    name of the schema where the final results will
                            be stored; None means the current schema
    @param verbose          determining if output contains comments
    @param max_itemset_size determines the maximum size of frequent itemsets
                            allowed to generate association rules from;
                            None means no limit

    @return a tuple (output_schema, 'assoc_rules', number of rules found,
            total execution time in seconds)

    An error is raised through plpy.error when a parameter is invalid.
    """
    begin_func_exec = time.time()
    begin_step_exec = time.time()
    cal_itemsets_time = 0

    if max_itemset_size is None:
        max_itemset_size = float('inf')
    elif max_itemset_size <= 1:
        plpy.error("ERROR: max_itemset_size has to be greater than 1.")

    # check parameters
    __assert(
        support is not None and
        confidence is not None and
        tid_col is not None and
        item_col is not None and
        input_table is not None and
        verbose is not None,
        "none of the parameters except the output_schema should be null"
    )

    __assert(
        __float_gt(support, 0) and __float_le(support, 1),
        "the parameter support must be in range of (0, 1]"
    )

    __assert(
        __float_gt(confidence, 0) and __float_le(confidence, 1),
        "the parameter confidence must be in range of (0, 1]"
    )

    __assert(
        len(input_table) > 0 and
        table_exists(input_table, madlib_schema) and
        len(tid_col) > 0 and
        len(item_col) > 0 and
        columns_exist_in_table(input_table, [tid_col, item_col],
                               madlib_schema),
        "the input table must exist and the tid_col and "
        "item_col must be in the input table"
    )

    if output_schema is None:
        rv = plpy.execute("SELECT current_schema() as s")
        output_schema = rv[0]["s"]
    else:
        rv = plpy.execute("""
            SELECT count(*) as cnt
            FROM pg_namespace
            where quote_ident(nspname) = '{0}'
            """.format(output_schema)
        )
        __assert(rv[0]["cnt"] == 1, "the output schema does not exist")

    if verbose:
        plpy.info("finished checking parameters")

    # create the result table for keeping the generated rules
    plpy.execute("DROP TABLE IF EXISTS {0}.assoc_rules".format(output_schema))
    plpy.execute("""
        CREATE TABLE {0}.assoc_rules
            (
            ruleId      INT,
            pre         TEXT[],
            post        TEXT[],
            count       INT,
            support     FLOAT8,
            confidence  FLOAT8,
            lift        FLOAT8,
            conviction  FLOAT8
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')""".format(
        output_schema)
    )

    # the auxiliary table for keeping the association rules
    plpy.execute("DROP TABLE IF EXISTS assoc_rules_aux_tmp")
    plpy.execute("""
        CREATE TEMP TABLE assoc_rules_aux_tmp
            (
            ruleId      SERIAL,
            pre         TEXT,
            post        TEXT,
            support     FLOAT8,
            confidence  FLOAT8,
            lift        FLOAT8,
            conviction  FLOAT8
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')""")

    # Re-number the transaction ids so that they start with 1 and are
    # continuous, and remove duplicated and NULL records.
    # The table is always built: every later step reads it, so skipping the
    # creation for some tid distributions made the next query fail on a
    # missing relation.
    plpy.execute("DROP TABLE IF EXISTS assoc_input_unique")
    plpy.execute("""
        CREATE TEMP TABLE assoc_input_unique AS
        SELECT
            dense_rank() OVER (ORDER BY tid)::BIGINT as tid,
            item
        FROM (
            SELECT {tid_col} as tid,
                   {item_col}::text as item
            FROM {input_table}
            WHERE {tid_col} IS NOT NULL AND
                  {item_col} IS NOT NULL
            GROUP BY 1,2
        ) t
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (tid)')
        """.format(tid_col=tid_col, item_col=item_col,
                   input_table=input_table)
    )

    if verbose:
        plpy.info("finished removing duplicates")

    rv = plpy.execute("""
        SELECT count(DISTINCT tid) as c1
        FROM assoc_input_unique
        """)
    num_tranx = rv[0]["c1"]
    # minimum number of transactions an itemset must appear in
    min_supp_tranx = float(num_tranx) * support

    # get the items whose counts are at least the given
    # support counts. Each item will be given a continuous number.
    plpy.execute("DROP TABLE IF EXISTS assoc_item_uniq")
    plpy.execute("""
        CREATE TEMP TABLE assoc_item_uniq (item_id, item_text, cnt) AS
        SELECT row_number() OVER (ORDER BY cnt DESC)::BIGINT,
               item::TEXT,
               cnt::FLOAT8
        FROM (
            SELECT item AS item, count(tid) as cnt
            FROM assoc_input_unique
            GROUP BY item
        ) t
        WHERE cnt::FLOAT8 >= {0}
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (item_id)')
        """.format(min_supp_tranx))

    if verbose:
        plpy.info("finished encoding items")

    rv = plpy.execute("SELECT count(item_id) as c FROM assoc_item_uniq")
    num_supp_prod = rv[0]["c"]

    # replace the item text with the item number assigned above
    plpy.execute("DROP TABLE IF EXISTS assoc_enc_input")
    plpy.execute("""
        CREATE TEMP TABLE assoc_enc_input (tid, item, cnt) AS
        SELECT t1.tid, t2.item_id, t2.cnt
        FROM assoc_input_unique t1, assoc_item_uniq t2
        WHERE t1.item = t2.item_text
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (tid)')
        """)

    if verbose:
        plpy.info("finished encoding input table: {0}".format(
            time.time() - begin_step_exec))

    begin_step_exec = time.time()

    if verbose:
        plpy.info("Beginning iteration #1")

    begin_step_exec = time.time()

    # this table keeps the patterns in each iteration.
    plpy.execute("DROP TABLE IF EXISTS assoc_rule_sets")
    plpy.execute("""
        CREATE TEMP TABLE assoc_rule_sets
            (
            text_svec   TEXT,
            set_list    {0}.svec,
            support     FLOAT8,
            iteration   INT
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (text_svec)')
        """.format(madlib_schema))

    # this table keeps the patterns in the current iteration.
    plpy.execute("DROP TABLE IF EXISTS assoc_rule_sets_loop")
    plpy.execute("""
        CREATE TEMP TABLE assoc_rule_sets_loop
            (
            id          BIGINT,
            set_list    {schema}.svec,
            support     FLOAT8,
            tids        {schema}.svec
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (id)')
        """.format(schema=madlib_schema))

    # seed the loop table with the frequent itemsets of size 1
    plpy.execute("""
        INSERT INTO assoc_rule_sets_loop (id, set_list, support, tids)
        SELECT
            t.item,
            {schema}.svec_cast_positions_float8arr
                (ARRAY[t.item], ARRAY[1], {num_supp_prod}, 0),
            t.cnt::FLOAT8 / {num_tranx},
            {schema}.svec_cast_positions_float8arr
                (array_agg(t.tid), array_agg(1), {num_tranx}, 0)
        FROM assoc_enc_input t
        GROUP BY t.item, t.cnt
        """.format(schema=madlib_schema, num_supp_prod=num_supp_prod,
                   num_tranx=num_tranx)
    )

    rv = plpy.execute("""
        SELECT count(id) as c1, max(id) as c2
        FROM assoc_rule_sets_loop
        """)
    num_item_loop = rv[0]["c1"]
    max_item_loop = rv[0]["c2"]

    # the ids must be 1..n because rule_set_rel is generated from them
    __assert(
        (num_item_loop == max_item_loop) or
        (num_item_loop == 0 and max_item_loop is None),
        "internal error: num_item_loop must be equal to max_item_loop"
    )

    # to avoid the self cross join of the table assoc_rule_sets_loop,
    # we use the following table to generate the join relationship
    # so that we can use inner join (hash join)
    plpy.execute("DROP TABLE IF EXISTS rule_set_rel")
    plpy.execute("""
        CREATE TEMP TABLE rule_set_rel
            (
            sid     BIGINT,
            did     BIGINT
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (sid)')
        """)

    # As two different svecs may have the same hash key,
    # we use this table to assign a unique ID for each svec.
    plpy.execute("DROP TABLE IF EXISTS assoc_loop_aux")
    plpy.execute("""
        CREATE TEMP TABLE assoc_loop_aux
            (
            id          BIGSERIAL,
            set_list    {schema}.svec,
            support     FLOAT8,
            tids        {schema}.svec
            )
        m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (id)')
        """.format(schema=madlib_schema))

    if verbose:
        plpy.info("{0} Frequent itemsets found in this iteration".format(
            num_supp_prod))
        plpy.info("Completed iteration # 1. Time: {0}".format(
            time.time() - begin_step_exec))

    # number of completed passes; the itemsets currently held in
    # assoc_rule_sets_loop have size (iteration + 1)
    iteration = 0

    while num_item_loop > 0 and iteration < max_itemset_size:
        begin_step_exec = time.time()
        iteration = iteration + 1

        if verbose:
            plpy.info("Beginning iteration # {0}".format(iteration + 1))

        # pair every itemset id with all of the larger ids
        plpy.execute("TRUNCATE TABLE rule_set_rel")
        plpy.execute("""
            INSERT INTO rule_set_rel(sid, did)
            SELECT t1.id, generate_series(t1.id + 1, {0})
            FROM assoc_rule_sets_loop t1
            """.format(num_item_loop))

        # keep the itemsets of the current size before they are replaced
        plpy.execute("""
            INSERT INTO assoc_rule_sets(text_svec, set_list, support, iteration)
            SELECT array_to_string(
                        {0}.svec_nonbase_positions(set_list, 0), ','),
                   set_list,
                   support,
                   {1}
            FROM assoc_rule_sets_loop""".format(madlib_schema, iteration)
        )

        if verbose:
            plpy.info("time of preparing data: {0}".format(
                time.time() - begin_step_exec))

        # generate the patterns for the next iteration
        plpy.execute("ALTER SEQUENCE assoc_loop_aux_id_seq RESTART WITH 1")
        plpy.execute("TRUNCATE TABLE assoc_loop_aux")
        plpy.execute("""
            INSERT INTO assoc_loop_aux(set_list, support, tids)
            SELECT DISTINCT ON({schema}.svec_to_string(set_list))
                set_list,
                {schema}.svec_l1norm(tids)::FLOAT8 / {num_tranx},
                tids
            FROM (
                SELECT
                    {schema}.svec_minus(
                        {schema}.svec_plus(t1.set_list, t3.set_list),
                        {schema}.svec_mult(t1.set_list, t3.set_list)
                    ) as set_list,
                    {schema}.svec_mult(t1.tids, t3.tids) as tids
                FROM assoc_rule_sets_loop t1,
                     rule_set_rel t2,
                     assoc_rule_sets_loop t3
                WHERE t1.id = t2.sid and t2.did = t3.id
            ) t
            WHERE {schema}.svec_l1norm(set_list)::INT = {itemset_size} AND
                  {schema}.svec_l1norm(tids)::FLOAT8 >= {min_supp_tranx}
            """.format(schema=madlib_schema, num_tranx=num_tranx,
                       itemset_size=iteration + 1,
                       min_supp_tranx=min_supp_tranx)
        )

        plpy.execute("TRUNCATE TABLE assoc_rule_sets_loop")
        plpy.execute("""
            INSERT INTO assoc_rule_sets_loop(id, set_list, support, tids)
            SELECT * FROM assoc_loop_aux
            """)

        rv = plpy.execute("""
            SELECT count(id) as c1, max(id) as c2
            FROM assoc_rule_sets_loop
            """)
        num_item_loop = rv[0]["c1"]
        max_item_loop = rv[0]["c2"]

        __assert(
            (num_item_loop == max_item_loop) or
            (num_item_loop == 0 and max_item_loop is None),
            "internal error: num_item_loop must be equal to max_item_loop"
        )

        if verbose:
            plpy.info("{0} Frequent itemsets found in this iteration".format(
                num_item_loop))
            plpy.info("Completed iteration # {0}. Time: {1}".format(
                iteration + 1, time.time() - begin_step_exec))

        cal_itemsets_time = cal_itemsets_time + (time.time() - begin_step_exec)

    # The early exit is taken in verbose mode only: in that mode the
    # intermediate tables are kept, so nothing else has to be done. Otherwise
    # the else-branch runs (producing no rules) and drops the temp tables.
    if (num_item_loop == 0) and (iteration < 2) and verbose:
        plpy.info("No association rules found that meet given criteria")
        plpy.info("finished itemsets finding. Time: {0}".format(
            cal_itemsets_time))
        total_rules = 0
    else:
        if verbose:
            plpy.info("begin to generate the final rules")

        begin_step_exec = time.time()

        # generate all the final rules
        plpy.execute("""
            INSERT INTO assoc_rules_aux_tmp
                (pre, post, support, confidence, lift, conviction)
            SELECT
                t.item[1],
                t.item[2],
                t.support_xy,
                t.support_xy / x.support,
                t.support_xy / (x.support * y.support),
                CASE WHEN abs(t.support_xy / x.support - 1) < 1.0E-10 THEN
                    0
                ELSE
                    (1 - y.support) / (1 - (t.support_xy / x.support))
                END
            FROM (
                SELECT
                    {0}.gen_rules_from_cfp(text_svec, iteration) as item,
                    support as support_xy
                FROM assoc_rule_sets
                WHERE iteration > 1
            ) t, assoc_rule_sets x, assoc_rule_sets y
            WHERE t.item[1] = x.text_svec AND
                  t.item[2] = y.text_svec AND
                  (t.support_xy / x.support) >= {1}
            """.format(madlib_schema, confidence)
        )

        # generate the readable rules
        plpy.execute("DROP TABLE IF EXISTS pre_tmp_table")
        plpy.execute("""
            CREATE TEMP TABLE pre_tmp_table AS
            SELECT ruleId, array_agg(item_text) as pre
            FROM (
                SELECT
                    ruleId,
                    unnest(string_to_array(pre, ','))::BIGINT as pre_id
                FROM assoc_rules_aux_tmp
            ) s1, assoc_item_uniq s2
            WHERE s1.pre_id = s2.item_id
            GROUP BY ruleId
            m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')
            """)

        plpy.execute("DROP TABLE IF EXISTS post_tmp_table")
        plpy.execute("""
            CREATE TEMP TABLE post_tmp_table AS
            SELECT ruleId, array_agg(item_text) as post
            FROM (
                SELECT
                    ruleId,
                    unnest(string_to_array(post, ','))::BIGINT as post_id
                FROM assoc_rules_aux_tmp
            ) s1, assoc_item_uniq s2
            WHERE s1.post_id = s2.item_id
            GROUP BY ruleId
            m4_ifdef(`__POSTGRESQL__', `', `DISTRIBUTED BY (ruleId)')
            """)

        plpy.execute("""
            INSERT INTO {0}.assoc_rules
            SELECT
                t1.ruleId,
                t2.pre,
                t3.post,
                t1.support*{1}::INT AS count,
                support,
                confidence,
                lift,
                conviction
            FROM
                assoc_rules_aux_tmp t1,
                pre_tmp_table t2,
                post_tmp_table t3
            WHERE t1.ruleId = t2.ruleId AND t1.ruleId = t3.ruleId
            """.format(output_schema, num_tranx)
        )

        # if in verbose mode, we will keep all the intermediate tables
        if not verbose:
            plpy.execute("""
                DROP TABLE IF EXISTS post_tmp_table;
                DROP TABLE IF EXISTS pre_tmp_table;
                DROP TABLE IF EXISTS assoc_rules_aux_tmp;
                DROP TABLE IF EXISTS assoc_input_unique;
                DROP TABLE IF EXISTS assoc_item_uniq;
                DROP TABLE IF EXISTS assoc_item_svec;
                DROP TABLE IF EXISTS assoc_enc_input;
                DROP TABLE IF EXISTS assoc_rule_sets;
                DROP TABLE IF EXISTS assoc_rule_sets_loop;
                DROP TABLE IF EXISTS rule_set_rel;
                DROP TABLE IF EXISTS assoc_loop_aux;
                """)

        # count the rules once and reuse the value for the log message
        rv = plpy.execute("""
            SELECT count(*) as c FROM {0}.assoc_rules
            """.format(output_schema))
        total_rules = rv[0]["c"]

        if verbose:
            plpy.info("{0} Total association rules found. Time: {1}".format(
                total_rules, time.time() - begin_step_exec))

    return (
        output_schema,
        'assoc_rules',
        total_rules,
        time.time() - begin_func_exec
    )


def assoc_rules_help_message(schema_madlib, message=None, **kwargs):
    """
    Given a help string, provide usage information

    Args:
        @param schema_madlib Name of the MADlib schema
        @param message  Helper message to print; one of "usage", "help", "?"
                        for the usage text, "example" or "examples" for the
                        examples. Matching is case-insensitive. Any other
                        value, including None, selects a short overview.

    Returns:
        The help text as a string
    """
    # None is the default value, so it must not reach .lower()
    topic = message.lower() if message is not None else None

    if topic in ("usage", "help", "?"):
        return """
-----------------------------------------------------------------------
                            USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.assoc_rules(
    support,            -- FLOAT8, minimum level of support needed for each itemset to be included in result
    confidence,         -- FLOAT8, minimum level of confidence needed for each rule to be included in result
    tid_col,            -- TEXT, name of the column storing the transaction ids
    item_col,           -- TEXT, name of the column storing the products
    input_table,        -- TEXT, name of the table containing the input data
    output_schema,      -- TEXT, name of the schema where the final results will be stored.
                                The schema must be created before calling the function. Alternatively, use
                                NULL to output to the current schema.
    verbose,            -- BOOLEAN, (optional, default: False) determines if details are printed for each
                                iteration as the algorithm progresses
    max_itemset_size    -- INTEGER, (optional, default: itemsets of all sizes) determines the maximum size of frequent
                                itemsets allowed that are used for generating association rules. Value less
                                than 2 throws an error.
);
-------------------------------------------------------------------------
                            OUTPUT TABLES
-------------------------------------------------------------------------
The output table "assoc_rules" in the "output_schema" contains a unique rule of the form "If X, then Y
(i.e., X => Y)" in each row. X and Y are non-empty itemsets, called the antecedent and consequent, or
the left-hand-side (LHS) and right-hand-side (RHS), of the rule respectively.

in each row, with the following columns:
    ruleid,     -- INTEGER, row number
    pre,        -- TEXT[], specifies the antecedent, or the LHS of the rule
    post,       -- TEXT[], specifies the consequent, or the RHS of the rule
    support,    -- DOUBLE, support of the frequent itemset X,Y
    count,      -- INTEGER, number of transactions in the input table that contain X,Y
    confidence, -- DOUBLE, the ratio of number of transactions that contain X,Y to the number of transactions
                        that contain X
    lift,       -- DOUBLE, the ratio of observed support of X,Y to the expected support of X,Y, assuming X and
                        Y are independent.
    conviction  -- DOUBLE, the ratio of expected support of X occurring without Y assuming X and Y are
                        independent, to the observed support of X occuring without Y
        """.format(schema_madlib=schema_madlib)

    if topic in ("example", "examples"):
        return """
------------------------------------------------------------------------
                            EXAMPLES
------------------------------------------------------------------------
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data (
    trans_id INT,
    product TEXT
);
INSERT INTO test_data VALUES (1, 'beer');
INSERT INTO test_data VALUES (1, 'diapers');
INSERT INTO test_data VALUES (1, 'chips');
INSERT INTO test_data VALUES (2, 'beer');
INSERT INTO test_data VALUES (2, 'diapers');
INSERT INTO test_data VALUES (3, 'beer');
INSERT INTO test_data VALUES (3, 'diapers');
INSERT INTO test_data VALUES (4, 'beer');
INSERT INTO test_data VALUES (4, 'chips');
INSERT INTO test_data VALUES (5, 'beer');
INSERT INTO test_data VALUES (6, 'beer');
INSERT INTO test_data VALUES (6, 'diapers');
INSERT INTO test_data VALUES (6, 'chips');
INSERT INTO test_data VALUES (7, 'beer');
INSERT INTO test_data VALUES (7, 'diapers');

Find all association rules with a support and threshold value of
at least 0.25 and 0.5 respectively.

SELECT * FROM {schema_madlib}.assoc_rules(
    .25,
    .5,
    'trans_id',
    'product',
    'test_data',
    NULL,
    TRUE
);

View output results:
SELECT * FROM assoc_rules;

Find association rules generated from itemsets of size at most 2,
and a support and threshold value of at least 0.25 and 0.5 respectively.

SELECT * FROM {schema_madlib}.assoc_rules(
    .25,
    .5,
    'trans_id',
    'product',
    'test_data',
    NULL,
    TRUE,
    2
);

View output results:
SELECT * FROM assoc_rules;
        """.format(schema_madlib=schema_madlib)

    return """
For an overview on usage, run:
SELECT {schema_madlib}.assoc_rules('usage');

For an example of using assoc_rules, run:
SELECT {schema_madlib}.assoc_rules('example');
    """.format(schema_madlib=schema_madlib)