# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# PageRank

# Please refer to the pagerank.sql_in file for the documentation

"""
@file pagerank.py_in

@namespace graph
"""

import plpy
from graph_utils import get_graph_usage
from graph_utils import get_default_threshold_for_link_analysis
from graph_utils import update_output_grouping_tables_for_link_analysis
from graph_utils import validate_graph_coding
from graph_utils import validate_output_and_summary_tables
from graph_utils import validate_params_for_link_analysis
from utilities.control import MinWarning
from utilities.control import OptimizerControl
from utilities.utilities import _assert
from utilities.utilities import _check_groups
from utilities.utilities import get_filtered_cols_subquery_str
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import add_postfix
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string, split_quoted_delimited_str
from utilities.utilities import is_platform_pg
from utilities.utilities import py_list_to_sql_string
from utilities.validate_args import columns_exist_in_table, get_cols_and_types
from utilities.validate_args import table_exists


def validate_pagerank_args(schema_madlib, vertex_table, vertex_id, edge_table,
                           edge_params, out_table, damping_factor, max_iter,
                           threshold, grouping_cols_list,
                           personalization_vertices):
    """
    Function to validate input parameters for PageRank

    Args:
        @param schema_madlib
        @param vertex_table
        @param vertex_id
        @param edge_table
        @param edge_params              Dict with the 'src' and 'dest' column
                                        names of the edge table
        @param out_table
        @param damping_factor           Must lie in [0, 1]
        @param max_iter
        @param threshold
        @param grouping_cols_list       List of grouping column names
        @param personalization_vertices List of vertex ids for Personalized
                                        PageRank (may be empty/None)

    Errors are reported through _assert / plpy.error.
    """
    validate_graph_coding(vertex_table, vertex_id, edge_table, edge_params,
                          out_table, 'PageRank')
    # Validate args such as threshold and max_iter
    validate_params_for_link_analysis(schema_madlib, "PageRank",
                                      threshold, max_iter,
                                      edge_table, grouping_cols_list)
    _assert(0.0 <= damping_factor <= 1.0,
            "PageRank: Invalid damping factor value ({0}), must be between 0 and 1.".
            format(damping_factor))

    # Validate against the given set of nodes for Personalized Page Rank
    if personalization_vertices:
        grouping_cols = get_table_qualified_col_str(
            edge_table, grouping_cols_list)
        group_by_clause = "GROUP BY {0}".format(grouping_cols) \
            if grouping_cols_list else ''
        src = edge_params["src"]
        dest = edge_params["dest"]

        # Get a list which has the number of personalization nodes of each
        # group.
        # NOTE: the query text is built from locals(), so the local variable
        # names above must match the placeholders below.
        vertices_count_list_by_group = plpy.execute("""
            SELECT count(distinct {vertex_id}) AS count
            FROM {vertex_table}
            RIGHT JOIN {edge_table}
            ON ({vertex_table}.{vertex_id} = {edge_table}.{src} OR
                {vertex_table}.{vertex_id} = {edge_table}.{dest})
            AND {vertex_table}.{vertex_id} = ANY(ARRAY{personalization_vertices})
            {group_by_clause}
            """.format(**locals()))

        input_personalization_vertices_length = len(personalization_vertices)

        # The number of personalization nodes for every group should be equal
        # to the number given by input personalization_vertices list.
        # Otherwise, some nodes are missing for certain group. Or there might
        # be duplicate nodes in input personalization_vertices list. Or there
        # are some invalid nodes in input list that don't exist in vertex
        # table. In any case, throw an error.
        for key in vertices_count_list_by_group:
            if key["count"] != input_personalization_vertices_length:
                plpy.error("Personalization nodes must be a subset "
                           "of the vertex_table without duplicates and "
                           "every nodes should be present in all the groups")


def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             out_table, damping_factor, max_iter, threshold, grouping_cols,
             personalization_vertices, **kwargs):
    """
    Function that computes the PageRank

    Results are written to out_table; the number of iterations is written
    to a summary table named out_table + '_summary'.

    Args:
        @param schema_madlib
        @param vertex_table
        @param vertex_id                Vertex id column; "id" is used when
                                        empty/None
        @param edge_table
        @param edge_args                Key=value string naming the 'src' and
                                        'dest' columns (defaults: src, dest)
        @param out_table
        @param damping_factor           0.85 is used when None
        @param max_iter                 100 is used when None
        @param threshold                When None a default derived from the
                                        number of vertices is used; 0 skips
                                        the convergence test so that max_iter
                                        iterations are run
        @param grouping_cols            Comma separated grouping columns, or
                                        empty/None for no grouping
        @param personalization_vertices List of vertex ids for Personalized
                                        PageRank, or empty/None
    """
    # NOTE: most queries below are rendered with .format(**locals()), so the
    # names of the local variables in this function are part of the query
    # templates. Do not rename a local without updating every template.

    # We noticed the current implementation is noticeably slower when orca=ON
    # on Greenplum 5, so that we temporarily turn it off to run pagerank.
    with OptimizerControl(False):
        with MinWarning('warning'):
            params_types = {'src': str, 'dest': str}
            default_args = {'src': 'src', 'dest': 'dest'}
            edge_params = extract_keyvalue_params(
                edge_args, params_types, default_args)

            # populate default values for optional params if null
            if damping_factor is None:
                damping_factor = 0.85
            if max_iter is None:
                max_iter = 100
            if not vertex_id:
                vertex_id = "id"
            if not grouping_cols:
                grouping_cols = ''

            grouping_cols_list = split_quoted_delimited_str(grouping_cols)
            validate_pagerank_args(schema_madlib, vertex_table, vertex_id,
                                   edge_table, edge_params, out_table,
                                   damping_factor, max_iter, threshold,
                                   grouping_cols_list,
                                   personalization_vertices)

            summary_table = add_postfix(out_table, "_summary")
            _assert(not table_exists(summary_table),
                    "Graph PageRank: Output summary table ({summary_table}) already exists."
                    .format(**locals()))
            src = edge_params["src"]
            dest = edge_params["dest"]
            n_vertices = plpy.execute("""
                        SELECT COUNT({0}) AS cnt
                        FROM {1}
                    """.format(vertex_id, vertex_table))[0]["cnt"]

            if threshold is None:
                threshold = get_default_threshold_for_link_analysis(n_vertices)

            # table/column names used when grouping_cols is set.
            distinct_grp_table = ''
            vertices_per_group = ''
            vpg = ''
            grouping_where_clause = ''
            group_by_clause = ''
            random_prob = ''

            edge_temp_table = unique_string(desp='temp_edge')
            grouping_cols_comma = grouping_cols + ',' if grouping_cols else ''
            distribution = ('' if is_platform_pg() else
                            "DISTRIBUTED BY ({0}{1})".format(
                                grouping_cols_comma, dest))
            plpy.execute("DROP TABLE IF EXISTS {0}".format(edge_temp_table))
            plpy.execute("""
                CREATE TEMP TABLE {edge_temp_table} AS
                SELECT * FROM {edge_table}
                {distribution}
                """.format(**locals()))

            # GPDB and HAWQ have distributed by clauses to help them with
            # indexing. For Postgres we add the index explicitly.
            if is_platform_pg():
                plpy.execute("CREATE INDEX ON {0}({1})".format(
                    edge_temp_table, src))

            # Intermediate tables required.
            cur = unique_string(desp='cur')
            message = unique_string(desp='message')
            cur_unconv = unique_string(desp='cur_unconv')
            message_unconv = unique_string(desp='message_unconv')
            out_cnts = unique_string(desp='out_cnts')
            out_cnts_cnt = unique_string(desp='cnt')
            v1 = unique_string(desp='v1')

            if is_platform_pg():
                cur_distribution = cnts_distribution = ''
            else:
                cur_distribution = cnts_distribution = \
                    "DISTRIBUTED BY ({0}{1})".format(
                        grouping_cols_comma, vertex_id)
            cur_join_clause = """{edge_temp_table}.{dest} = {cur}.{vertex_id}
                """.format(**locals())
            out_cnts_join_clause = """{out_cnts}.{vertex_id} =
                {edge_temp_table}.{src} """.format(**locals())
            v1_join_clause = """{v1}.{vertex_id} = {edge_temp_table}.{src}
                """.format(**locals())

            # Get query params for Personalized Page Rank.
            ppr_params = ''
            total_ppr_nodes = 0
            random_jump_prob_ppr = 0
            ppr_init_value_clause = ''

            if personalization_vertices:
                ppr_params = get_query_params_for_ppr(personalization_vertices,
                                                      damping_factor,
                                                      vertex_id,
                                                      edge_temp_table,
                                                      vertex_table,
                                                      edge_params)
                total_ppr_nodes = ppr_params[0]
                random_jump_prob_ppr = ppr_params[1]
                ppr_init_value_clause = ppr_params[2]

            random_probability = (1.0 - damping_factor) / n_vertices
            if total_ppr_nodes > 0:
                random_jump_prob = random_jump_prob_ppr
            else:
                random_jump_prob = random_probability

            ####################################################################
            # Create several strings that will be used to construct required
            # queries. These strings will be required only during grouping.
            ignore_group_clause_first = ''
            limit = ' LIMIT 1 '

            grouping_cols_select_pr = ''
            vertices_per_group_inner_join_pr = ''
            ignore_group_clause_pr = ''

            grouping_cols_select_ins = ''
            vpg_from_clause_ins = ''
            vpg_where_clause_ins = ''
            message_grp_where_ins = ''
            ignore_group_clause_ins = ''

            nodes_with_no_incoming_edges = unique_string(desp='no_incoming')
            ignore_group_clause_ins_noincoming = ''

            grouping_cols_select_conv = '{0}.{1}'.format(cur, vertex_id)
            group_by_grouping_cols_conv = ''
            message_grp_clause_conv = ''
            ignore_group_clause_conv = ''
            ####################################################################

            # Queries when groups are involved need a lot more conditions in
            # various clauses, so populating the required variables. Some
            # intermediate tables are unnecessary when no grouping is involved,
            # so create some tables and certain columns only during grouping.
            if grouping_cols:
                distinct_grp_table = unique_string(desp='grp')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(distinct_grp_table))
                plpy.execute("""CREATE TEMP TABLE {distinct_grp_table} AS
                        SELECT DISTINCT {grouping_cols} FROM {edge_temp_table}
                    """.format(**locals()))
                vertices_per_group = unique_string(desp='nvert_grp')
                init_pr = unique_string(desp='init')
                random_prob = unique_string(desp='rand')
                subq = unique_string(desp='subquery')
                rand_damp = 1 - damping_factor
                grouping_where_clause = _check_groups(
                    distinct_grp_table, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    distinct_grp_table, grouping_cols_list)
                # Find number of vertices in each group, this is the
                # normalizing factor for computing the random_prob
                where_clause_ppr = ''
                if personalization_vertices:
                    where_clause_ppr = """
                        where __vertices__ = ANY(ARRAY{personalization_vertices})
                        """.format(**locals())
                    random_prob_grp = 1.0 - damping_factor
                    init_prob_grp = 1.0 / total_ppr_nodes
                else:
                    random_prob_grp = """
                        {rand_damp}/COUNT(__vertices__)::DOUBLE PRECISION
                        """.format(**locals())
                    init_prob_grp = """
                        1/COUNT(__vertices__)::DOUBLE PRECISION
                        """.format(**locals())

                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(vertices_per_group))
                plpy.execute("""CREATE TEMP TABLE {vertices_per_group} AS
                        SELECT {distinct_grp_table}.*,
                        {init_prob_grp} AS {init_pr},
                        {random_prob_grp} as {random_prob}
                        FROM {distinct_grp_table} INNER JOIN (
                            SELECT {grouping_cols}, {src} AS __vertices__
                            FROM {edge_temp_table}
                            UNION
                            SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                        ){subq}
                        ON {grouping_where_clause}
                        {where_clause_ppr}
                        GROUP BY {group_by_clause}
                    """.format(**locals()))

                grouping_where_clause = _check_groups(
                    vertices_per_group, subq, grouping_cols_list)
                group_by_clause = get_table_qualified_col_str(
                    vertices_per_group, grouping_cols_list)

                if personalization_vertices:
                    # For Personalized PageRank init_pr stops being a column
                    # name and becomes an SQL expression instead.
                    init_prob_grp_ppr = 1.0 / total_ppr_nodes
                    init_pr = """
                        CASE when __vertices__ = ANY(ARRAY{personalization_vertices})
                        THEN {init_prob_grp_ppr} ELSE 0 END
                        """.format(**locals())

                plpy.execute("""
                        CREATE TEMP TABLE {cur} AS
                        SELECT {group_by_clause}, {subq}.__vertices__
                        AS {vertex_id}, {init_pr} AS pagerank
                        FROM {vertices_per_group} INNER JOIN (
                            SELECT {grouping_cols}, {src} AS __vertices__
                            FROM {edge_temp_table}
                            UNION
                            SELECT {grouping_cols}, {dest} FROM {edge_temp_table}
                        ){subq}
                        ON {grouping_where_clause}
                        {cur_distribution}
                    """.format(**locals()))
                vpg = unique_string(desp='vpg')
                # Compute the out-degree of every node in the group-based
                # subgraphs.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                        SELECT {grouping_cols_select} {src} AS {vertex_id},
                        COUNT({dest}) AS {out_cnts_cnt}
                        FROM {edge_temp_table}
                        GROUP BY {grouping_cols_select} {src}
                        {cnts_distribution}
                    """.format(grouping_cols_select=grouping_cols + ','
                               if grouping_cols else '', **locals()))

                message_grp = _check_groups(cur, message, grouping_cols_list)
                cur_join_clause = cur_join_clause + ' AND ' + _check_groups(
                    cur, edge_temp_table, grouping_cols_list)
                out_cnts_join_clause = out_cnts_join_clause + ' AND ' \
                    + _check_groups(out_cnts, edge_temp_table,
                                    grouping_cols_list)
                v1_join_clause = v1_join_clause + ' AND ' + _check_groups(
                    v1, edge_temp_table, grouping_cols_list)
                vpg_join_clause = _check_groups(
                    vpg, edge_temp_table, grouping_cols_list)
                vpg_t1_join_clause = _check_groups(
                    vpg, '__t1__', grouping_cols_list)
                # join clause specific to populating random_prob for nodes
                # without any incoming edges.
                edge_grouping_cols_select = get_table_qualified_col_str(
                    edge_temp_table, grouping_cols_list)
                cur_grouping_cols_select = get_table_qualified_col_str(
                    cur, grouping_cols_list)
                # Create output summary table:
                cols_names_types = get_cols_and_types(edge_table)
                grouping_cols_clause = ', '.join(
                    [c_name + " " + c_type
                     for (c_name, c_type) in cols_names_types
                     if c_name in grouping_cols_list])
                plpy.execute("""
                        CREATE TABLE {summary_table} (
                            {grouping_cols_clause},
                            __iterations__ INTEGER
                        )
                    """.format(**locals()))
                # Create output table. This will be updated whenever a group
                # converges. Note that vertex_id is assumed to be an integer
                # (as described in documentation)
                plpy.execute("""
                        CREATE TABLE {out_table} (
                            {grouping_cols_clause},
                            {vertex_id} INTEGER,
                            pagerank DOUBLE PRECISION
                        )
                    """.format(**locals()))
                temp_summary_table = unique_string(desp='temp_summary')
                plpy.execute(
                    "DROP TABLE IF EXISTS {0}".format(temp_summary_table))
                plpy.execute("""
                        CREATE TABLE {temp_summary_table} (
                            {grouping_cols_clause}
                        )
                    """.format(**locals()))
                ################################################################
                # Strings required for the main PageRank computation query
                grouping_cols_select_pr = edge_grouping_cols_select + ', '
                if personalization_vertices:
                    random_jump_prob = random_jump_prob_ppr
                else:
                    random_jump_prob = 'MIN({vpg}.{random_prob})'.format(
                        **locals())

                vertices_per_group_inner_join_pr = """INNER JOIN {vertices_per_group}
                    AS {vpg} ON {vpg_join_clause}""".format(**locals())
                ignore_group_clause_pr = ' WHERE ' + \
                    get_filtered_cols_subquery_str(edge_temp_table,
                                                   summary_table,
                                                   grouping_cols_list)
                ignore_group_clause_ins_noincoming = ' WHERE ' + \
                    get_filtered_cols_subquery_str(
                        nodes_with_no_incoming_edges, summary_table,
                        grouping_cols_list)
                # Strings required for updating PageRank scores of vertices
                # that have no incoming edges
                grouping_cols_select_ins = cur_grouping_cols_select + ','
                vpg_from_clause_ins = ', {vertices_per_group} AS {vpg}'.format(
                    **locals())
                vpg_where_clause_ins = ' AND {vpg_t1_join_clause} '.format(
                    **locals())
                message_grp_where_ins = 'WHERE {message_grp}'.format(
                    **locals())
                ignore_group_clause_ins = ' AND ' + \
                    get_filtered_cols_subquery_str(
                        cur, summary_table, grouping_cols_list)
                # Strings required for convergence test query
                grouping_cols_select_conv = cur_grouping_cols_select
                group_by_grouping_cols_conv = ' GROUP BY {0}'.format(
                    cur_grouping_cols_select)
                message_grp_clause_conv = '{0} AND '.format(message_grp)
                ignore_group_clause_conv = ' AND ' + \
                    get_filtered_cols_subquery_str(
                        cur, summary_table, grouping_cols_list)
                limit = ''

                # Find all nodes, in each group, that have no incoming edges.
                # The PageRank value of these nodes are not updated using the
                # first query in the following for loop. They must be
                # explicitly plugged back in to the message table, with their
                # corresponding group's random_prob as their PageRank values.
                plpy.execute("""
                        CREATE TABLE {nodes_with_no_incoming_edges} AS
                        SELECT {select_group_cols}, __t1__.{src} AS {vertex_id},
                                {vpg}.{random_prob} AS pagerank
                        FROM {edge_temp_table} AS __t1__ {vpg_from_clause_ins}
                        WHERE NOT EXISTS (
                            SELECT 1
                            FROM {edge_temp_table} AS __t2__
                            WHERE __t1__.{src}=__t2__.{dest} AND
                                {where_group_clause}
                        ) {vpg_where_clause_ins}
                    """.format(
                    select_group_cols=get_table_qualified_col_str(
                        '__t1__', grouping_cols_list),
                    where_group_clause=_check_groups(
                        '__t1__', '__t2__', grouping_cols_list),
                    **locals()))
            else:
                # cur and out_cnts tables can be simpler when no grouping is
                # involved.
                if total_ppr_nodes > 0:
                    init_value = ppr_init_value_clause
                else:
                    init_value = 1.0 / n_vertices

                plpy.execute("""
                        CREATE TEMP TABLE {cur} AS
                        SELECT {vertex_id}, {init_value}::DOUBLE PRECISION
                        AS pagerank
                        FROM {vertex_table}
                        {cur_distribution}
                    """.format(**locals()))

                # Compute the out-degree of every node in the graph.
                plpy.execute("DROP TABLE IF EXISTS {0}".format(out_cnts))
                plpy.execute("""CREATE TEMP TABLE {out_cnts} AS
                        SELECT {src} AS {vertex_id}, COUNT({dest})
                        AS {out_cnts_cnt}
                        FROM {edge_temp_table}
                        GROUP BY {src}
                        {cnts_distribution}
                    """.format(**locals()))
                # The summary table when there is no grouping will contain
                # only the iteration column. We don't need to create the
                # out_table when no grouping is used since the 'cur' table
                # will be renamed to out_table after pagerank computation is
                # completed.
                plpy.execute("""
                        CREATE TABLE {summary_table} (
                            __iterations__ INTEGER
                        )
                    """.format(**locals()))
                # Find all nodes in the graph that don't have any incoming
                # edges and assign random_prob as their pagerank values.
                plpy.execute("""
                        CREATE TABLE {nodes_with_no_incoming_edges} AS
                        SELECT DISTINCT({src}), {random_probability} AS pagerank
                        FROM {edge_temp_table}
                        EXCEPT
                            (SELECT DISTINCT({dest}),
                                    {random_probability} AS pagerank
                            FROM {edge_temp_table})
                    """.format(**locals()))

            unconverged = 0
            iteration_num = 0
            for iteration_num in range(max_iter):
                ################################################################
                # PageRank for node 'A' at any given iteration 'i' is given by:
                # PR_i(A) = damping_factor(PR_i-1(B)/degree(B) +
                #           PR_i-1(C)/degree(C) + ...) + (1-damping_factor)/N
                # where 'N' is the number of vertices in the graph,
                # B, C are nodes that have edges to node A, and
                # degree(node) represents the number of outgoing edges from
                # 'node'
                ################################################################
                # Essentially, the pagerank for a node is based on an aggregate
                # of a fraction of the pagerank values of all the nodes that
                # have incoming edges to it, along with a small random
                # probability.
                # More information can be found at:
                # https://en.wikipedia.org/wiki/PageRank#Damping_factor

                # The query below computes the PageRank of each node using the
                # above formula. A small explanatory note on
                # ignore_group_clause:
                # This is used only when grouping is set. This essentially
                # will have the condition that will help skip the PageRank
                # computation on groups that have converged.
                plpy.execute("""
                        CREATE TABLE {message} AS
                        SELECT {grouping_cols_select_pr}
                                {edge_temp_table}.{dest} AS {vertex_id},
                                SUM(({v1}.pagerank)/{out_cnts}.{out_cnts_cnt})*{damping_factor}+
                                {random_jump_prob} AS pagerank
                        FROM {edge_temp_table}
                            INNER JOIN {cur} ON {cur_join_clause}
                            INNER JOIN {out_cnts} ON {out_cnts_join_clause}
                            INNER JOIN {cur} AS {v1} ON {v1_join_clause}
                            {vertices_per_group_inner_join_pr}
                        {ignore_group_clause}
                        GROUP BY {grouping_cols_select_pr} {edge_temp_table}.{dest}
                        {cur_distribution}
                    """.format(ignore_group_clause=ignore_group_clause_pr
                               if iteration_num > 0
                               else ignore_group_clause_first,
                               **locals()))

                # If there are nodes that have no incoming edges, they are not
                # captured in the message table. Insert entries for such
                # nodes, with random_prob.
                plpy.execute("""
                        INSERT INTO {message}
                        SELECT *
                        FROM {nodes_with_no_incoming_edges}
                        {ignore_group_clause}
                    """.format(
                    ignore_group_clause=ignore_group_clause_ins_noincoming
                    if iteration_num > 0 else ignore_group_clause_first,
                    **locals()))

                # Check for convergence:
                # Check for convergence only if threshold != 0.
                if threshold != 0:
                    # message_unconv and cur_unconv will contain the
                    # unconverged groups after current and previous
                    # iterations respectively. Groups that are missing in
                    # message_unconv but appear in cur_unconv are the groups
                    # that have converged after this iteration's
                    # computations. If no grouping columns are specified, then
                    # we check if there is at least one unconverged node
                    # (limit 1 is used in the query).
                    plpy.execute("""
                            CREATE TEMP TABLE {message_unconv} AS
                            SELECT {grouping_cols_select_conv}
                            FROM {message}
                            INNER JOIN {cur}
                            ON {cur}.{vertex_id}={message}.{vertex_id}
                            WHERE {message_grp_clause_conv}
                                ABS({cur}.pagerank-{message}.pagerank) > {threshold}
                            {ignore_group_clause}
                            {group_by_grouping_cols_conv}
                            {limit}
                        """.format(ignore_group_clause=ignore_group_clause_ins
                                   if iteration_num > 0
                                   else ignore_group_clause_conv,
                                   **locals()))
                    unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
                        """.format(message_unconv))[0]["cnt"]
                    if iteration_num > 0 and grouping_cols:
                        # Update result and summary tables for groups that
                        # have converged since the last iteration.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            message,
                            grouping_cols_list,
                            cur_unconv,
                            message_unconv)
                    plpy.execute("DROP TABLE IF EXISTS {0}".format(cur_unconv))
                    plpy.execute("""ALTER TABLE {message_unconv} RENAME TO
                        {cur_unconv} """.format(**locals()))
                else:
                    # Do not run convergence test if threshold=0, since that
                    # implies the user wants to run max_iter iterations.
                    unconverged = 1
                plpy.execute("DROP TABLE IF EXISTS {0}".format(cur))
                plpy.execute("""ALTER TABLE {message} RENAME TO {cur}
                    """.format(**locals()))
                if unconverged == 0:
                    break

            # If there still are some unconverged groups/(entire table),
            # update results.
            if grouping_cols:
                if unconverged > 0:
                    if threshold != 0:
                        # We completed max_iters, but there are still some
                        # unconverged groups. Update the result and summary
                        # tables for unconverged groups.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            cur,
                            grouping_cols_list,
                            cur_unconv)
                    else:
                        # No group has converged. List of all group values are
                        # in distinct_grp_table.
                        update_output_grouping_tables_for_link_analysis(
                            temp_summary_table,
                            iteration_num,
                            summary_table,
                            out_table,
                            cur,
                            grouping_cols_list,
                            distinct_grp_table)

                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {out_table} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(out_table=out_table,
                                   total_ppr_nodes=total_ppr_nodes))
            else:
                # updating the calculated pagerank value in case of
                # Personalized Page Rank.
                # Ref :
                # https://docs.oracle.com/cd/E56133_01/latest/reference/algorithms/pagerank.html
                if total_ppr_nodes > 1:
                    plpy.execute("""UPDATE {table_name} set pagerank =
                        pagerank / {total_ppr_nodes}::DOUBLE PRECISION
                        """.format(table_name=cur,
                                   total_ppr_nodes=total_ppr_nodes))

                plpy.execute("""
                    ALTER TABLE {table_name}
                    RENAME TO {out_table}
                    """.format(table_name=cur, **locals()))
                plpy.execute("""
                    INSERT INTO {summary_table} VALUES
                    ({iteration_num}+1)
                    """.format(**locals()))

            # Step 4: Cleanup
            plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5},{6}
                """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
                           message_unconv, nodes_with_no_incoming_edges))
            if grouping_cols:
                plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2}
                    """.format(vertices_per_group, temp_summary_table,
                               distinct_grp_table))


def get_query_params_for_ppr(personalization_vertices, damping_factor,
                             vertex_id, edge_temp_table, vertex_table,
                             edge_params):
    """
    Prepare the SQL fragments needed to compute Personalized PageRank.

    Args:
        @param personalization_vertices List of personalization vertex ids
        @param damping_factor
        @param vertex_id
        @param edge_temp_table
        @param vertex_table             Not used by this function
        @param edge_params              Dict providing the 'dest' column name

    Returns :
        (Integer, String, String):
            - number of personalization vertices given by the user,
            - CASE expression giving the random jump probability
              (1 - damping_factor for personalization vertices, 0 otherwise),
            - CASE expression giving the initial PageRank value
              (1 / number of personalization vertices for personalization
              vertices, 0 otherwise).
        When personalization_vertices is empty/None, (0, '', '') is returned.
    """
    total_ppr_nodes = 0
    ppr_random_prob_clause = ''
    ppr_init_prob_clause = ''

    if personalization_vertices:
        total_ppr_nodes = len(personalization_vertices)
        ppr_init_value = 1.0 / total_ppr_nodes
        prob_value = 1.0 - damping_factor
        dest = edge_params["dest"]

        # In case of PPR, Assign the Random jump probability to the
        # personalization_vertices only.
        # For rest of the nodes, Random jump probability will be zero.
        ppr_random_prob_clause = """
                CASE WHEN {edge_temp_table}.{dest} = ANY(ARRAY{personalization_vertices})
                THEN {prob_value}
                ELSE 0
                END
            """.format(**locals())

        ppr_init_prob_clause = """
                CASE WHEN {vertex_id} = ANY(ARRAY{personalization_vertices})
                THEN {ppr_init_value}
                ELSE 0
                END
            """.format(**locals())
    return total_ppr_nodes, ppr_random_prob_clause, ppr_init_prob_clause


def pagerank_help(schema_madlib, message, **kwargs):
    """
    Help function for pagerank

    Args:
        @param schema_madlib
        @param message: string, Help message string
        @param kwargs

    Returns:
        String. Help/usage information
    """
    if message is not None and \
            message.lower() in ("usage", "help", "?"):
        help_string = get_graph_usage(schema_madlib, 'PageRank',
            """out_table TEXT, -- Name of the output table for PageRank
    damping_factor DOUBLE PRECISION, -- Damping factor in random surfer model
                                     -- (DEFAULT = 0.85)
    max_iter INTEGER, -- Maximum iteration number (DEFAULT = 100)
    threshold DOUBLE PRECISION, -- Stopping criteria (DEFAULT = 1/(N*1000),
                                -- N is number of vertices in the graph)
    grouping_col TEXT, -- Comma separated column names to group on
                       -- (DEFAULT = NULL, no grouping)
    personalization_vertices ARRAY OF INTEGER, -- A comma seperated list of vertices
                                                  or nodes for personalized page rank.
""") + """

    A summary table is also created that contains information regarding the
    number of iterations required for convergence. It is named by adding the
    suffix '_summary' to the 'out_table' parameter.
"""
    else:
        if message is not None and \
                message.lower() in ("example", "examples"):
            help_string = """
----------------------------------------------------------------------------
                                EXAMPLES
----------------------------------------------------------------------------
-- Create a graph, represented as vertex and edge tables.
DROP TABLE IF EXISTS vertex, edge;
CREATE TABLE vertex(
        id INTEGER
        );
CREATE TABLE edge(
        src INTEGER,
        dest INTEGER,
        user_id INTEGER
        );
INSERT INTO vertex VALUES
(0),
(1),
(2),
(3),
(4),
(5),
(6);
INSERT INTO edge VALUES
(0, 1, 1),
(0, 2, 1),
(0, 4, 1),
(1, 2, 1),
(1, 3, 1),
(2, 3, 1),
(2, 5, 1),
(2, 6, 1),
(3, 0, 1),
(4, 0, 1),
(5, 6, 1),
(6, 3, 1),
(0, 1, 2),
(0, 2, 2),
(0, 4, 2),
(1, 2, 2),
(1, 3, 2),
(2, 3, 2),
(3, 0, 2),
(4, 0, 2),
(5, 6, 2),
(6, 3, 2);

-- Compute the PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
             'vertex',             -- Vertex table
             'id',                 -- Vertix id column
             'edge',               -- Edge table
             'src=src, dest=dest', -- Comma delimted string of edge arguments
             'pagerank_out');      -- Output table of PageRank

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;

-- Compute PageRank of nodes associated with each user:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
             'vertex',             -- Vertex table
             'id',                 -- Vertix id column
             'edge',               -- Edge table
             'src=src, dest=dest', -- Comma delimted string of edge arguments
             'pagerank_out',       -- Output table of PageRank
             NULL,                 -- Default damping factor
             NULL,                 -- Default max_iter
             0.00000001,           -- Threshold
             'user_id');           -- Grouping column

-- View the PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY user_id, pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence for each group.
SELECT * FROM pagerank_out_summary;

-- Compute the Personalized PageRank:
DROP TABLE IF EXISTS pagerank_out, pagerank_out_summary;
SELECT madlib.pagerank(
             'vertex',             -- Vertex table
             'id',                 -- Vertix id column
             'edge',               -- Edge table
             'src=src, dest=dest', -- Comma delimted string of edge arguments
             'pagerank_out',       -- Output table of PageRank
             NULL,                 -- Default damping factor (0.85)
             NULL,                 -- Default max iters (100)
             NULL,                 -- Default Threshold
             NULL,                 -- No Grouping
             ARRAY[2,4]);          -- Personlized Nodes

-- View the Personalized PageRank of all vertices, sorted by their scores.
SELECT * FROM pagerank_out ORDER BY pagerank DESC;
-- View the summary table to find the number of iterations required for
-- convergence.
SELECT * FROM pagerank_out_summary;
"""
        else:
            help_string = """
----------------------------------------------------------------------------
                                SUMMARY
----------------------------------------------------------------------------
Given a directed graph, pagerank algorithm finds the PageRank score of all
the vertices in the graph.
--
For an overview on usage, run:
SELECT {schema_madlib}.pagerank('usage');

For some examples, run:
SELECT {schema_madlib}.pagerank('example')
--
"""

    return help_string.format(schema_madlib=schema_madlib)
# ---------------------------------------------------------------------