/*
 * Create the trigger function for the parent table of an id-based partition set
 *
 * Reads the partitioning configuration for p_parent_table from part_config
 * (partition_type = 'id'), works out which partition interval the current
 * maximum control value falls in, and then builds and executes a
 * CREATE OR REPLACE FUNCTION statement for the parent table's insert trigger
 * function. The generated function routes rows to the current partition and to
 * up to "premake" partitions before and after it with static IF/ELSIF
 * branches, and falls back to a dynamically computed child table name (or to
 * the parent table itself) for any other value.
 *
 * Parameters:
 *   p_parent_table - name of the parent table, compared against
 *                    part_config.parent_table and against
 *                    schemaname||'.'||tablename in pg_catalog.pg_tables.
 *
 * Returns: void.
 *
 * Raises:
 *   - an exception if no 'id' configuration row exists for p_parent_table;
 *   - an exception if p_parent_table cannot be found in pg_catalog.pg_tables;
 *   - any other error is caught, logged to pg_jobmon when that extension is in
 *     use, and re-raised with its message, context, detail and hint.
 */
CREATE FUNCTION create_function_id(p_parent_table text) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

ex_context                      text;
ex_detail                       text;
ex_hint                         text;
ex_message                      text;
v_control                       text;
v_count                         int;
v_current_partition_name        text;
v_current_partition_id          bigint;
v_final_partition_id            bigint;
v_function_name                 text;
v_higher_parent                 text := p_parent_table;
v_job_id                        bigint;
v_jobmon                        text;
v_jobmon_schema                 text;
v_last_partition                text;
v_max                           bigint;
v_next_partition_id             bigint;
v_next_partition_name           text;
v_old_search_path               text;
v_parent_schema                 text;
v_parent_tablename              text;
v_partition_interval            bigint;
v_premake                       int;
v_prev_partition_id             bigint;
v_prev_partition_name           text;
v_row_max_id                    record;
v_run_maint                     boolean;
v_step_id                       bigint;
v_top_parent                    text := p_parent_table;
v_trig_func                     text;

BEGIN

SELECT partition_interval::bigint
    , control
    , premake
    , use_run_maintenance
    , jobmon
INTO v_partition_interval
    , v_control
    , v_premake
    , v_run_maint
    , v_jobmon
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND partition_type = 'id';

IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: no config found for %', p_parent_table;
END IF;

-- First row of the descending partition list; stays NULL when the set has no child tables
SELECT show_partitions
INTO v_last_partition
FROM @extschema@.show_partitions(p_parent_table, 'DESC')
LIMIT 1;

IF v_jobmon THEN
    SELECT n.nspname
    INTO v_jobmon_schema
    FROM pg_catalog.pg_namespace n
    INNER JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
    WHERE e.extname = 'pg_jobmon';

    IF v_jobmon_schema IS NOT NULL THEN
        -- Remember the caller's search_path so it can be restored once logging is finished.
        -- set_config() is called directly so that no value has to be quoted into a dynamic statement.
        v_old_search_path := current_setting('search_path');
        PERFORM set_config('search_path', '@extschema@,'||v_jobmon_schema, false);
    END IF;
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    v_job_id := add_job('PARTMAN CREATE FUNCTION: '||p_parent_table);
    v_step_id := add_step(v_job_id, 'Creating partition function for table '||p_parent_table);
END IF;

SELECT schemaname, tablename
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_tables
WHERE schemaname ||'.'|| tablename = p_parent_table;

-- Without the parent's schema/table name the generated statement below would be NULL
-- and EXECUTE would fail with an unhelpful error, so stop here with a clear message.
IF v_parent_tablename IS NULL THEN
    RAISE EXCEPTION 'Unable to find given parent table in system catalogs: %', p_parent_table;
END IF;

v_function_name := @extschema@.check_name_length(v_parent_tablename, v_parent_schema, '_part_trig_func', FALSE);

-- Get the highest level top parent if multi-level partitioned in order to get proper max() value below
WHILE v_higher_parent IS NOT NULL LOOP -- initially set in DECLARE
    -- v_higher_parent becomes NULL (ending the loop) when no id-partitioned parent exists above it
    WITH top_oid AS (
        SELECT i.inhparent AS top_parent_oid
        FROM pg_catalog.pg_inherits i
        JOIN pg_catalog.pg_class c ON c.oid = i.inhrelid
        JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname||'.'||c.relname = v_higher_parent
    )
    SELECT n.nspname||'.'||c.relname
    INTO v_higher_parent
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
    JOIN top_oid t ON c.oid = t.top_parent_oid
    JOIN @extschema@.part_config p ON p.parent_table = n.nspname||'.'||c.relname
    WHERE p.partition_type = 'id';

    IF v_higher_parent IS NOT NULL THEN
        -- initially set in DECLARE
        v_top_parent := v_higher_parent;
    END IF;
END LOOP;

-- Loop through child tables starting from highest to get current max value in partition set
-- Avoids doing a scan on entire partition set and/or getting any values accidentally in parent.
FOR v_row_max_id IN
    SELECT show_partitions FROM @extschema@.show_partitions(v_top_parent, 'DESC')
LOOP
    EXECUTE 'SELECT max('||v_control||') FROM '||v_row_max_id.show_partitions INTO v_max;
    IF v_max IS NOT NULL THEN
        EXIT;
    END IF;
END LOOP;

IF v_max IS NULL THEN
    v_max := 0;
END IF;

-- Lower bound of the interval containing the current max value, and the lower bound of the one after it
v_current_partition_id := v_max - (v_max % v_partition_interval);
v_next_partition_id := v_current_partition_id + v_partition_interval;
-- Upper bound used in the jobmon message; initialised here so it is not NULL when premake is 0
v_final_partition_id := v_next_partition_id;
v_current_partition_name := @extschema@.check_name_length(v_parent_tablename, v_parent_schema, v_current_partition_id::text, TRUE);

-- quote_nullable() keeps the generated statement valid when there is no last partition:
-- quote_literal(NULL) would make the whole concatenation NULL.
v_trig_func := 'CREATE OR REPLACE FUNCTION '||v_function_name||'() RETURNS trigger LANGUAGE plpgsql AS $t$
    DECLARE
        v_count                     int;
        v_current_partition_id      bigint;
        v_current_partition_name    text;
        v_id_position               int;
        v_last_partition            text := '||quote_nullable(v_last_partition)||';
        v_next_partition_id         bigint;
        v_next_partition_name       text;
        v_partition_created         boolean;
    BEGIN
    IF TG_OP = ''INSERT'' THEN
        IF NEW.'||v_control||' >= '||v_current_partition_id||' AND NEW.'||v_control||' < '||v_next_partition_id|| ' THEN ';

SELECT count(*) INTO v_count FROM pg_catalog.pg_tables WHERE schemaname ||'.'||tablename = v_current_partition_name;
IF v_count > 0 THEN
    v_trig_func := v_trig_func || '
            INSERT INTO '||v_current_partition_name||' VALUES (NEW.*); ';
ELSE
    v_trig_func := v_trig_func || '
            -- Child table for current values does not exist in this partition set, so write to parent
            RETURN NEW;';
END IF;

FOR i IN 1..v_premake LOOP
    v_prev_partition_id := v_current_partition_id - (v_partition_interval * i);
    v_next_partition_id := v_current_partition_id + (v_partition_interval * i);
    v_final_partition_id := v_next_partition_id + v_partition_interval;
    v_prev_partition_name := @extschema@.check_name_length(v_parent_tablename, v_parent_schema, v_prev_partition_id::text, TRUE);
    v_next_partition_name := @extschema@.check_name_length(v_parent_tablename, v_parent_schema, v_next_partition_id::text, TRUE);

    -- Check that child table exist before making a rule to insert to them.
    -- Handles edge case of changing premake immediately after running create_parent().
    SELECT count(*) INTO v_count FROM pg_catalog.pg_tables WHERE schemaname ||'.'||tablename = v_prev_partition_name;
    IF v_count > 0 THEN
        -- Only handle previous partitions if they're starting above zero
        IF v_prev_partition_id >= 0 THEN
            v_trig_func := v_trig_func ||'
        ELSIF NEW.'||v_control||' >= '||v_prev_partition_id||' AND NEW.'||v_control||' < '||(v_prev_partition_id + v_partition_interval)|| ' THEN
            INSERT INTO '||v_prev_partition_name||' VALUES (NEW.*); ';
        END IF;
    END IF;

    SELECT count(*) INTO v_count FROM pg_catalog.pg_tables WHERE schemaname ||'.'||tablename = v_next_partition_name;
    IF v_count > 0 THEN
        v_trig_func := v_trig_func ||'
        ELSIF NEW.'||v_control||' >= '||v_next_partition_id||' AND NEW.'||v_control||' < '||v_final_partition_id|| ' THEN
            INSERT INTO '||v_next_partition_name||' VALUES (NEW.*);';
    END IF;
END LOOP;

-- Fallback branch of the generated function: compute the child table name at run time.
-- The parent schema/table names are embedded with quote_literal() so that names
-- containing quote characters cannot break the generated statement.
v_trig_func := v_trig_func ||'
        ELSE
            v_current_partition_id := NEW.'||v_control||' - (NEW.'||v_control||' % '||v_partition_interval||');
            v_current_partition_name := @extschema@.check_name_length('||quote_literal(v_parent_tablename)||', '||quote_literal(v_parent_schema)||', v_current_partition_id::text, TRUE);
            SELECT count(*) INTO v_count FROM pg_catalog.pg_tables WHERE schemaname ||''.''|| tablename = v_current_partition_name;
            IF v_count > 0 THEN
                EXECUTE ''INSERT INTO ''||v_current_partition_name||'' VALUES($1.*)'' USING NEW;
            ELSE
                RETURN NEW;
            END IF;
        END IF;';

-- When use_run_maintenance is false, the generated trigger itself creates new partitions
-- once an inserted value passes the halfway point of its interval.
IF v_run_maint IS FALSE THEN
    v_trig_func := v_trig_func ||'
        v_current_partition_id := NEW.'||v_control||' - (NEW.'||v_control||' % '||v_partition_interval||');
        IF (NEW.'||v_control||' % '||v_partition_interval||') > ('||v_partition_interval||' / 2) THEN
            v_id_position := (length(v_last_partition) - position(''p_'' in reverse(v_last_partition))) + 2;
            v_next_partition_id := (substring(v_last_partition from v_id_position)::bigint) + '||v_partition_interval||';
            WHILE ((v_next_partition_id - v_current_partition_id) / '||v_partition_interval||') <= '||v_premake||' LOOP
                v_partition_created := @extschema@.create_partition_id('||quote_literal(p_parent_table)||', ARRAY[v_next_partition_id]);
                IF v_partition_created THEN
                    PERFORM @extschema@.create_function_id('||quote_literal(p_parent_table)||');
                    PERFORM @extschema@.apply_constraints('||quote_literal(p_parent_table)||');
                END IF;
                v_next_partition_id := v_next_partition_id + '||v_partition_interval||';
            END LOOP;
        END IF;';
END IF;

v_trig_func := v_trig_func ||'
    END IF;
    RETURN NULL;
    END $t$;';

EXECUTE v_trig_func;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM update_step(v_step_id, 'OK', 'Added function for current id interval: '||v_current_partition_id||' to '||(v_final_partition_id - 1));
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM close_job(v_job_id);
    -- Put back the search_path that was in effect before jobmon logging started
    PERFORM set_config('search_path', v_old_search_path, false);
END IF;

EXCEPTION
    WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
                                ex_context = PG_EXCEPTION_CONTEXT,
                                ex_detail = PG_EXCEPTION_DETAIL,
                                ex_hint = PG_EXCEPTION_HINT;
        IF v_jobmon_schema IS NOT NULL THEN
            -- jobmon functions are schema-qualified here; text arguments are passed with %L
            -- so a quote character in the table name cannot break the logging statement
            -- and hide the original error.
            IF v_job_id IS NULL THEN
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, 'PARTMAN CREATE FUNCTION: '||p_parent_table) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%s, %L)', v_jobmon_schema, v_job_id, 'Partition function maintenance for table '||p_parent_table||' failed') INTO v_step_id;
            ELSIF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
        END IF;
        RAISE EXCEPTION '%
CONTEXT: %
DETAIL: %
HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;