# coding=utf-8 """ @file control.py_in @brief controller classes (e.g. iteration controller) @namespace utilities @brief driver functions shared by modules """ import plpy from control import MinWarning from utilities import __mad_version from utilities import unique_string _unique_string = unique_string version_wrapper = __mad_version() mad_vec = version_wrapper.select_vecfunc() m4_changequote(`') class GroupIterationController: """ @brief Abstraction for implementing driver functions in PL/Python This class encapsulates handling of the inter-iteration state. The design goal is to avoid any conversion between backend-native types and those of procedureal languages like PL/Python. Therefore, the expectation is that *** all "template" parameters are passed as PL/Python arguments ***, whereas non-template arguments are provided in an argument table. Here, "template" arguments are those parameters that cannot be SQL parameters, *** such as table and column names ***. This class assumes a transition state always has its status indicator in the last element. 0 means in progress, 1 means completed, > 1 means abnormal. Perhaps a C++ UDF should be added for extracting the status. The inter-state iteration table contains three columns: - _grouping_cols - List of columns that are provided as grouping arguments - _iteration INTEGER - The 0-based iteration number - _state self.kwargs.stateType - The state (after iteration \c _interation) """ def __init__(self, rel_args, rel_state, stateType, temporaryTables=True, schema_madlib="MADLIB_SCHEMA_MISSING", verbose=False, grouping_str="NULL", col_grp_iteration="_iteration", col_grp_state="_state", **kwargs): self.temporaryTables = temporaryTables self.verbose = verbose self.inWith = False self.iteration = -1 self.grouping_str = grouping_str self.kwargs = kwargs self.kwargs.update( unqualified_rel_state=rel_state, rel_args=('pg_temp.' if temporaryTables else '') + rel_args, rel_state=('pg_temp.' if temporaryTables else '') + rel_state, stateType=stateType.format(schema_madlib=schema_madlib), schema_madlib=schema_madlib, grouping_str=self.grouping_str, col_grp_null=_unique_string(), col_grp_key=_unique_string(), col_grp_iteration=col_grp_iteration, col_grp_state=col_grp_state ) grouping_col = "Null" if kwargs["grouping_col"] is None else kwargs["grouping_col"] using_str = "on True" if kwargs["grouping_col"] is None else "using ({grouping_col})".format(**kwargs) self.is_group_null = True if kwargs["grouping_col"] is None else False self.kwargs["grouping_col"] = grouping_col self.kwargs["using_str"] = using_str self.grouping_col = grouping_col self.hawq = m4_ifdef(, , ) if self.hawq: self.new_states = [] self.old_states = [] self.finished_states = [] def __enter__(self): with MinWarning('warning'): # currently assuming that groups is passed as a valid array group_col = ("NULL::integer as {col_grp_null}" if self.is_group_null else "{grouping_col}").format(**self.kwargs) groupby_str = ("{col_grp_null}" if self.is_group_null else "{grouping_col}").format(**self.kwargs) primary_str = "" if self.is_group_null else ", {grouping_col}".format(**self.kwargs) self.runSQL( """ drop table if exists {rel_state}; create {temp} table {unqualified_rel_state} as ( select {group_col}, 0::integer as {col_grp_iteration}, Null::{stateType} as {col_grp_state} m4_ifdef(, ) from {rel_source} group by {groupby_str} ); m4_ifdef(, , ) """.format(group_col=group_col, groupby_str=groupby_str, primary_str=primary_str, temp='TEMPORARY' if self.temporaryTables else '', **self.kwargs)) null_test = " or ".join([g.strip() + " is NULL" for g in self.kwargs['grouping_col'].split(",")]) null_count = plpy.execute( """ select count(*) from {rel_state} where {null_test} """.format(null_test=null_test, **self.kwargs))[0]['count'] if null_count != 0 and primary_str: plpy.error("Grouping error: at least one of the grouping columns contains NULL values!" " Please filter out those NULL values.") if not self.hawq: self.runSQL("alter table {rel_state} add primary key " "({col_grp_iteration} {primary_str})". format(primary_str=primary_str, **self.kwargs)) else: self.new_states = plpy.execute("SELECT * FROM {rel_state}".format(**self.kwargs)) if not self.is_group_null: converted_source_table = _unique_string() plpy.execute(""" CREATE TEMP TABLE {converted_source_table} AS SELECT *, array_to_string(ARRAY[{grouping_str}], ',') AS {col_grp_key} FROM {rel_source} """.format(converted_source_table=converted_source_table, **self.kwargs)) self.kwargs.update(rel_source=converted_source_table) self.inWith = True return self def __exit__(self, type, value, tb): self.inWith = False if self.hawq: insert_sql = """ INSERT INTO {rel_state} SELECT {grouping_col}, _rel_state.iteration, _rel_state.state, _rel_state.grp_key FROM ( SELECT {grouping_col}, {col_grp_key} FROM {rel_state} WHERE {col_grp_iteration} = 0 ) AS _src, ( SELECT grp_key, iteration, state FROM {schema_madlib}._gen_state($1, $2, $3) ) AS _rel_state WHERE _src.{col_grp_key} = _rel_state.grp_key """.format(**self.kwargs) insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"]) grp_keys = [] states = [] iterations = [] null_empty = [] for state in self.finished_states: # the length of states for null_empty groups are different if state["{col_grp_state}".format(**self.kwargs)][-1] == 3: null_empty.append(state) continue iterations.append(state["{col_grp_iteration}".format(**self.kwargs)]) grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) states.extend(state["{col_grp_state}".format(**self.kwargs)]) plpy.execute(insert_plan, [grp_keys, iterations, states]) grp_keys = [] states = [] iterations = [] for state in null_empty: iterations.append(state["{col_grp_iteration}".format(**self.kwargs)]) grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) states.extend(state["{col_grp_state}".format(**self.kwargs)]) insert_plan = plpy.prepare(insert_sql, ["text[]", "integer[]", "double precision[]"]) plpy.execute(insert_plan, [grp_keys, iterations, states]) if not self.is_group_null: plpy.execute('DROP TABLE IF EXISTS ' + self.kwargs['rel_source']) def runSQL(self, sql): if self.verbose: plpy.notice(sql) return plpy.execute(sql) def evaluate(self, expression): """ Evaluate the given expression. The expression may depend on the current inter-iteration state and all arguments @param expression SQL expression. The following names are defined and can be used in the condition: - \c _args - The (single-row) argument table - \c _state - The row of the state table containing the latest inter-iteration state @return None if \c expression evaluates to NULL, otherwise the value of \c expression """ if self.hawq: eva_plan = plpy.prepare(""" SELECT ({expression}) AS _expression, subq1.grp_key AS _groups FROM {{rel_args}} AS _args left outer join ( ( SELECT grp_key, state AS _state_previous FROM {schema_madlib}._gen_state($1, NULL, $2) ) sub1 JOIN ( SELECT grp_key, state AS _state_current FROM {schema_madlib}._gen_state($3, NULL, $4) ) sub2 using (grp_key) ) AS subq1 ON True """.format(expression=expression, **self.kwargs). format(iteration=self.iteration, **self.kwargs), ["text[]", "double precision[]", "text[]", "double precision[]"]) grp_keys_new = [] states_new = [] for state in self.new_states: if state["{col_grp_state}".format(**self.kwargs)][-1] < 2: grp_keys_new.append(state["{col_grp_key}".format(**self.kwargs)]) states_new.extend(state["{col_grp_state}".format(**self.kwargs)]) grp_keys_old = [] states_old = [] for state in self.old_states: grp_keys_old.append(state["{col_grp_key}".format(**self.kwargs)]) if state["{col_grp_state}".format(**self.kwargs)] is not None: states_old.extend(state["{col_grp_state}".format(**self.kwargs)]) resultObject = plpy.execute(eva_plan, [grp_keys_old, states_old, grp_keys_new, states_new]) else: resultObject = self.runSQL(""" SELECT ({expression}) AS _expression, ARRAY[{{grouping_str}}] AS _groups FROM {{rel_args}} AS _args left outer join ( ( SELECT {{grouping_col}}, {col_grp_state} AS _state_previous FROM {{rel_state}} WHERE {col_grp_iteration} = {{iteration}} - 1 ) sub1 JOIN ( SELECT {{grouping_col}}, {col_grp_state} AS _state_current FROM {{rel_state}} WHERE {col_grp_iteration} = {{iteration}} ) sub2 {using_str} ) AS subq1 ON True """.format(expression=expression, **self.kwargs). format(iteration=self.iteration, **self.kwargs)) if resultObject.nrows() == 0: return None else: complete_grps = [] for each_elem in resultObject: if not self.hawq: # update status for each group group_vector = mad_vec(each_elem["_groups"]) groups_as_str = [None] * len(group_vector) # convert group values to string objects for index, each_grp in enumerate(group_vector): if not each_grp or each_grp.lower() == 'null': # NULL values should be outputed as NULL instead of # as a string 'NULL' groups_as_str[index] = "NULL::text" else: groups_as_str[index] = "'" + str(each_grp) + "'::text" array_str = "array[" + ",".join(groups_as_str) + "]" # update status for the group if it completed iterating if each_elem['_expression']: if self.hawq: complete_grps.append(each_elem["_groups"]) else: self.runSQL(""" UPDATE {rel_state} set {col_grp_state}[array_upper({col_grp_state}, 1)] = 1 WHERE ARRAY[{grouping_str}] = {_group_val} and {col_grp_state}[array_upper({col_grp_state}, 1)] < 2 and {col_grp_iteration} = {iteration} """.format( _group_val=array_str, iteration=self.iteration, **self.kwargs)) if self.hawq: all_finish = True tmp = [] for state in self.new_states: if state["{col_grp_key}".format(**self.kwargs)] in complete_grps \ and state["{col_grp_state}".format(**self.kwargs)][-1] < 1: state["{col_grp_state}".format(**self.kwargs)][-1] = 1 if state["{col_grp_state}".format(**self.kwargs)][-1] < 1: all_finish = False tmp.append(state) else: self.finished_states.append(state) self.new_states = tmp return all_finish else: # return True only if all group combinations have finished iterating rv = self.runSQL( """ select bool_and({col_grp_state}[array_upper({col_grp_state}, 1)]::integer::boolean) as rst from {rel_state} as _state_table where _state_table.{col_grp_iteration} = {iteration} """.format( iteration=self.iteration, **self.kwargs))[0]["rst"] return rv def test(self, condition): """ Test if the given condition is satisfied. The condition may depend on the current inter-iteration state and all arguments @param condition Boolean SQL expression. The following names are defined and can be used in the condition: - \c _args - The (single-row) argument table - \c _state - The row of the state table containing the latest inter-iteration state @return None if \c condition evaluates to NULL, otherwise the Boolean value of \c condition """ return self.evaluate("CAST(({0}) AS BOOLEAN)".format(condition)) def update(self, newState, **updateKwargs): """ Update the inter-iteration state @param newState SQL expression of type stateType.kwargs.stateType. The following names are defined and can be used in the condition: - \c _args - The (single-row) argument table - \c _state - The row of the state table containing the latest inter-iteration state . Note that {iteration} will still be the current iteration. For instance, it could be used in the expression as a WHERE condition: [...] WHERE _state._iteration = {iteration} This updates the current inter-iteration state to the result of evaluating \c newState. If self.truncAfterIteration is true, this will replace the old state, otherwise the history of all old states is kept. """ newState = newState.format(**self.kwargs) self.iteration = self.iteration + 1 groupby_str = "" if self.is_group_null \ else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs) if self.hawq: groupby_str = "" if self.is_group_null \ else "group by {grouping_col}, {col_grp_key}".format(**self.kwargs) update_plan = plpy.prepare( """ select {_grp_key} AS {col_grp_key}, {grouping_col} {as_string}, {iteration} AS {col_grp_iteration}, ({newState}) AS {col_grp_state} from {rel_source} AS _src, ( SELECT grp_key, state AS {col_grp_state} FROM {schema_madlib}._gen_state($1, NULL, $2) ) AS rel_state where {_grp_key} = grp_key {groupby_str} """.format( iteration=self.iteration, groupby_str=groupby_str, as_string='AS _grp' if self.is_group_null else '', _grp_key="array_to_string(ARRAY[{grouping_str}], ',')".format(**self.kwargs) if self.is_group_null else self.kwargs['col_grp_key'], newState=newState, **self.kwargs), ["text[]", "double precision[]"]) grp_keys = [] states = [] for state in self.new_states: grp_keys.append(state["{col_grp_key}".format(**self.kwargs)]) if state["{col_grp_state}".format(**self.kwargs)] is not None: states.extend(state["{col_grp_state}".format(**self.kwargs)]) self.old_states = self.new_states self.new_states = plpy.execute(update_plan, [grp_keys, states]) else: groupby_str = "" if self.is_group_null \ else "group by {grouping_col}".format(**self.kwargs) self.runSQL( """ insert into {rel_state} (select {grouping_col}, {iteration}, ({newState}) from ({rel_source} AS _src join {rel_state} AS rel_state {using_str}) where rel_state.{col_grp_iteration} = {iteration} - 1 and (case when {iteration} = 1 then True else rel_state.{col_grp_state}[array_upper(rel_state.{col_grp_state}, 1)] = 0 end) {groupby_str}) """.format( groupby_str=groupby_str, iteration=self.iteration, newState=newState, **self.kwargs)) m4_changequote(, )