/*
 * Populate the child table(s) of a time-based partition set with old data from the original parent.
 *
 * Parameters:
 *   p_parent_table   - schema-qualified parent table; must have a 'time' or 'time-custom'
 *                      row in part_config.
 *   p_batch_count    - maximum number of batches (loop iterations) to run in this call.
 *   p_batch_interval - span of control-column time moved per batch. When NULL or larger
 *                      than the configured partition interval, the partition interval is used.
 *   p_lock_wait      - when > 0, the rows of each batch are first locked with
 *                      SELECT ... FOR UPDATE NOWAIT; after every failed attempt the function
 *                      sleeps p_lock_wait / 5.0 seconds before retrying.
 *   p_order          - 'ASC' moves the oldest data first, 'DESC' the newest first.
 *
 * Returns the total number of rows moved out of the parent, or -1 as soon as the row
 * locks for a batch cannot be obtained.
 *
 * Raises an exception when no matching configuration exists, when p_order is neither
 * 'ASC' nor 'DESC', or when a time-custom partition boundary falls outside the
 * timestamp range supported by PostgreSQL.
 */
CREATE FUNCTION partition_data_time(p_parent_table text, p_batch_count int DEFAULT 1, p_batch_interval interval DEFAULT NULL, p_lock_wait numeric DEFAULT 0, p_order text DEFAULT 'ASC')
RETURNS bigint
LANGUAGE plpgsql SECURITY DEFINER
AS $$
DECLARE

v_control                   text;
v_datetime_string           text;
v_current_partition_name    text;
v_epoch                     boolean;
v_last_partition            text;
v_lock_iter                 int := 1;
v_lock_obtained             boolean := FALSE;
v_max_partition_timestamp   timestamp;
v_min_partition_timestamp   timestamp;
v_parent_schema             text;
v_parent_tablename          text;
v_partition_interval        interval;
v_partition_suffix          text;
v_partition_timestamp       timestamp[];
v_rowcount                  bigint;
v_sql                       text;
v_start_control             timestamp;
v_time_position             int;
v_total_rows                bigint := 0;
v_type                      text;

BEGIN

SELECT partition_type
    , partition_interval::interval
    , control
    , datetime_string
    , epoch
INTO v_type
    , v_partition_interval
    , v_control
    , v_datetime_string
    , v_epoch
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND (partition_type = 'time' OR partition_type = 'time-custom');
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: no config found for %', p_parent_table;
END IF;

-- A batch never spans more than one partition
IF p_batch_interval IS NULL OR p_batch_interval > v_partition_interval THEN
    p_batch_interval := v_partition_interval;
END IF;

-- Newest existing child table; the time-custom boundary search below starts from it
SELECT partition_tablename INTO v_last_partition FROM @extschema@.show_partitions(p_parent_table, 'DESC') LIMIT 1;

SELECT schemaname, tablename INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_tables
WHERE schemaname ||'.'|| tablename = p_parent_table;

FOR i IN 1..p_batch_count LOOP

    -- Find the oldest (ASC) or newest (DESC) control value still stored in the parent itself.
    -- Epoch-based control columns are converted with to_timestamp().
    IF v_epoch = false THEN
        IF p_order = 'ASC' THEN
            EXECUTE format('SELECT min(%I) FROM ONLY %I.%I', v_control, v_parent_schema, v_parent_tablename) INTO v_start_control;
        ELSIF p_order = 'DESC' THEN
            EXECUTE format('SELECT max(%I) FROM ONLY %I.%I', v_control, v_parent_schema, v_parent_tablename) INTO v_start_control;
        ELSE
            RAISE EXCEPTION 'Invalid value for p_order. Must be ASC or DESC';
        END IF;
    ELSE
        IF p_order = 'ASC' THEN
            EXECUTE format('SELECT to_timestamp(min(%I)) FROM ONLY %I.%I', v_control, v_parent_schema, v_parent_tablename) INTO v_start_control;
        ELSIF p_order = 'DESC' THEN
            EXECUTE format('SELECT to_timestamp(max(%I)) FROM ONLY %I.%I', v_control, v_parent_schema, v_parent_tablename) INTO v_start_control;
        ELSE
            RAISE EXCEPTION 'Invalid value for p_order. Must be ASC or DESC';
        END IF;
    END IF;

    -- Nothing left in the parent
    IF v_start_control IS NULL THEN
        EXIT;
    END IF;

    -- Determine the lower boundary of the partition that v_start_control belongs to
    IF v_type = 'time' THEN
        -- No ELSE branch: PL/pgSQL raises CASE_NOT_FOUND for an interval not listed here
        CASE
            WHEN v_partition_interval = '15 mins' THEN
                v_min_partition_timestamp := date_trunc('hour', v_start_control) +
                    '15min'::interval * floor(date_part('minute', v_start_control) / 15.0);
            WHEN v_partition_interval = '30 mins' THEN
                v_min_partition_timestamp := date_trunc('hour', v_start_control) +
                    '30min'::interval * floor(date_part('minute', v_start_control) / 30.0);
            WHEN v_partition_interval = '1 hour' THEN
                v_min_partition_timestamp := date_trunc('hour', v_start_control);
            WHEN v_partition_interval = '1 day' THEN
                v_min_partition_timestamp := date_trunc('day', v_start_control);
            WHEN v_partition_interval = '1 week' THEN
                v_min_partition_timestamp := date_trunc('week', v_start_control);
            WHEN v_partition_interval = '1 month' THEN
                v_min_partition_timestamp := date_trunc('month', v_start_control);
            WHEN v_partition_interval = '3 months' THEN
                v_min_partition_timestamp := date_trunc('quarter', v_start_control);
            WHEN v_partition_interval = '1 year' THEN
                v_min_partition_timestamp := date_trunc('year', v_start_control);
        END CASE;
    ELSIF v_type = 'time-custom' THEN
        -- Keep going backwards, checking if the time interval encompasses the current v_start_control value.
        -- The starting boundary is parsed from the text following the last '_p' in the newest child table's name.
        v_time_position := (length(v_last_partition) - position('p_' in reverse(v_last_partition))) + 2;
        v_min_partition_timestamp := to_timestamp(substring(v_last_partition from v_time_position), v_datetime_string);
        v_max_partition_timestamp := v_min_partition_timestamp + v_partition_interval;
        LOOP
            IF v_start_control >= v_min_partition_timestamp AND v_start_control < v_max_partition_timestamp THEN
                EXIT;
            ELSE
                v_max_partition_timestamp := v_min_partition_timestamp;
                BEGIN
                    v_min_partition_timestamp := v_min_partition_timestamp - v_partition_interval;
                EXCEPTION WHEN datetime_field_overflow THEN
                    -- The failed assignment leaves v_min_partition_timestamp at the last boundary that was
                    -- still in range, which is the value reported here.
                    RAISE EXCEPTION 'Attempted partition time interval is outside PostgreSQL''s supported time range. Unable to create partition with interval before timestamp % ', v_min_partition_timestamp;
                END;
            END IF;
        END LOOP;
    END IF;

    -- Child table to (ensure is) created for this batch
    v_partition_timestamp := ARRAY[v_min_partition_timestamp];

    -- Narrow the [min, max) window to at most p_batch_interval without leaving the partition
    IF p_order = 'ASC' THEN
        IF (v_start_control + p_batch_interval) >= (v_min_partition_timestamp + v_partition_interval) THEN
            v_max_partition_timestamp := v_min_partition_timestamp + v_partition_interval;
        ELSE
            v_max_partition_timestamp := v_start_control + p_batch_interval;
        END IF;
    ELSIF p_order = 'DESC' THEN
        -- Must be greater than max value still in parent table since query below grabs < max
        v_max_partition_timestamp := v_min_partition_timestamp + v_partition_interval;
        -- Make sure minimum doesn't underflow current partition minimum
        IF (v_start_control - p_batch_interval) >= v_min_partition_timestamp THEN
            v_min_partition_timestamp := v_start_control - p_batch_interval;
        END IF;
    ELSE
        RAISE EXCEPTION 'Invalid value for p_order. Must be ASC or DESC';
    END IF;

    -- do some locking with timeout, if required
    IF p_lock_wait > 0 THEN
        -- Reset for every batch: a lock obtained for an earlier batch must not be
        -- mistaken for a lock on the rows of this one.
        v_lock_obtained := FALSE;
        v_lock_iter := 0;
        -- The loop body runs up to 6 times (v_lock_iter 0..5)
        WHILE v_lock_iter <= 5 LOOP
            v_lock_iter := v_lock_iter + 1;
            BEGIN
                IF v_epoch = false THEN
                    v_sql := format('SELECT * FROM ONLY %I.%I WHERE %I >= %L AND %I < %L FOR UPDATE NOWAIT'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_control
                        , v_min_partition_timestamp
                        , v_control
                        , v_max_partition_timestamp);
                ELSE
                    v_sql := format('SELECT * FROM ONLY %I.%I WHERE to_timestamp(%I) >= %L AND to_timestamp(%I) < %L FOR UPDATE NOWAIT'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_control
                        , v_min_partition_timestamp
                        , v_control
                        , v_max_partition_timestamp);
                END IF;
                EXECUTE v_sql;
                v_lock_obtained := TRUE;
            EXCEPTION
                WHEN lock_not_available THEN
                    PERFORM pg_sleep( p_lock_wait / 5.0 );
                    CONTINUE;
            END;
            EXIT WHEN v_lock_obtained;
        END LOOP;
        IF NOT v_lock_obtained THEN
            RETURN -1;
        END IF;
    END IF;

    PERFORM @extschema@.create_partition_time(p_parent_table, v_partition_timestamp);
    -- This suffix generation code is in create_partition_time() as well
    v_partition_suffix := to_char(v_min_partition_timestamp, v_datetime_string);
    v_current_partition_name := @extschema@.check_name_length(v_parent_tablename, v_partition_suffix, TRUE);

    -- Move the rows in one statement: delete from the parent only and insert what was deleted into the child
    IF v_epoch = false THEN
        v_sql := format('WITH partition_data AS (
                            DELETE FROM ONLY %I.%I WHERE %I >= %L AND %I < %L RETURNING *)
                         INSERT INTO %I.%I SELECT * FROM partition_data'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_control
                        , v_min_partition_timestamp
                        , v_control
                        , v_max_partition_timestamp
                        , v_parent_schema
                        , v_current_partition_name);
    ELSE
        v_sql := format('WITH partition_data AS (
                            DELETE FROM ONLY %I.%I WHERE to_timestamp(%I) >= %L AND to_timestamp(%I) < %L RETURNING *)
                         INSERT INTO %I.%I SELECT * FROM partition_data'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_control
                        , v_min_partition_timestamp
                        , v_control
                        , v_max_partition_timestamp
                        , v_parent_schema
                        , v_current_partition_name);
    END IF;
    EXECUTE v_sql;

    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_total_rows := v_total_rows + v_rowcount;
    -- A batch that moved nothing ends the run early
    IF v_rowcount = 0 THEN
        EXIT;
    END IF;

END LOOP;

PERFORM @extschema@.create_function_time(p_parent_table);

RETURN v_total_rows;

END
$$;