-- -- E-Maj : logs and rollbacks table updates : V 0.11.0 -- -- This software is distributed under the GNU General Public License. -- -- This script installs E-Maj extension for PostgreSQL version prior 9.1 -- -- This script must be executed by a role having SUPERUSER privileges. -- Before its execution: -- -> the concerned cluster must contain a tablespace named "tspemaj", for instance previously created by -- CREATE TABLESPACE tspemaj LOCATION '/.../tspemaj', -- -> the plpgsql language must have been created in the concerned database, -- (-> the dblink contrib/extension must have been installed.) \set ON_ERROR_STOP ON \set QUIET ON SET client_min_messages TO WARNING; \echo 'E-Maj objects creation...' -- create, execute and drop a specific plpgsql function to check the environment -- If all pre-requisites are not met, it generates an error that blocks the E-Maj installation. CREATE or REPLACE FUNCTION public.emaj_tmp_check_envir() RETURNS VOID LANGUAGE plpgsql AS $tmp$ DECLARE v_stmt TEXT; BEGIN -- the creation of the function implicitely validates that plpgsql language is created! -- check postgres version is >= 8.2 -- (warning, the test is alphanumeric => to be adapted when pg 10.0 will appear!) IF substring (version() from E'PostgreSQL\\s(\\d+\\.\\d+)') < '8.2' THEN RAISE EXCEPTION 'E-Maj installation: the current postgres version is too old for E-Maj.'; END IF; -- check the current role is a superuser PERFORM 0 FROM pg_roles WHERE rolname = current_user AND rolsuper; IF NOT FOUND THEN RAISE EXCEPTION 'E-Maj installation: the current user (%) is not a superuser.', current_user; END IF; -- check the tspemaj tablespace is already created PERFORM 0 FROM pg_tablespace WHERE spcname = 'tspemaj'; IF NOT FOUND THEN RAISE EXCEPTION 'E-Maj installation: the "tspemaj" tablespace doesn''t exist.'; END IF; -- check the dblink contrib is installed -- PERFORM 0 FROM pg_proc WHERE proname = 'dblink'; -- IF NOT FOUND THEN -- RAISE EXCEPTION 'E-Maj installation: dblink contrib/extension is not installed.'; -- END IF; -- RETURN; END; $tmp$; SELECT public.emaj_tmp_check_envir(); DROP FUNCTION public.emaj_tmp_check_envir(); -- OK, now create E-Maj objects in a single transaction BEGIN TRANSACTION; ------------------------------------------ -- -- -- emaj schema and environment checking -- -- -- ------------------------------------------ -- if an emaj schema already exists, drop it DROP SCHEMA IF EXISTS emaj CASCADE; -- creation of the schema 'emaj' containing all the needed objets CREATE SCHEMA emaj; COMMENT ON SCHEMA emaj IS $$Contains all E-Maj related objects.$$; -- create emaj roles and the _txid_current() function CREATE or REPLACE FUNCTION emaj._tmp_create_some_components() RETURNS VOID LANGUAGE plpgsql AS $tmp$ DECLARE v_pgVersion TEXT := substring (version() from E'PostgreSQL\\s(\\d+\\.\\d+)'); v_stmt TEXT; BEGIN -- create emaj roles (NOLOGIN), if they do not exist -- does 'emaj_adm' already exist ? PERFORM 1 FROM pg_roles WHERE rolname = 'emaj_adm'; -- if no, create it IF NOT FOUND THEN CREATE ROLE emaj_adm; COMMENT ON ROLE emaj_adm IS $$This role may be granted to other roles in charge of E-Maj administration.$$; END IF; -- does 'emaj_viewer' already exist ? PERFORM 1 FROM pg_roles WHERE rolname = 'emaj_viewer'; -- if no, create it IF NOT FOUND THEN CREATE ROLE emaj_viewer; COMMENT ON ROLE emaj_viewer IS $$This role may be granted to other roles allowed to view E-Maj objects content.$$; END IF; -- create a SQL emaj_txid_function that encapsulates the standart txid_current function with postgres 8.3+ -- or just returns 0 with postgres 8.2 v_stmt = 'CREATE or REPLACE FUNCTION emaj._txid_current() RETURNS BIGINT LANGUAGE SQL AS $$ SELECT '; IF v_pgVersion < '8.3' THEN v_stmt = v_stmt || '0::BIGINT;$$'; ELSE v_stmt = v_stmt || 'txid_current();$$'; END IF; EXECUTE v_stmt; -- RETURN; END; $tmp$; SELECT emaj._tmp_create_some_components(); DROP FUNCTION emaj._tmp_create_some_components(); ----------------------------------------- -- -- -- emaj technical sequences and tables -- -- -- ----------------------------------------- -- the emaj_global_seq sequence provides a unique identifier for all rows inserted into all emaj log tables of the database. -- It is used to order all these rows in insertion time order for rollback as well as other purposes. -- (So this order is not based on system time that can be unsafe). -- The sequence is created with the following (default) characteristics: -- - increment = 1 -- - no cache (to keep the delivered nextval value in time order) -- - no cycle (would the end of the sequence be reached, no new log row would be accepted) CREATE SEQUENCE emaj.emaj_global_seq; COMMENT ON SEQUENCE emaj.emaj_global_seq IS $$Global sequence to identifiy all rows of emaj log tables.$$; -- table containing Emaj parameters CREATE TABLE emaj.emaj_param ( param_key TEXT NOT NULL, -- parameter key param_value_text TEXT, -- value if type is text, otherwise NULL param_value_int BIGINT, -- value if type is bigint, otherwise NULL param_value_boolean BOOLEAN, -- value if type is boolean, otherwise NULL param_value_interval INTERVAL, -- value if type is interval, otherwise NULL PRIMARY KEY (param_key) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_param IS $$Contains E-Maj parameters.$$; -- table containing the history of all E-Maj events CREATE TABLE emaj.emaj_hist ( -- records the history of hist_id BIGSERIAL NOT NULL, -- internal id hist_datetime TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(), -- insertion time hist_function TEXT NOT NULL, -- main E-Maj function generating the event hist_event TEXT, -- type of event (often BEGIN or END) hist_object TEXT, -- object supporting the event (often the group name) hist_wording TEXT, -- additional comment hist_user TEXT DEFAULT session_user, -- the user who call the E-Maj function hist_txid BIGINT DEFAULT emaj._txid_current(), -- and its tx_id PRIMARY KEY (hist_id) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_hist IS $$Contains E-Maj events history.$$; -- table containing the definition of groups' content. Filled and maintained by the user, it is used by emaj_create_group function. CREATE TABLE emaj.emaj_group_def ( grpdef_group TEXT NOT NULL, -- name of the group containing this table or sequence grpdef_schema TEXT NOT NULL, -- schema name of this table or sequence grpdef_tblseq TEXT NOT NULL, -- table or sequence name grpdef_priority INTEGER, -- priority level (tables are processed in ascending -- order, with NULL last) PRIMARY KEY (grpdef_group, grpdef_schema, grpdef_tblseq) -- the group name is included in the pkey so that a table/sequence can be temporarily assigned to several groups ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_group_def IS $$Contains E-Maj groups definition, supplied by the E-Maj administrator.$$; -- table containing the defined groups -- rows are created at emaj_create_group time and deleted at emaj_drop_group time CREATE TABLE emaj.emaj_group ( group_name TEXT NOT NULL, group_state TEXT NOT NULL, -- 2 possibles states: -- 'LOGGING' between emaj_start_group and emaj_stop_group -- 'IDLE' in other cases group_nb_table INT, -- number of tables at emaj_create_group time group_nb_sequence INT, -- number of sequences at emaj_create_group time group_is_rollbackable BOOLEAN, -- false for 'AUDIT_ONLY' groups, true for 'ROLLBACKABLE' groups group_creation_datetime TIMESTAMPTZ NOT NULL -- start time of the transaction that created the group DEFAULT transaction_timestamp(), group_pg_version TEXT NOT NULL -- postgres version at emaj_create_group() time DEFAULT substring (version() from E'PostgreSQL\\s([.,0-9,A-Z,a-z]*)'), group_comment TEXT, -- optional user comment PRIMARY KEY (group_name) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_group IS $$Contains created E-Maj groups.$$; -- table containing the relations (tables and sequences) of created tables groups CREATE TABLE emaj.emaj_relation ( rel_schema TEXT NOT NULL, -- schema name containing the relation rel_tblseq TEXT NOT NULL, -- table or sequence name rel_group TEXT NOT NULL, -- name of the group that owns the relation rel_priority INTEGER, -- priority level of processing inside the group rel_kind TEXT, -- similar to the relkind column of pg_class table -- ('r' = table, 'S' = sequence) rel_session INT, -- rollback session id rel_rows BIGINT, -- number of rows to rollback, computed at rollback time PRIMARY KEY (rel_schema, rel_tblseq), FOREIGN KEY (rel_group) REFERENCES emaj.emaj_group (group_name) ON DELETE CASCADE ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_relation IS $$Contains the content (tables and sequences) of created E-Maj groups.$$; -- table containing the marks CREATE TABLE emaj.emaj_mark ( mark_group TEXT NOT NULL, -- group for which the mark has been set mark_name TEXT NOT NULL, -- mark name mark_id BIGSERIAL NOT NULL, -- serial id used to order rows (not to rely on timestamps -- that are not safe if system time changes) mark_datetime TIMESTAMPTZ NOT NULL, -- precise timestamp of the mark creation, used as a reference -- for other tables like emaj_sequence and all log tables mark_global_seq BIGINT NOT NULL, -- emaj_global_seq last value at mark set (used to rollback tables) mark_state TEXT, -- state of the mark, with 2 possible values: -- 'ACTIVE' and 'DELETED' mark_comment TEXT, -- optional user comment mark_txid BIGINT -- id of the tx that has set the mark DEFAULT emaj._txid_current(), mark_last_sequence_id BIGINT, -- last sequ_id for the group at the end of the _set_mark_groups operation mark_last_seq_hole_id BIGINT, -- last sqhl_id for the group at _set_mark_groups time mark_log_rows_before_next BIGINT, -- number of log rows recorded for the group between the mark and the next one (NULL if last mark) - used to speedup marks lists display in phpPgAdmin plugin PRIMARY KEY (mark_group, mark_name), FOREIGN KEY (mark_group) REFERENCES emaj.emaj_group (group_name) ON DELETE CASCADE ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_mark IS $$Contains marks set on E-Maj tables groups.$$; -- table containing the sequences characteristics log -- (to record at mark time the state of application sequences and sequences used by log tables) CREATE TABLE emaj.emaj_sequence ( sequ_id BIGSERIAL NOT NULL, -- serial id used to delete oldest or newest rows (not to rely -- on timestamps that are not safe if system time changes) sequ_schema TEXT NOT NULL, -- application or 'emaj' schema or that owns the sequence sequ_name TEXT NOT NULL, -- application or emaj sequence name sequ_datetime TIMESTAMPTZ NOT NULL, -- timestamp the sequence characteristics have been recorded -- the same timestamp as referenced in emaj_mark table sequ_mark TEXT NOT NULL, -- name of the mark associated to the insertion timestamp sequ_last_val BIGINT NOT NULL, -- sequence last value sequ_start_val BIGINT NOT NULL, -- sequence start value, (0 with postgres 8.2) sequ_increment BIGINT NOT NULL, -- sequence increment sequ_max_val BIGINT NOT NULL, -- sequence max value sequ_min_val BIGINT NOT NULL, -- sequence min value sequ_cache_val BIGINT NOT NULL, -- sequence cache value sequ_is_cycled BOOLEAN NOT NULL, -- sequence flag 'is cycled ?' sequ_is_called BOOLEAN NOT NULL, -- sequence flag 'is called ?' PRIMARY KEY (sequ_schema, sequ_name, sequ_datetime) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_sequence IS $$Contains values of sequences at E-Maj set_mark times.$$; -- table containing the holes in sequences log -- these holes are due to rollback operations that do not adjust log sequences -- the hole size = difference of sequence's current last_value and last value at the rollback mark CREATE TABLE emaj.emaj_seq_hole ( sqhl_id BIGSERIAL NOT NULL, -- serial id used to delete oldest or newest rows (not to rely -- on timestamps that are not safe if system time changes) sqhl_schema TEXT NOT NULL, -- schema that owns the table sqhl_table TEXT NOT NULL, -- application table for which a sequence hole is recorded -- in the associated log table sqhl_datetime TIMESTAMPTZ NOT NULL -- timestamp of the rollback operation that generated the hole DEFAULT transaction_timestamp(), sqhl_hole_size BIGINT NOT NULL, -- hole size computed as the difference of 2 sequence last-values PRIMARY KEY (sqhl_schema, sqhl_table, sqhl_datetime) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_seq_hole IS $$Contains description of holes in sequence values for E-Maj log tables.$$; -- table containing statistics about previously executed rollback operations -- and used to estimate rollback durations CREATE TABLE emaj.emaj_rlbk_stat ( rlbk_operation TEXT NOT NULL, -- type of operation, can contains 'rlbk', 'del_log', 'cre_fk' rlbk_schema TEXT NOT NULL, -- schema that owns the table or the foreign key rlbk_tbl_fk TEXT NOT NULL, -- table or foreign key name rlbk_datetime TIMESTAMPTZ NOT NULL, -- timestamp of the rollback that has generated the statistic rlbk_nb_rows BIGINT NOT NULL, -- number of rows processed by the operation rlbk_duration INTERVAL NOT NULL, -- duration of the elementary operation PRIMARY KEY (rlbk_operation, rlbk_schema, rlbk_tbl_fk, rlbk_datetime) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_rlbk_stat IS $$Contains statistics about previous E-Maj rollback durations.$$; -- working storage table containing foreign key definition -- (used at table rollback time to drop and later recreate foreign keys) CREATE TABLE emaj.emaj_fk ( fk_groups TEXT[] NOT NULL, -- groups for which the rollback operation is performed fk_session INT NOT NULL, -- session number (for parallel rollback purpose) fk_name TEXT NOT NULL, -- foreign key name fk_schema TEXT NOT NULL, -- schema name of the table that owns the foreign key fk_table TEXT NOT NULL, -- name of the table that owns the foreign key fk_def TEXT NOT NULL, -- foreign key definition as reported by pg_get_constraintdef PRIMARY KEY (fk_groups, fk_name, fk_schema, fk_table) ) TABLESPACE tspemaj; COMMENT ON TABLE emaj.emaj_fk IS $$Contains temporary description of foreign keys suppressed by E-Maj rollback operations.$$; ------------------------------------ -- -- -- emaj types -- -- -- ------------------------------------ CREATE TYPE emaj.emaj_log_stat_type AS ( stat_group TEXT, stat_schema TEXT, stat_table TEXT, stat_rows BIGINT ); COMMENT ON TYPE emaj.emaj_log_stat_type IS $$Represents the structure of rows returned by the emaj_log_stat_group() function.$$; CREATE TYPE emaj.emaj_detailed_log_stat_type AS ( stat_group TEXT, stat_schema TEXT, stat_table TEXT, stat_role VARCHAR(32), stat_verb VARCHAR(6), stat_rows BIGINT ); COMMENT ON TYPE emaj.emaj_detailed_log_stat_type IS $$Represents the structure of rows returned by the emaj_detailed_log_stat_group() function.$$; ------------------------------------ -- -- -- 'Fixed' parameters -- -- -- ------------------------------------ INSERT INTO emaj.emaj_param (param_key, param_value_text) VALUES ('emaj_version','0.11.0'); -- Other parameters are optional. They are set by users if needed. -- The history_retention parameter defines the time interval when a row remains in the emaj history table - default is 1 month -- INSERT INTO emaj.emaj_param (param_key, param_value_interval) VALUES ('history_retention','1 month'::interval); -- 4 parameters are used by the emaj_estimate_rollback_duration function as a default value to compute the approximate duration of a rollback operation. -- The avg_row_rollback_duration parameter defines the average duration needed to rollback a row. -- INSERT INTO emaj.emaj_param (param_key, param_value_interval) VALUES ('avg_row_rollback_duration','100 microsecond'::interval); -- The avg_row_delete_log_duration parameter defines the average duration needed to delete log rows. -- INSERT INTO emaj.emaj_param (param_key, param_value_interval) VALUES ('avg_row_delete_log_duration','10 -microsecond'::interval); -- The fixed_table_rollback_duration parameter defines the fixed rollback cost for any table or sequence belonging to a group -- INSERT INTO emaj.emaj_param (param_key, param_value_interval) VALUES ('fixed_table_rollback_duration','5 millisecond'::interval); -- The fixed_table_with_rollback_duration parameter defines the additional fixed rollback cost for any table that has effective rows to rollback -- INSERT INTO emaj.emaj_param (param_key, param_value_interval) VALUES ('fixed_table_with_rollback_duration','2.5 millisecond'::interval); ------------------------------------ -- -- -- Low level Functions -- -- -- ------------------------------------ CREATE or REPLACE FUNCTION emaj._pg_version() RETURNS TEXT LANGUAGE sql IMMUTABLE as $$ -- This functions returns as a string the 2 major parts of the current postgresql version (x.y) SELECT substring (version() from E'PostgreSQL\\s(\\d+\\.\\d+)'); $$; CREATE or REPLACE FUNCTION emaj._purge_hist() RETURNS VOID LANGUAGE sql as $$ -- This function purges the emaj history by deleting all rows prior the 'history_retention' parameter -- and not deleting rows generated by groups that are currently in logging state DELETE FROM emaj.emaj_hist WHERE hist_datetime < (SELECT MIN(datetime) FROM ( -- compute the least recent start_group time of groups in logging state (SELECT MIN(hist_datetime) FROM emaj.emaj_group, emaj.emaj_hist WHERE group_name = ANY (string_to_array(hist_object,',')) AND group_state = 'LOGGING' AND (hist_function = 'START_GROUP' OR hist_function = 'START_GROUPS') AND hist_event = 'BEGIN') UNION -- compute the timestamp of now minus the history_retention (1 month by default) (SELECT current_timestamp - coalesce((SELECT param_value_interval FROM emaj.emaj_param WHERE param_key = 'history_retention'),'1 MONTH')) ) AS tmst(datetime)); $$; CREATE or REPLACE FUNCTION emaj._get_mark_name(TEXT, TEXT) RETURNS TEXT LANGUAGE sql as $$ -- This function returns a mark name if exists for a group, processing the EMAJ_LAST_MARK keyword. -- input: group name and mark name -- output: mark name or NULL SELECT case when $2 = 'EMAJ_LAST_MARK' then (SELECT mark_name FROM emaj.emaj_mark WHERE mark_group = $1 ORDER BY mark_datetime DESC LIMIT 1) else (SELECT mark_name FROM emaj.emaj_mark WHERE mark_group = $1 AND mark_name = $2) end $$; CREATE or REPLACE FUNCTION emaj._get_mark_datetime(TEXT, TEXT) RETURNS TIMESTAMPTZ LANGUAGE sql as $$ -- This function returns the creation timestamp of a mark if exists for a group, -- processing the EMAJ_LAST_MARK keyword. -- input: group name and mark name -- output: mark date-time or NULL SELECT case when $2 = 'EMAJ_LAST_MARK' then (SELECT MAX(mark_datetime) FROM emaj.emaj_mark WHERE mark_group = $1) else (SELECT mark_datetime FROM emaj.emaj_mark WHERE mark_group = $1 AND mark_name = $2) end $$; CREATE or REPLACE FUNCTION emaj._build_log_seq_name(TEXT, TEXT) RETURNS TEXT LANGUAGE sql IMMUTABLE as $$ -- This function returns the log sequence name associated to an application schema and table. -- input: application schema name and table name -- output: log sequence name SELECT $1 || '_' || $2 || '_log_seq' $$; CREATE or REPLACE FUNCTION emaj._check_group_names_array(v_groupNames TEXT[]) RETURNS TEXT[] LANGUAGE plpgsql AS $_check_group_names_array$ -- This function build a array of group names similar to the supplied array, except that NULL -- values, empty string and duplicate names are suppressed. Issue a warning if the result array is NULL. -- Input: group names array -- Output: validated group names array DECLARE v_gn TEXT[]; v_i INT; BEGIN IF array_upper(v_groupNames,1) >= 1 THEN -- if there are elements, build the result array FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP -- look for not NULL & not empty group name IF v_groupNames[v_i] IS NULL OR v_groupNames[v_i] = '' THEN RAISE WARNING '_check_group_names_array: a group name is NULL or empty.'; -- look for duplicate name ELSEIF v_gn IS NOT NULL AND v_groupNames[v_i] = ANY (v_gn) THEN RAISE WARNING '_check_group_names_array: duplicate group name %.',v_groupNames[v_i]; ELSE -- OK, keep the name v_gn = array_append (v_gn, v_groupNames[v_i]); END IF; END LOOP; END IF; -- check for NULL result IF v_gn IS NULL THEN RAISE WARNING '_check_group_names_array: No group name to process.'; END IF; RETURN v_gn; END; $_check_group_names_array$; CREATE or REPLACE FUNCTION emaj._check_class(v_schemaName TEXT, v_className TEXT) RETURNS TEXT LANGUAGE plpgsql AS $_check_class$ -- This function verifies that an application table or sequence exists in pg_class -- It also protects from a recursive use : tables or sequences from emaj schema cannot be managed by EMAJ -- Input: the names of the schema and the class (table or sequence) -- Output: the relkind of the class : 'r' for a table and 's' for a sequence -- If the schema or the class is not known, the function stops. DECLARE v_relkind TEXT; v_schemaOid OID; BEGIN IF v_schemaName = 'emaj' THEN RAISE EXCEPTION '_check_class: object from schema % cannot be managed by EMAJ.', v_schemaName; END IF; SELECT oid INTO v_schemaOid FROM pg_namespace WHERE nspname = v_schemaName; IF NOT found THEN RAISE EXCEPTION '_check_class: schema % doesn''t exist.', v_schemaName; END IF; SELECT relkind INTO v_relkind FROM pg_class WHERE relNameSpace = v_schemaOid AND relName = v_className AND relkind in ('r','S'); IF NOT found THEN RAISE EXCEPTION '_check_class: table or sequence % doesn''t exist.', v_className; END IF; RETURN v_relkind; END; $_check_class$; CREATE or REPLACE FUNCTION emaj._check_new_mark(v_mark TEXT, v_groupNames TEXT[]) RETURNS TEXT LANGUAGE plpgsql AS $_check_new_mark$ -- This function verifies that a new mark name supplied the user is valid. -- It processes the possible NULL mark value and the replacement of % wild characters. -- It also checks that the mark name do not already exist for any group. -- Input: name of the mark to set, array of group names -- The array of group names may be NULL to avoid the check against groups -- Output: internal name of the mark DECLARE v_i INT; v_markName TEXT := v_mark; BEGIN -- check the mark name is not 'EMAJ_LAST_MARK' IF v_mark = 'EMAJ_LAST_MARK' THEN RAISE EXCEPTION '_check_new_mark: % is not an allowed name for a new mark.', v_mark; END IF; -- process null or empty supplied mark name IF v_markName = '' OR v_markName IS NULL THEN v_markName = 'MARK_%'; END IF; -- process % wild characters in mark name v_markName = replace(v_markName, '%', to_char(current_timestamp, 'HH24.MI.SS.MS')); -- if requested, check the existence of the mark in groups IF v_groupNames IS NOT NULL THEN -- for each group of the array, FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP -- ... if a mark with the same name already exists for the group, stop PERFORM 1 FROM emaj.emaj_mark WHERE mark_group = v_groupNames[v_i] AND mark_name = v_markName; IF FOUND THEN RAISE EXCEPTION '_check_new_mark: Group % already contains a mark named %.', v_groupNames[v_i], v_markName; END IF; END LOOP; END IF; RETURN v_markName; END; $_check_new_mark$; CREATE or REPLACE FUNCTION emaj._create_tbl(v_schemaName TEXT, v_tableName TEXT, v_isRollbackable BOOLEAN) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_create_tbl$ -- This function creates all what is needed to manage the log and rollback operations for an application table -- Input: schema name (mandatory even for the 'public' schema), table name, boolean indicating whether the table belongs to a rollbackable group -- Are created: -- - the associated log table, with its own sequence -- - the function that logs the tables updates, defined as a trigger -- - the rollback function (one per table) -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of the application table. DECLARE -- variables for the name of tables, functions, triggers,... v_fullTableName TEXT; v_emajSchema TEXT := 'emaj'; v_emajTblSpace TEXT := 'tspemaj'; v_logTableName TEXT; v_logIdxName TEXT; v_logFnctName TEXT; v_rlbkFnctName TEXT; v_exceptionRlbkFnctName TEXT; v_logTriggerName TEXT; v_truncTriggerName TEXT; v_sequenceName TEXT; -- variables to hold pieces of SQL v_pkCondList TEXT; v_colList TEXT; v_valList TEXT; v_setList TEXT; -- other variables v_attname TEXT; v_relhaspkey BOOLEAN; v_pgVersion TEXT := emaj._pg_version(); v_stmt TEXT := ''; v_triggerList TEXT := ''; r_column RECORD; r_trigger RECORD; -- cursor to retrieve all columns of the application table col1_curs CURSOR (tbl regclass) FOR SELECT attname FROM pg_attribute WHERE attrelid = tbl AND attnum > 0 AND attisdropped = false ORDER BY attnum; -- cursor to retrieve all columns of table's primary key -- (taking column names in pg_attribute from the table's definition instead of index definition is mandatory -- starting from pg9.0, joining tables with indkey instead of indexrelid) col2_curs CURSOR (tbl regclass) FOR SELECT attname FROM pg_attribute, pg_index WHERE pg_attribute.attrelid = pg_index.indrelid AND attnum = ANY (indkey) AND indrelid = tbl AND indisprimary AND attnum > 0 AND attisdropped = false; BEGIN -- check the table has a primary key SELECT true INTO v_relhaspkey FROM pg_class, pg_namespace, pg_constraint WHERE relnamespace = pg_namespace.oid AND connamespace = pg_namespace.oid AND conrelid = pg_class.oid AND contype = 'p' AND nspname = v_schemaName AND relname = v_tableName; IF NOT FOUND THEN v_relhaspkey = false; END IF; IF v_isRollbackable AND v_relhaspkey = FALSE THEN RAISE EXCEPTION '_create_tbl: table % has no PRIMARY KEY.', v_tableName; END IF; -- OK, build the different name for table, trigger, functions,... v_fullTableName := quote_ident(v_schemaName) || '.' || quote_ident(v_tableName); v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_log'); v_logIdxName := quote_ident(v_schemaName || '_' || v_tableName || '_log_idx'); v_logFnctName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_log_fnct'); v_rlbkFnctName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_rlbk_fnct'); v_exceptionRlbkFnctName=substring(quote_literal(v_rlbkFnctName) FROM '^(.*).$'); -- suppress last character v_logTriggerName := quote_ident(v_schemaName || '_' || v_tableName || '_emaj_log_trg'); v_truncTriggerName := quote_ident(v_schemaName || '_' || v_tableName || '_emaj_trunc_trg'); v_sequenceName := quote_ident(v_emajSchema) || '.' || quote_ident(emaj._build_log_seq_name(v_schemaName, v_tableName)); -- creation of the log table: the log table looks like the application table, with some additional technical columns EXECUTE 'DROP TABLE IF EXISTS ' || v_logTableName; EXECUTE 'CREATE TABLE ' || v_logTableName || ' ( LIKE ' || v_fullTableName || ') TABLESPACE ' || v_emajTblSpace; EXECUTE 'ALTER TABLE ' || v_logTableName || ' ADD COLUMN emaj_verb VARCHAR(3),' || ' ADD COLUMN emaj_tuple VARCHAR(3),' || ' ADD COLUMN emaj_gid BIGINT NOT NULL DEFAULT nextval(''emaj.emaj_global_seq''),' || ' ADD COLUMN emaj_changed TIMESTAMPTZ DEFAULT clock_timestamp(),' || ' ADD COLUMN emaj_txid BIGINT DEFAULT emaj._txid_current(),' || ' ADD COLUMN emaj_user VARCHAR(32) DEFAULT session_user,' || ' ADD COLUMN emaj_user_ip INET DEFAULT inet_client_addr()'; -- creation of the index on the log table IF v_pgVersion >= '8.3' THEN EXECUTE 'CREATE UNIQUE INDEX ' || v_logIdxName || ' ON ' || v_logTableName || ' (emaj_gid, emaj_tuple DESC) TABLESPACE ' || v_emajTblSpace; ELSE -- in 8.2, DESC clause doesn't exist. So the index cannot be used at rollback time. -- It only enforces the uniqueness of (emaj_gid, emaj_tuple) EXECUTE 'CREATE UNIQUE INDEX ' || v_logIdxName || ' ON ' || v_logTableName || ' (emaj_gid, emaj_tuple) TABLESPACE ' || v_emajTblSpace; END IF; -- remove the NOT NULL constraints of application columns. -- They are useless and blocking to store truncate event for tables belonging to audit_only tables FOR r_column IN SELECT ' ALTER COLUMN ' || quote_ident(attname) || ' DROP NOT NULL' AS action FROM pg_attribute, pg_class, pg_namespace WHERE relnamespace = pg_namespace.oid AND attrelid = pg_class.oid AND nspname = v_emajSchema AND relname = v_schemaName || '_' || v_tableName || '_log' AND attnum > 0 AND attnotnull AND attisdropped = false AND attname NOT LIKE E'emaj\\_%' LOOP IF v_stmt = '' THEN v_stmt = v_stmt || r_column.action; ELSE v_stmt = v_stmt || ',' || r_column.action; END IF; END LOOP; IF v_stmt <> '' THEN EXECUTE 'ALTER TABLE ' || v_logTableName || v_stmt; END IF; -- create the sequence associated to the log table EXECUTE 'CREATE SEQUENCE ' || v_sequenceName; -- creation of the log fonction that will be mapped to the log trigger later -- The new row is logged for each INSERT, the old row is logged for each DELETE -- and the old and the new rows are logged for each UPDATE. EXECUTE 'CREATE or REPLACE FUNCTION ' || v_logFnctName || '() RETURNS trigger AS $logfnct$' || 'BEGIN' -- The sequence associated to the log table is incremented at the beginning of the function ... || ' PERFORM NEXTVAL(' || quote_literal(v_sequenceName) || ');' -- ... and the global id sequence is incremented by the first/only INSERT into the log table. || ' IF (TG_OP = ''DELETE'') THEN' || ' INSERT INTO ' || v_logTableName || ' SELECT OLD.*, ''DEL'', ''OLD'';' || ' RETURN OLD;' || ' ELSIF (TG_OP = ''UPDATE'') THEN' || ' INSERT INTO ' || v_logTableName || ' SELECT OLD.*, ''UPD'', ''OLD'';' || ' INSERT INTO ' || v_logTableName || ' SELECT NEW.*, ''UPD'', ''NEW'', lastval();' || ' RETURN NEW;' || ' ELSIF (TG_OP = ''INSERT'') THEN' || ' INSERT INTO ' || v_logTableName || ' SELECT NEW.*, ''INS'', ''NEW'';' || ' RETURN NEW;' || ' END IF;' || ' RETURN NULL;' || 'END;' || '$logfnct$ LANGUAGE plpgsql SECURITY DEFINER;'; -- creation of the log trigger on the application table, using the previously created log function -- But the trigger is not immediately activated (it will be at emaj_start_group time) EXECUTE 'DROP TRIGGER IF EXISTS ' || v_logTriggerName || ' ON ' || v_fullTableName; EXECUTE 'CREATE TRIGGER ' || v_logTriggerName || ' AFTER INSERT OR UPDATE OR DELETE ON ' || v_fullTableName || ' FOR EACH ROW EXECUTE PROCEDURE ' || v_logFnctName || '()'; EXECUTE 'ALTER TABLE ' || v_fullTableName || ' DISABLE TRIGGER ' || v_logTriggerName; -- creation of the trigger that manage any TRUNCATE on the application table -- But the trigger is not immediately activated (it will be at emaj_start_group time) IF v_pgVersion >= '8.4' THEN EXECUTE 'DROP TRIGGER IF EXISTS ' || v_truncTriggerName || ' ON ' || v_fullTableName; IF v_isRollbackable THEN -- For rollbackable groups, use the common _forbid_truncate_fnct() function that blocks the operation EXECUTE 'CREATE TRIGGER ' || v_truncTriggerName || ' BEFORE TRUNCATE ON ' || v_fullTableName || ' FOR EACH STATEMENT EXECUTE PROCEDURE emaj._forbid_truncate_fnct()'; ELSE -- For audit_only groups, use the common _log_truncate_fnct() function that records the operation into the log table EXECUTE 'CREATE TRIGGER ' || v_truncTriggerName || ' BEFORE TRUNCATE ON ' || v_fullTableName || ' FOR EACH STATEMENT EXECUTE PROCEDURE emaj._log_truncate_fnct()'; END IF; EXECUTE 'ALTER TABLE ' || v_fullTableName || ' DISABLE TRIGGER ' || v_truncTriggerName; END IF; -- -- create the rollback function, if the table belongs to a rollbackable group -- IF v_isRollbackable THEN -- First build some pieces of the CREATE FUNCTION statement -- build the tables's columns list -- and the SET clause for the UPDATE, from the same columns list v_colList := ''; v_valList := ''; v_setList := ''; OPEN col1_curs (v_fullTableName); LOOP FETCH col1_curs INTO v_attname; EXIT WHEN NOT FOUND; IF v_colList = '' THEN v_colList := quote_ident(v_attname); v_valList := 'rec_log.' || quote_ident(v_attname); v_setList := quote_ident(v_attname) || ' = rec_old_log.' || quote_ident(v_attname); ELSE v_colList := v_colList || ', ' || quote_ident(v_attname); v_valList := v_valList || ', rec_log.' || quote_ident(v_attname); v_setList := v_setList || ', ' || quote_ident(v_attname) || ' = rec_old_log.' || quote_ident(v_attname); END IF; END LOOP; CLOSE col1_curs; -- build "equality on the primary key" conditions, from the list of the primary key's columns v_pkCondList := ''; OPEN col2_curs (v_fullTableName); LOOP FETCH col2_curs INTO v_attname; EXIT WHEN NOT FOUND; IF v_pkCondList = '' THEN v_pkCondList := quote_ident(v_attname) || ' = rec_log.' || quote_ident(v_attname); ELSE v_pkCondList := v_pkCondList || ' AND ' || quote_ident(v_attname) || ' = rec_log.' || quote_ident(v_attname); END IF; END LOOP; CLOSE col2_curs; -- Then create the rollback function associated to the table -- At execution, it will loop on each row from the log table in reverse order -- It will insert the old deleted rows, delete the new inserted row -- and update the new rows by setting back the old rows -- The function returns the number of rollbacked elementary operations or rows -- All these functions will be called by the emaj_rlbk_tbl function, which is activated by the -- emaj_rollback_group function EXECUTE 'CREATE or REPLACE FUNCTION ' || v_rlbkFnctName || ' (v_lastGlobalSeq BIGINT)' || ' RETURNS BIGINT AS $rlbkfnct$' || ' DECLARE' || ' v_nb_rows BIGINT := 0;' || ' v_nb_proc_rows INTEGER;' || ' rec_log ' || v_logTableName || '%ROWTYPE;' || ' rec_old_log ' || v_logTableName || '%ROWTYPE;' || ' log_curs CURSOR FOR ' || ' SELECT * FROM ' || v_logTableName || ' WHERE emaj_gid > v_lastGlobalSeq ' || ' ORDER BY emaj_gid DESC, emaj_tuple;' || ' BEGIN' || ' OPEN log_curs;' || ' LOOP ' || ' FETCH log_curs INTO rec_log;' || ' EXIT WHEN NOT FOUND;' || ' IF rec_log.emaj_verb = ''INS'' THEN' -- || ' RAISE NOTICE ''emaj_gid = % ; INS'', rec_log.emaj_gid;' || ' DELETE FROM ' || v_fullTableName || ' WHERE ' || v_pkCondList || ';' || ' ELSIF rec_log.emaj_verb = ''UPD'' THEN' -- || ' RAISE NOTICE ''emaj_gid = % ; UPD ; %'', rec_log.emaj_gid,rec_log.emaj_tuple;' || ' FETCH log_curs into rec_old_log;' -- || ' RAISE NOTICE ''emaj_gid = % ; UPD ; %'', rec_old_log.emaj_gid,rec_old_log.emaj_tuple;' || ' UPDATE ' || v_fullTableName || ' SET ' || v_setList || ' WHERE ' || v_pkCondList || ';' || ' ELSIF rec_log.emaj_verb = ''DEL'' THEN' -- || ' RAISE NOTICE ''emaj_gid = % ; DEL'', rec_log.emaj_gid;' || ' INSERT INTO ' || v_fullTableName || ' (' || v_colList || ') VALUES (' || v_valList || ');' || ' ELSE' || ' RAISE EXCEPTION ' || v_exceptionRlbkFnctName || ': internal error - emaj_verb = % is unknown, emaj_gid = %.'',' || ' rec_log.emaj_verb, rec_log.emaj_gid;' || ' END IF;' || ' GET DIAGNOSTICS v_nb_proc_rows = ROW_COUNT;' || ' IF v_nb_proc_rows <> 1 THEN' || ' RAISE EXCEPTION ' || v_exceptionRlbkFnctName || ': internal error - emaj_verb = %, emaj_gid = %, # processed rows = % .''' || ' ,rec_log.emaj_verb, rec_log.emaj_gid, v_nb_proc_rows;' || ' END IF;' || ' v_nb_rows := v_nb_rows + 1;' || ' END LOOP;' || ' CLOSE log_curs;' -- || ' RAISE NOTICE ''Table ' || v_fullTableName || ' -> % rollbacked rows.'', v_nb_rows;' || ' RETURN v_nb_rows;' || ' END;' || '$rlbkfnct$ LANGUAGE plpgsql;'; END IF; -- check if the table has (neither internal - ie. created for fk - nor previously created by emaj) trigger, -- This check is not done for postgres 8.2 because column tgconstraint doesn't exist IF v_pgVersion >= '8.3' THEN FOR r_trigger IN SELECT tgname FROM pg_trigger WHERE tgrelid = v_fullTableName::regclass AND tgconstraint = 0 AND tgname NOT LIKE E'%emaj\\_%\\_trg' LOOP IF v_triggerList = '' THEN v_triggerList = v_triggerList || r_trigger.tgname; ELSE v_triggerList = v_triggerList || ', ' || r_trigger.tgname; END IF; END LOOP; -- if yes, issue a warning (if a trigger updates another table in the same table group or outside) it could generate problem at rollback time) IF v_triggerList <> '' THEN RAISE WARNING '_create_tbl: table % has triggers (%). Verify the compatibility with emaj rollback operations (in particular if triggers update one or several other tables). Triggers may have to be manualy disabled before rollback.', v_fullTableName, v_triggerList; END IF; END IF; -- grant appropriate rights to both emaj roles EXECUTE 'GRANT SELECT ON TABLE ' || v_logTableName || ' TO emaj_viewer'; EXECUTE 'GRANT ALL PRIVILEGES ON TABLE ' || v_logTableName || ' TO emaj_adm'; EXECUTE 'GRANT SELECT ON SEQUENCE ' || v_sequenceName || ' TO emaj_viewer'; EXECUTE 'GRANT ALL PRIVILEGES ON SEQUENCE ' || v_sequenceName || ' TO emaj_adm'; RETURN; END; $_create_tbl$; CREATE or REPLACE FUNCTION emaj._drop_tbl(v_schemaName TEXT, v_tableName TEXT, v_isRollbackable BOOLEAN) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_drop_tbl$ -- The function deletes all what has been created by _create_tbl function -- Required inputs: schema name (mandatory even if "public") and table name -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of the application table. DECLARE v_emajSchema TEXT := 'emaj'; v_pgVersion TEXT := emaj._pg_version(); v_fullTableName TEXT; v_logTableName TEXT; v_logFnctName TEXT; v_rlbkFnctName TEXT; v_logTriggerName TEXT; v_truncTriggerName TEXT; v_seqName TEXT; v_fullSeqName TEXT; BEGIN v_fullTableName := quote_ident(v_schemaName) || '.' || quote_ident(v_tableName); v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_log'); v_logFnctName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_log_fnct'); v_rlbkFnctName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_rlbk_fnct'); v_logTriggerName := quote_ident(v_schemaName || '_' || v_tableName || '_emaj_log_trg'); v_truncTriggerName := quote_ident(v_schemaName || '_' || v_tableName || '_emaj_trunc_trg'); v_seqName := emaj._build_log_seq_name(v_schemaName, v_tableName); v_fullSeqName := quote_ident(v_emajSchema) || '.' || quote_ident(v_seqName); -- delete the log trigger on the application table EXECUTE 'DROP TRIGGER IF EXISTS ' || v_logTriggerName || ' ON ' || v_fullTableName; -- delete the truncate trigger on the application table IF v_pgVersion >= '8.4' THEN EXECUTE 'DROP TRIGGER IF EXISTS ' || v_truncTriggerName || ' ON ' || v_fullTableName; END IF; -- delete log and rollback functions, EXECUTE 'DROP FUNCTION IF EXISTS ' || v_logFnctName || '()'; IF v_isRollbackable THEN EXECUTE 'DROP FUNCTION IF EXISTS ' || v_rlbkFnctName || '(bigint)'; END IF; -- delete the sequence associated to the log table EXECUTE 'DROP SEQUENCE IF EXISTS ' || v_fullSeqName; -- delete the log table EXECUTE 'DROP TABLE IF EXISTS ' || v_logTableName || ' CASCADE'; -- delete rows related to the log sequence from emaj_sequence table DELETE FROM emaj.emaj_sequence WHERE sequ_schema = 'emaj' AND sequ_name = v_seqName; -- delete rows related to the table from emaj_seq_hole table DELETE FROM emaj.emaj_seq_hole WHERE sqhl_schema = quote_ident(v_schemaName) AND sqhl_table = quote_ident(v_tableName); RETURN; END; $_drop_tbl$; CREATE or REPLACE FUNCTION emaj._drop_seq(v_schemaName TEXT, v_seqName TEXT) RETURNS void LANGUAGE plpgsql AS $_drop_seq$ -- The function deletes the rows stored into emaj_sequence for a particular sequence -- Required inputs: schema name and sequence name BEGIN -- delete rows from emaj_sequence EXECUTE 'DELETE FROM emaj.emaj_sequence WHERE sequ_schema = ' || quote_literal(v_schemaName) || ' AND sequ_name = ' || quote_literal(v_seqName); RETURN; END; $_drop_seq$; CREATE or REPLACE FUNCTION emaj._rlbk_tbl(v_schemaName TEXT, v_tableName TEXT, v_lastGlobalSeq BIGINT, v_timestamp TIMESTAMPTZ, v_disableTrigger BOOLEAN, v_deleteLog BOOLEAN, v_lastSequenceId BIGINT, v_lastSeqHoleId BIGINT) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_rlbk_tbl$ -- This function rollbacks one table to a given timestamp -- The function is called by emaj._rlbk_groups_step5() -- Input: schema name and table name, global sequence value limit for rollback, mark timestamp, -- flag to specify if log trigger must be disable during rollback operation, -- flag to specify if rollbacked log rows must be deleted, -- last sequence and last hole identifiers to keep (greater ones being to be deleted) -- These flags must be respectively: -- - true and true for common (unlogged) rollback, -- - false and false for logged rollback, -- - true and false for unlogged rollback with undeleted log rows (now deleted emaj_rollback_and_stop_group function) -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of the application table. DECLARE v_emajSchema TEXT := 'emaj'; v_fullTableName TEXT; v_logTableName TEXT; v_rlbkFnctName TEXT; v_logTriggerName TEXT; v_seqName TEXT; v_fullSeqName TEXT; v_nb_rows BIGINT; v_tsrlbk_start TIMESTAMP; v_tsrlbk_end TIMESTAMP; v_tsdel_start TIMESTAMP; v_tsdel_end TIMESTAMP; BEGIN v_fullTableName := quote_ident(v_schemaName) || '.' || quote_ident(v_tableName); v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_log'); v_rlbkFnctName := quote_ident(v_emajSchema) || '.' || quote_ident(v_schemaName || '_' || v_tableName || '_rlbk_fnct'); v_logTriggerName := quote_ident(v_schemaName || '_' || v_tableName || '_emaj_log_trg'); v_seqName := emaj._build_log_seq_name(v_schemaName, v_tableName); v_fullSeqName := quote_ident(v_emajSchema) || '.' || quote_ident(v_seqName); -- insert begin event in history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('ROLLBACK_TABLE', 'BEGIN', v_fullTableName, 'All log rows with emaj_gid > ' || v_lastGlobalSeq); -- deactivate the log trigger on the application table, if needed (unlogged rollback) IF v_disableTrigger THEN EXECUTE 'ALTER TABLE ' || v_fullTableName || ' DISABLE TRIGGER ' || v_logTriggerName; END IF; -- record the time at the rollback start SELECT clock_timestamp() INTO v_tsrlbk_start; -- rollback the table EXECUTE 'SELECT ' || v_rlbkFnctName || '(' || v_lastGlobalSeq || ')' INTO v_nb_rows; -- record the time at the rollback SELECT clock_timestamp() INTO v_tsrlbk_end; -- insert rollback duration into the emaj_rlbk_stat table, if at least 1 row has been processed IF v_nb_rows > 0 THEN INSERT INTO emaj.emaj_rlbk_stat (rlbk_operation, rlbk_schema, rlbk_tbl_fk, rlbk_datetime, rlbk_nb_rows, rlbk_duration) VALUES ('rlbk', v_schemaName, v_tableName, v_tsrlbk_start, v_nb_rows, v_tsrlbk_end - v_tsrlbk_start); END IF; -- if the caller requires it, suppress the rollbacked log part IF v_deleteLog THEN -- record the time at the delete start SELECT clock_timestamp() INTO v_tsdel_start; -- delete obsolete log rows EXECUTE 'DELETE FROM ' || v_logTableName || ' WHERE emaj_gid > ' || v_lastGlobalSeq; -- ... and suppress from emaj_sequence table the rows regarding the emaj log sequence for this application table -- corresponding to potential later intermediate marks that disappear with the rollback operation DELETE FROM emaj.emaj_sequence WHERE sequ_schema = v_emajSchema AND sequ_name = v_seqName AND sequ_id > v_lastSequenceId; -- record the sequence holes generated by the delete operation -- this is due to the fact that log sequences are not rollbacked, this information will be used by the emaj_log_stat_group -- function (and indirectly by emaj_estimate_rollback_duration()) -- first delete, if exist, sequence holes that have disappeared with the rollback DELETE FROM emaj.emaj_seq_hole WHERE sqhl_schema = v_schemaName AND sqhl_table = v_tableName AND sqhl_id > v_lastSeqHoleId; -- and then insert the new sequence hole EXECUTE 'INSERT INTO emaj.emaj_seq_hole (sqhl_schema, sqhl_table, sqhl_hole_size) VALUES (' || quote_literal(v_schemaName) || ',' || quote_literal(v_tableName) || ', (' || ' SELECT CASE WHEN is_called THEN last_value + increment_by ELSE last_value END FROM ' || v_fullSeqName || ')-(' || ' SELECT CASE WHEN sequ_is_called THEN sequ_last_val + sequ_increment ELSE sequ_last_val END FROM ' || ' emaj.emaj_sequence WHERE' || ' sequ_schema = ''' || v_emajSchema || ''' AND sequ_name = ' || quote_literal(v_seqName) || ' AND sequ_datetime = ' || quote_literal(v_timestamp) || '))'; -- record the time at the delete SELECT clock_timestamp() INTO v_tsdel_end; -- insert delete duration into the emaj_rlbk_stat table, if at least 1 row has been processed IF v_nb_rows > 0 THEN INSERT INTO emaj.emaj_rlbk_stat (rlbk_operation, rlbk_schema, rlbk_tbl_fk, rlbk_datetime, rlbk_nb_rows, rlbk_duration) VALUES ('del_log', v_schemaName, v_tableName, v_tsrlbk_start, v_nb_rows, v_tsdel_end - v_tsdel_start); END IF; END IF; -- re-activate the log trigger on the application table, if previously disabled IF v_disableTrigger THEN EXECUTE 'ALTER TABLE ' || v_fullTableName || ' ENABLE TRIGGER ' || v_logTriggerName; END IF; -- insert end event in history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('ROLLBACK_TABLE', 'END', v_fullTableName, v_nb_rows || ' rollbacked rows'); RETURN; END; $_rlbk_tbl$; CREATE or REPLACE FUNCTION emaj._rlbk_seq(v_schemaName TEXT, v_seqName TEXT, v_timestamp TIMESTAMPTZ, v_deleteLog BOOLEAN, v_lastSequenceId BIGINT) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_rlbk_seq$ -- This function rollbacks one sequence to a given mark -- The function is called by emaj.emaj._rlbk_groups_step7() -- Input: schema name and table name, mark -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of the application sequence. DECLARE v_pgVersion TEXT := emaj._pg_version(); v_fullSeqName TEXT; v_stmt TEXT; mark_seq_rec RECORD; curr_seq_rec RECORD; BEGIN -- Read sequence's characteristics at mark time BEGIN SELECT sequ_schema, sequ_name, sequ_mark, sequ_last_val, sequ_start_val, sequ_increment, sequ_max_val, sequ_min_val, sequ_cache_val, sequ_is_cycled, sequ_is_called INTO STRICT mark_seq_rec FROM emaj.emaj_sequence WHERE sequ_schema = v_schemaName AND sequ_name = v_seqName AND sequ_datetime = v_timestamp; EXCEPTION WHEN NO_DATA_FOUND THEN RAISE EXCEPTION '_rlbk_seq: Mark at % not found for sequence %.%.', v_timestamp, v_schemaName, v_seqName; WHEN TOO_MANY_ROWS THEN RAISE EXCEPTION '_rlbk_seq: Internal error 1.'; END; -- Read the current sequence's characteristics v_fullSeqName := quote_ident(v_schemaName) || '.' || quote_ident(v_seqName); v_stmt = 'SELECT last_value, '; IF v_pgVersion <= '8.3' THEN v_stmt = v_stmt || '0 as start_value, '; ELSE v_stmt = v_stmt || 'start_value, '; END IF; v_stmt = v_stmt || 'increment_by, max_value, min_value, cache_value, is_cycled, is_called FROM ' || v_fullSeqName; EXECUTE v_stmt INTO STRICT curr_seq_rec; -- Build the ALTER SEQUENCE statement, depending on the differences between the present values and the related -- values at the requested mark time v_stmt=''; IF curr_seq_rec.last_value <> mark_seq_rec.sequ_last_val OR curr_seq_rec.is_called <> mark_seq_rec.sequ_is_called THEN IF mark_seq_rec.sequ_is_called THEN v_stmt=v_stmt || ' RESTART ' || mark_seq_rec.sequ_last_val + mark_seq_rec.sequ_increment; ELSE v_stmt=v_stmt || ' RESTART ' || mark_seq_rec.sequ_last_val; END IF; END IF; IF curr_seq_rec.start_value <> mark_seq_rec.sequ_start_val THEN v_stmt=v_stmt || ' START ' || mark_seq_rec.sequ_start_val; END IF; IF curr_seq_rec.increment_by <> mark_seq_rec.sequ_increment THEN v_stmt=v_stmt || ' INCREMENT ' || mark_seq_rec.sequ_increment; END IF; IF curr_seq_rec.min_value <> mark_seq_rec.sequ_min_val THEN v_stmt=v_stmt || ' MINVALUE ' || mark_seq_rec.sequ_min_val; END IF; IF curr_seq_rec.max_value <> mark_seq_rec.sequ_max_val THEN v_stmt=v_stmt || ' MAXVALUE ' || mark_seq_rec.sequ_max_val; END IF; IF curr_seq_rec.cache_value <> mark_seq_rec.sequ_cache_val THEN v_stmt=v_stmt || ' CACHE ' || mark_seq_rec.sequ_cache_val; END IF; IF curr_seq_rec.is_cycled <> mark_seq_rec.sequ_is_cycled THEN IF mark_seq_rec.sequ_is_cycled = 'f' THEN v_stmt=v_stmt || ' NO '; END IF; v_stmt=v_stmt || ' CYCLE '; END IF; -- and execute the statement if at least one parameter has changed IF v_stmt <> '' THEN -- RAISE NOTICE 'Rollback sequence % with%', v_fullSeqName, v_stmt; EXECUTE 'ALTER SEQUENCE ' || v_fullSeqName || v_stmt; END IF; -- if the caller requires it, delete the rollbacked intermediate sequences from the sequence table IF v_deleteLog THEN DELETE FROM emaj.emaj_sequence WHERE sequ_schema = v_schemaName AND sequ_name = v_seqName AND sequ_id > v_lastSequenceId; END IF; -- insert event in history INSERT INTO emaj.emaj_hist (hist_function, hist_object, hist_wording) VALUES ('ROLLBACK_SEQUENCE', v_fullSeqName, substr(v_stmt,2)); RETURN; END; $_rlbk_seq$; CREATE or REPLACE FUNCTION emaj._log_stat_table(v_schemaName TEXT, v_tableName TEXT, v_tsFirstMark TIMESTAMPTZ, v_tsLastMark TIMESTAMPTZ, v_firstLastSeqHoleId BIGINT, v_lastLastSeqHoleId BIGINT) RETURNS BIGINT LANGUAGE plpgsql AS $_log_stat_table$ -- This function returns the number of log rows for a single table between 2 marks or between a mark and the current situation. -- It is called by emaj_log_stat_group function -- These statistics are computed using the serial id of log tables and holes is sequences recorded into emaj_seq_hole at rollback time -- Input: schema name and table name, the timestamps of both marks, the emaj_seq_hole last id of both marks -- a NULL value as last timestamp mark indicates the current situation -- Output: number of log rows between both marks for the table DECLARE v_emajSchema TEXT := 'emaj'; v_fullSeqName TEXT; v_beginLastValue BIGINT; v_endLastValue BIGINT; v_sumHole BIGINT; BEGIN -- get the log table id at first mark time SELECT CASE WHEN sequ_is_called THEN sequ_last_val ELSE sequ_last_val - sequ_increment END INTO v_beginLastValue FROM emaj.emaj_sequence WHERE sequ_schema = v_emajSchema AND sequ_name = emaj._build_log_seq_name(v_schemaName,v_tableName) AND sequ_datetime = v_tsFirstMark; IF v_tsLastMark IS NULL THEN -- last mark is NULL, so examine the current state of the log table id v_fullSeqName := quote_ident(v_emajSchema) || '.' || quote_ident(emaj._build_log_seq_name(v_schemaName, v_tableName)); EXECUTE 'SELECT CASE WHEN is_called THEN last_value ELSE last_value - increment_by END FROM ' || v_fullSeqName INTO v_endLastValue; -- and count the sum of hole from the start mark time until now SELECT coalesce(sum(sqhl_hole_size),0) INTO v_sumHole FROM emaj.emaj_seq_hole WHERE sqhl_schema = v_schemaName AND sqhl_table = v_tableName AND sqhl_id > v_firstLastSeqHoleId; ELSE -- last mark is not NULL, so get the log table id at last mark time SELECT CASE WHEN sequ_is_called THEN sequ_last_val ELSE sequ_last_val - sequ_increment END INTO v_endLastValue FROM emaj.emaj_sequence WHERE sequ_schema = v_emajSchema AND sequ_name = emaj._build_log_seq_name(v_schemaName,v_tableName) AND sequ_datetime = v_tsLastMark; -- and count the sum of hole from the start mark time to the end mark time SELECT coalesce(sum(sqhl_hole_size),0) INTO v_sumHole FROM emaj.emaj_seq_hole WHERE sqhl_schema = v_schemaName AND sqhl_table = v_tableName AND sqhl_id > v_firstLastSeqHoleId AND sqhl_id <= v_lastLastSeqHoleId; END IF; -- return the stat row for the table RETURN (v_endLastValue - v_beginLastValue - v_sumHole); END; $_log_stat_table$; CREATE or REPLACE FUNCTION emaj.emaj_verify_all() RETURNS SETOF TEXT LANGUAGE plpgsql AS $emaj_verify_all$ -- The function verifies the consistency between all emaj objects present inside emaj schema and -- emaj objects related to tables and sequences referenced in emaj_relation table. -- It returns a set of warning messages for discovered discrepancies. If no error is detected, a single row is returned. DECLARE v_emajSchema TEXT := 'emaj'; v_pgVersion TEXT := emaj._pg_version(); v_finalMsg TEXT := 'Global checking: no error encountered'; r_object RECORD; r_group RECORD; BEGIN -- detect if the current postgres version is at least 8.1 IF v_pgVersion < '8.2' THEN RETURN NEXT 'Global checking: the current postgres version (' || version() || ') is not compatible with E-Maj.'; v_finalMsg = ''; END IF; -- detect log tables that don't correspond to a row in the groups table FOR r_object IN SELECT 'Global checking: table ' || relname || ' is not linked to an application table declared in the emaj_relation table' AS msg FROM pg_class, pg_namespace WHERE relnamespace = pg_namespace.oid AND nspname = v_emajSchema AND relkind = 'r' AND relname LIKE E'%\\_log' AND relname NOT IN (SELECT rel_schema || '_' || rel_tblseq || '_log' FROM emaj.emaj_relation) LOOP RETURN NEXT r_object.msg; v_finalMsg = ''; END LOOP; -- verify that all log, rollback and truncate functions correspond to a row in the groups table FOR r_object IN SELECT 'Global checking: function ' || proname || ' is not linked to an application table declared in the emaj_relation table' AS msg FROM pg_proc, pg_namespace WHERE pronamespace = pg_namespace.oid AND nspname = v_emajSchema AND ((proname LIKE E'%\\_log\\_fnct' AND proname NOT IN ( SELECT rel_schema || '_' || rel_tblseq || '_log_fnct' FROM emaj.emaj_relation)) OR (proname LIKE E'%\\_rlbk\\_fnct' AND proname NOT IN ( SELECT rel_schema || '_' || rel_tblseq || '_rlbk_fnct' FROM emaj.emaj_relation)) ) LOOP RETURN NEXT r_object.msg; v_finalMsg = ''; END LOOP; -- final message for global check if no error has been yet detected IF v_finalMsg <> '' THEN RETURN NEXT v_finalMsg; END IF; -- verify all groups defined in emaj_group FOR r_group IN SELECT group_name FROM emaj.emaj_group ORDER BY 1 LOOP FOR r_object IN SELECT msg FROM emaj._verify_group(r_group.group_name, false) msg LOOP RETURN NEXT r_object.msg; END LOOP; END LOOP; RETURN; END; $emaj_verify_all$; COMMENT ON FUNCTION emaj.emaj_verify_all() IS $$Verifies the consistency between existing E-Maj and application objects.$$; CREATE or REPLACE FUNCTION emaj._forbid_truncate_fnct() RETURNS TRIGGER AS $_forbid_truncate_fnct$ -- The function is triggered by the execution of TRUNCATE SQL verb on tables of a rollbackable group -- in logging mode. -- It can only be called with postgresql in a version greater or equal 8.4 BEGIN IF (TG_OP = 'TRUNCATE') THEN RAISE EXCEPTION 'emaj._forbid_truncate_fnct: TRUNCATE is not allowed while updates on this table (%.%) are currently protected by E-Maj. Consider stopping the group before issuing a TRUNCATE.', TG_TABLE_SCHEMA, TG_TABLE_NAME; END IF; RETURN NULL; END; $_forbid_truncate_fnct$ LANGUAGE plpgsql SECURITY DEFINER; CREATE or REPLACE FUNCTION emaj._log_truncate_fnct() RETURNS TRIGGER AS $_log_truncate_fnct$ -- The function is triggered by the execution of TRUNCATE SQL verb on tables of an audit_only group -- in logging mode. -- It can only be called with postgresql in a version greater or equal 8.4 DECLARE v_emajSchema TEXT := 'emaj'; v_logTableName TEXT; BEGIN IF (TG_OP = 'TRUNCATE') THEN v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(TG_TABLE_SCHEMA || '_' || TG_TABLE_NAME || '_log'); EXECUTE 'INSERT INTO ' || v_logTableName || ' (emaj_verb) VALUES (''TRU'')'; END IF; RETURN NULL; END; $_log_truncate_fnct$ LANGUAGE plpgsql SECURITY DEFINER; ------------------------------------------------ ---- ---- ---- Functions to manage groups ---- ---- ---- ------------------------------------------------ CREATE or REPLACE FUNCTION emaj._verify_group(v_groupName TEXT, v_onErrorStop boolean) RETURNS SETOF TEXT LANGUAGE plpgsql AS $_verify_group$ -- The function verifies the consistency between log and application tables for a group -- Input: group name, boolean to specify if a detected error must raise an exception -- If onErrorStop boolean is false, it returns a set of warning messages for discovered discrepancies. -- If no error is detected, a single row is returned. DECLARE v_emajSchema TEXT := 'emaj'; v_pgVersion TEXT := emaj._pg_version(); v_finalMsg TEXT; v_isRollbackable BOOLEAN; v_creationPgVersion TEXT; v_msgPrefix TEXT; v_msg TEXT; v_fullTableName TEXT; v_logTableName TEXT; v_logFnctName TEXT; v_rlbkFnctName TEXT; v_logTriggerName TEXT; v_truncTriggerName TEXT; r_tblsq RECORD; BEGIN -- for 8.1-, E-Maj is not compatible IF v_pgVersion < '8.2' THEN RAISE EXCEPTION 'The current postgres version (%) is not compatible with E-Maj.', version(); END IF; -- get some characteristics of the group SELECT group_is_rollbackable, group_pg_version INTO v_isRollbackable, v_creationPgVersion FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION '_verify_group: group % has not been created.', v_groupName; END IF; -- Build message parts v_msgPrefix = 'Checking tables group ' || v_groupName || ': '; v_finalMsg = v_msgPrefix || 'no error encountered'; -- check the postgres version at creation time is compatible with the current version -- Warning: comparisons on version numbers are alphanumeric. -- But we suppose these tests will not be useful anymore when pg 10.0 will appear! -- for 8.2 and 8.3, both major versions must be the same IF ((v_pgVersion = '8.2' OR v_pgVersion = '8.3') AND substring (v_creationPgVersion FROM E'(\\d+\\.\\d+)') <> v_pgVersion) OR -- for 8.4+, both major versions must be 8.4+ (v_pgVersion >= '8.4' AND substring (v_creationPgVersion FROM E'(\\d+\\.\\d+)') < '8.4') THEN v_msg = v_msgPrefix || 'the group has been created with a non compatible postgresql version (' || v_creationPgVersion || '). It must be dropped and recreated.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; -- per table verifications FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = v_groupName ORDER BY rel_priority, rel_schema, rel_tblseq LOOP -- check the class is unchanged IF r_tblsq.rel_kind <> emaj._check_class(r_tblsq.rel_schema, r_tblsq.rel_tblseq) THEN v_msg = v_msgPrefix || 'the relation type for ' || r_tblsq.rel_schema || '.' || r_tblsq.rel_tblseq || ' has changed (was ''' || r_tblsq.rel_kind || ''' at emaj_create_group time).'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, ... v_logTableName := r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_log'; v_logFnctName := r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_log_fnct'; v_rlbkFnctName := r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_rlbk_fnct'; v_logTriggerName := r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_log_trg'; v_truncTriggerName := r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_trunc_trg'; v_fullTableName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); -- -> check boths functions exists PERFORM proname FROM pg_proc , pg_namespace WHERE pronamespace = pg_namespace.oid AND nspname = v_emajSchema AND proname = v_logFnctName; IF NOT FOUND THEN v_msg = v_msgPrefix || 'log function ' || v_logFnctName || ' not found.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; IF v_isRollbackable THEN PERFORM proname FROM pg_proc , pg_namespace WHERE pronamespace = pg_namespace.oid AND nspname = v_emajSchema AND proname = v_rlbkFnctName; IF NOT FOUND THEN v_msg = v_msgPrefix || 'rollback function ' || v_rlbkFnctName || ' not found.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; END IF; -- -> check both triggers exist PERFORM tgname FROM pg_trigger WHERE tgname = v_logTriggerName; IF NOT FOUND THEN v_msg = v_msgPrefix || 'log trigger ' || v_logTriggerName || ' not found.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; IF v_pgVersion >= '8.4' THEN PERFORM tgname FROM pg_trigger WHERE tgname = v_truncTriggerName; IF NOT FOUND THEN v_msg = v_msgPrefix || 'truncate trigger ' || v_truncTriggerName || ' not found.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; END IF; -- -> check the log table exists PERFORM relname FROM pg_class, pg_namespace WHERE relnamespace = pg_namespace.oid AND nspname = v_emajSchema AND relkind = 'r' AND relname = v_logTableName; IF NOT FOUND THEN v_msg = v_msgPrefix || 'log table ' || v_logTableName || ' not found.'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; ELSE -- -> check that the log tables structure is consistent with the application tables structure -- (same columns and same formats) -- - added or changed column in application table PERFORM attname, atttypid, attlen, atttypmod FROM pg_attribute, pg_class, pg_namespace WHERE nspname = r_tblsq.rel_schema AND relnamespace = pg_namespace.oid AND relname = r_tblsq.rel_tblseq AND attrelid = pg_class.oid AND attnum > 0 AND attisdropped = false EXCEPT SELECT attname, atttypid, attlen, atttypmod FROM pg_attribute, pg_class, pg_namespace WHERE nspname = v_emajSchema AND relnamespace = pg_namespace.oid AND relname = v_logTableName AND attrelid = pg_class.oid AND attnum > 0 AND attisdropped = false AND attname NOT LIKE 'emaj%'; IF FOUND THEN v_msg = v_msgPrefix || 'the structure of log table ' || v_logTableName || ' is not coherent with ' || v_fullTableName || ' (added or changed column?).'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; -- - missing or changed column in application table PERFORM attname, atttypid, attlen, atttypmod FROM pg_attribute, pg_class, pg_namespace WHERE nspname = v_emajSchema AND relnamespace = pg_namespace.oid AND relname = v_logTableName AND attrelid = pg_class.oid AND attnum > 0 AND attisdropped = false AND attname NOT LIKE 'emaj%' EXCEPT SELECT attname, atttypid, attlen, atttypmod FROM pg_attribute, pg_class, pg_namespace WHERE nspname = r_tblsq.rel_schema AND relnamespace = pg_namespace.oid AND relname = r_tblsq.rel_tblseq AND attrelid = pg_class.oid AND attnum > 0 AND attisdropped = false; IF FOUND THEN v_msg = v_msgPrefix || 'the structure of log table ' || v_logTableName || ' is not coherent with ' || v_fullTableName || ' (dropped or changed column?).'; if v_onErrorStop THEN RAISE EXCEPTION '_verify_group: %',v_msg; END IF; RETURN NEXT v_msg; v_finalMsg = ''; END IF; END IF; -- if it is a sequence, nothing to do END IF; END LOOP; IF v_finalMsg <> '' THEN -- OK, no error for the group RETURN NEXT v_finalMsg; END IF; RETURN; END; $_verify_group$; CREATE or REPLACE FUNCTION emaj._check_fk_groups(v_groupNames TEXT[]) RETURNS void LANGUAGE plpgsql AS $_check_fk_groups$ -- this function checks foreign key constraints for tables of a groups array. -- tables from audit_only groups are ignored in this check because they will never be rollbacked. -- Input: group names array DECLARE r_fk RECORD; BEGIN -- issue a warning if a table of the groups has a foreign key that references a table outside the groups FOR r_fk IN SELECT c.conname,r.rel_schema,r.rel_tblseq,nf.nspname,tf.relname FROM pg_constraint c, pg_namespace n, pg_class t, pg_namespace nf,pg_class tf, emaj.emaj_relation r, emaj.emaj_group g WHERE contype = 'f' -- FK constraints only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- join for table and namespace AND c.confrelid = tf.oid AND tf.relnamespace = nf.oid -- join for referenced table and namespace AND n.nspname = r.rel_schema AND t.relname = r.rel_tblseq -- join on emaj_relation table AND r.rel_group = g.group_name -- join on emaj_group table AND r.rel_group = ANY (v_groupNames) -- only tables of the selected groups AND g.group_is_rollbackable -- only tables from rollbackable groups AND (nf.nspname,tf.relname) NOT IN -- referenced table outside the groups (SELECT rel_schema,rel_tblseq FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames)) LOOP RAISE WARNING '_check_fk_groups: Foreign key %, from table %.%, references %.% that is outside groups (%).', r_fk.conname,r_fk.rel_schema,r_fk.rel_tblseq,r_fk.nspname,r_fk.relname,array_to_string(v_groupNames,','); END LOOP; -- issue a warning if a table of the groups is referenced by a table outside the groups FOR r_fk IN SELECT c.conname,n.nspname,t.relname,r.rel_schema,r.rel_tblseq FROM pg_constraint c, pg_namespace n, pg_class t, pg_namespace nf,pg_class tf, emaj.emaj_relation r, emaj.emaj_group g WHERE contype = 'f' -- FK constraints only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- join for table and namespace AND c.confrelid = tf.oid AND tf.relnamespace = nf.oid -- join for referenced table and namespace AND nf.nspname = r.rel_schema AND tf.relname = r.rel_tblseq -- join with emaj_relation table AND r.rel_group = g.group_name -- join on emaj_group table AND r.rel_group = ANY (v_groupNames) -- only tables of the selected groups AND g.group_is_rollbackable -- only tables from rollbackable groups AND (n.nspname,t.relname) NOT IN -- referenced table outside the groups (SELECT rel_schema,rel_tblseq FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames)) LOOP RAISE WARNING '_check_fk_groups: table %.% is referenced by foreign key % from table %.% that is outside groups (%).', r_fk.rel_schema,r_fk.rel_tblseq,r_fk.conname,r_fk.nspname,r_fk.relname,array_to_string(v_groupNames,','); END LOOP; RETURN; END; $_check_fk_groups$; CREATE or REPLACE FUNCTION emaj._lock_groups(v_groupNames TEXT[], v_lockMode TEXT, v_multiGroup BOOLEAN) RETURNS void LANGUAGE plpgsql AS $_lock_groups$ -- This function locks all tables of a groups array. -- The lock mode is provided by the calling function -- Input: array of group names, lock mode, flag indicating whether the function is called to processed several groups DECLARE v_nbRetry SMALLINT := 0; v_nbTbl INT; v_ok BOOLEAN := false; v_fullTableName TEXT; v_mode TEXT; r_tblsq RECORD; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object) VALUES (CASE WHEN v_multiGroup THEN 'LOCK_GROUPS' ELSE 'LOCK_GROUP' END,'BEGIN', array_to_string(v_groupNames,',')); -- set the value for the lock mode that will be used in the LOCK statement IF v_lockMode = '' THEN v_mode = 'ACCESS EXCLUSIVE'; ELSE v_mode = v_lockMode; END IF; -- acquire lock on all tables -- in case of deadlock, retry up to 5 times WHILE NOT v_ok AND v_nbRetry < 5 LOOP BEGIN -- scan all tables of the group v_nbTbl = 0; FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_kind = 'r' ORDER BY rel_priority, rel_schema, rel_tblseq LOOP -- lock the table v_fullTableName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); EXECUTE 'LOCK TABLE ' || v_fullTableName || ' IN ' || v_mode || ' MODE'; v_nbTbl = v_nbTbl + 1; END LOOP; -- ok, all tables locked v_ok = true; EXCEPTION WHEN deadlock_detected THEN v_nbRetry = v_nbRetry + 1; RAISE NOTICE '_lock_groups: a deadlock has been trapped while locking tables of group %.', v_groupNames; END; END LOOP; IF NOT v_ok THEN RAISE EXCEPTION '_lock_groups: too many (5) deadlocks encountered while locking tables of group %.',v_groupNames; END IF; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'LOCK_GROUPS' ELSE 'LOCK_GROUP' END, 'END', array_to_string(v_groupNames,','), v_nbTbl || ' tables locked, ' || v_nbRetry || ' deadlock(s)'); RETURN; END; $_lock_groups$; CREATE or REPLACE FUNCTION emaj.emaj_create_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_create_group$ -- This function is the simplified form of the emaj_create_group(v_groupName TEXT, v_isRollbackable BOOLEAN) function -- The created groups are considered 'rollbackable' -- Input: group name -- Output: number of processed tables and sequences BEGIN RETURN emaj.emaj_create_group(v_groupName,true); END; $emaj_create_group$; COMMENT ON FUNCTION emaj.emaj_create_group(TEXT) IS $$Creates a rollbackable E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_create_group(v_groupName TEXT, v_isRollbackable BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $emaj_create_group$ -- This function creates emaj objects for all tables of a group -- Input: group name, boolean indicating wether the group is rollbackable or not -- Output: number of processed tables and sequences DECLARE v_nbTbl INT := 0; v_nbSeq INT := 0; v_msg TEXT; v_relkind TEXT; v_stmt TEXT; v_nb_trg INT; r_tblsq RECORD; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('CREATE_GROUP', 'BEGIN', v_groupName, CASE WHEN v_isRollbackable THEN 'rollbackable' ELSE 'audit_only' END); -- check that the group name is valid (not '' and not containing ',' characters) IF v_groupName IS NULL THEN RAISE EXCEPTION 'emaj_create_group: group name can''t be NULL.'; END IF; IF v_groupName = '' THEN RAISE EXCEPTION 'emaj_create_group: group name must at least contain 1 character.'; END IF; -- check that the group is not yet recorded in emaj_group table PERFORM 1 FROM emaj.emaj_group WHERE group_name = v_groupName; IF FOUND THEN RAISE EXCEPTION 'emaj_create_group: group % is already created.', v_groupName; END IF; -- check that no table or sequence of the new group already belong to another created group v_msg = ''; FOR r_tblsq IN SELECT grpdef_schema, grpdef_tblseq, rel_group FROM emaj.emaj_group_def, emaj.emaj_relation WHERE grpdef_schema = rel_schema AND grpdef_tblseq = rel_tblseq AND grpdef_group = v_groupName LOOP IF v_msg <> '' THEN v_msg = v_msg || ', '; END IF; v_msg = v_msg || r_tblsq.grpdef_schema || '.' || r_tblsq.grpdef_tblseq || ' in ' || r_tblsq.rel_group; END LOOP; IF v_msg <> '' THEN RAISE EXCEPTION 'emaj_create_group: one or several tables already belong to another group (%).', v_msg; END IF; -- OK, insert group row in the emaj_group table INSERT INTO emaj.emaj_group (group_name, group_state, group_is_rollbackable) VALUES (v_groupName, 'IDLE',v_isRollbackable); -- scan all classes of the group (in priority order, NULLS being processed last) FOR r_tblsq IN SELECT grpdef_priority, grpdef_schema, grpdef_tblseq FROM emaj.emaj_group_def WHERE grpdef_group = v_groupName ORDER BY grpdef_priority, grpdef_schema, grpdef_tblseq LOOP -- check the class is valid v_relkind = emaj._check_class(r_tblsq.grpdef_schema, r_tblsq.grpdef_tblseq); IF v_relkind = 'r' THEN -- if it is a table, create the related emaj objects PERFORM emaj._create_tbl(r_tblsq.grpdef_schema, r_tblsq.grpdef_tblseq, v_isRollbackable); v_nbTbl = v_nbTbl + 1; ELSEIF v_relkind = 'S' THEN -- if it is a sequence, just count v_nbSeq = v_nbSeq + 1; END IF; -- record this table or sequence in the emaj_relation table INSERT INTO emaj.emaj_relation (rel_schema, rel_tblseq, rel_group, rel_priority, rel_kind) VALUES (r_tblsq.grpdef_schema, r_tblsq.grpdef_tblseq, v_groupName, r_tblsq.grpdef_priority, v_relkind); END LOOP; IF v_nbTbl + v_nbSeq = 0 THEN RAISE EXCEPTION 'emaj_create_group: Group % is unknown in emaj_group_def table.', v_groupName; END IF; -- update tables and sequences counters in the emaj_group table UPDATE emaj.emaj_group SET group_nb_table = v_nbTbl, group_nb_sequence = v_nbSeq WHERE group_name = v_groupName; -- check foreign keys with tables outside the group PERFORM emaj._check_fk_groups (array[v_groupName]); -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('CREATE_GROUP', 'END', v_groupName, v_nbTbl + v_nbSeq || ' tables/sequences processed'); RETURN v_nbTbl + v_nbSeq; END; $emaj_create_group$; COMMENT ON FUNCTION emaj.emaj_create_group(TEXT, BOOLEAN) IS $$Creates an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_comment_group(v_groupName TEXT, v_comment TEXT) RETURNS void LANGUAGE plpgsql AS $emaj_comment_group$ -- This function sets or modifies a comment on a group by updating the group_comment of the emaj_group table. -- Input: group name, comment -- To reset an existing comment for a group, the supplied comment can be NULL. DECLARE v_groupState TEXT; BEGIN -- attempt to update the group_comment column from emaj_group table UPDATE emaj.emaj_group SET group_comment = v_comment WHERE group_name = v_groupName; -- check that the group has been found IF NOT FOUND THEN RAISE EXCEPTION 'emaj_comment_group: group % has not been created.', v_groupName; END IF; -- insert event in the history INSERT INTO emaj.emaj_hist (hist_function, hist_object) VALUES ('COMMENT_GROUP', v_groupName); RETURN; END; $emaj_comment_group$; COMMENT ON FUNCTION emaj.emaj_comment_group(TEXT,TEXT) IS $$Sets a comment on an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_drop_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_drop_group$ -- This function deletes the emaj objects for all tables of a group -- Input: group name -- Output: number of processed tables and sequences DECLARE v_nbTb INT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object) VALUES ('DROP_GROUP', 'BEGIN', v_groupName); -- effectively drop the group SELECT emaj._drop_group(v_groupName, TRUE) INTO v_nbTb; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('DROP_GROUP', 'END', v_groupName, v_nbTb || ' tables/sequences processed'); RETURN v_nbTb; END; $emaj_drop_group$; COMMENT ON FUNCTION emaj.emaj_drop_group(TEXT) IS $$Drops an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_force_drop_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_force_drop_group$ -- This function deletes the emaj objects for all tables of a group. -- It differs from emaj_drop_group by the fact that no check is done on group's state. -- This allows to drop a group that is not consistent, following hasardeous operations. -- This function should not be used, except if the emaj_drop_group fails. -- Input: group name -- Output: number of processed tables and sequences DECLARE v_nbTb INT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object) VALUES ('FORCE_DROP_GROUP', 'BEGIN', v_groupName); -- effectively drop the group SELECT emaj._drop_group(v_groupName, FALSE) INTO v_nbTb; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('FORCE_DROP_GROUP', 'END', v_groupName, v_nbTb || ' tables/sequences processed'); RETURN v_nbTb; END; $emaj_force_drop_group$; COMMENT ON FUNCTION emaj.emaj_force_drop_group(TEXT) IS $$Drops an E-Maj group, even in LOGGING state.$$; CREATE or REPLACE FUNCTION emaj._drop_group(v_groupName TEXT, v_checkState BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $_drop_group$ -- This function effectively deletes the emaj objects for all tables of a group -- Input: group name, and a boolean indicating whether the group's state has to be checked -- Output: number of processed tables and sequences DECLARE v_groupState TEXT; v_isRollbackable BOOLEAN; v_nbTb INT := 0; r_tblsq RECORD; BEGIN -- check that the group is recorded in emaj_group table SELECT group_state, group_is_rollbackable INTO v_groupState, v_isRollbackable FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION '_drop_group: group % has not been created.', v_groupName; END IF; -- if the state of the group has to be checked, IF v_checkState THEN -- check that the group is IDLE (i.e. not in a LOGGING) state IF v_groupState <> 'IDLE' THEN RAISE EXCEPTION '_drop_group: The group % cannot be deleted because it is not in idle state.', v_groupName; END IF; END IF; -- OK, delete the emaj objets for each table of the group FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = v_groupName ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, delete the related emaj objects PERFORM emaj._drop_tbl (r_tblsq.rel_schema, r_tblsq.rel_tblseq, v_isRollbackable); ELSEIF r_tblsq.rel_kind = 'S' THEN -- if it is a sequence, delete all related data from emaj_sequence table PERFORM emaj._drop_seq (r_tblsq.rel_schema, r_tblsq.rel_tblseq); END IF; v_nbTb = v_nbTb + 1; END LOOP; -- delete group rows from the emaj_fk table. DELETE FROM emaj.emaj_fk WHERE v_groupName = ANY (fk_groups); -- delete group row from the emaj_group table. -- By cascade, it also deletes rows from emaj_relation and emaj_mark DELETE FROM emaj.emaj_group WHERE group_name = v_groupName; RETURN v_nbTb; END; $_drop_group$; CREATE or REPLACE FUNCTION emaj.emaj_start_group(v_groupName TEXT, v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_start_group$ -- This function activates the log triggers of all the tables for a group and set a first mark -- This is the short form for emaj.emaj_start_group(v_groupName TEXT, v_mark TEXT, v_resetLog BOOLEAN) -- where v_resetLog is true. BEGIN RETURN emaj.emaj_start_group(v_groupName, v_mark, true); END; $emaj_start_group$; COMMENT ON FUNCTION emaj.emaj_start_group(TEXT,TEXT) IS $$Starts an E-Maj group (short form).$$; CREATE or REPLACE FUNCTION emaj.emaj_start_group(v_groupName TEXT, v_mark TEXT, v_resetLog BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $emaj_start_group$ -- This function activates the log triggers of all the tables for a group and set a first mark -- It may reset log tables. -- Input: group name, -- name of the mark to set -- '%' wild characters in mark name are transformed into a characters sequence built from the current timestamp -- a null or '' mark is transformed into 'MARK_%', -- boolean indicating whether the log tables of the group must be reset. -- Output: number of processed tables and sequences DECLARE v_nbTblSeq INT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('START_GROUP', 'BEGIN', v_groupName, CASE WHEN v_resetLog THEN 'With log reset' ELSE 'Without log reset' END); -- call the common _start_groups function SELECT emaj._start_groups(array[v_groupName], v_mark, false, v_resetLog) INTO v_nbTblSeq; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('START_GROUP', 'END', v_groupName, v_nbTblSeq || ' tables/sequences processed'); RETURN v_nbTblSeq; END; $emaj_start_group$; COMMENT ON FUNCTION emaj.emaj_start_group(TEXT,TEXT,BOOLEAN) IS $$Starts an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_start_groups(v_groupNames TEXT[], v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_start_groups$ -- This function activates the log triggers of all the tables for a groups array and set a first mark -- This is the short form for emaj.emaj_start_groups(v_groupName TEXT, v_mark TEXT, v_resetLog BOOLEAN) -- where v_resetLog is true. BEGIN RETURN emaj.emaj_start_groups(v_groupNames, v_mark, true); END; $emaj_start_groups$; COMMENT ON FUNCTION emaj.emaj_start_groups(TEXT[],TEXT) IS $$Starts several E-Maj groups (short form).$$; CREATE or REPLACE FUNCTION emaj.emaj_start_groups(v_groupNames TEXT[], v_mark TEXT, v_resetLog BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $emaj_start_groups$ -- This function activates the log triggers of all the tables for a groups array and set a first mark -- Input: array of group names, -- name of the mark to set -- '%' wild characters in mark name are transformed into a characters sequence built from the current timestamp -- a null or '' mark is transformed into 'MARK_%', -- boolean indicating whether the log tables of the group must be reset. -- Output: total number of processed tables and sequences DECLARE v_nbTblSeq INT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('START_GROUPS', 'BEGIN', array_to_string(v_groupNames,','), CASE WHEN v_resetLog THEN 'With log reset' ELSE 'Without log reset' END); -- call the common _start_groups function SELECT emaj._start_groups(emaj._check_group_names_array(v_groupNames), v_mark, true, v_resetLog) INTO v_nbTblSeq; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('START_GROUPS', 'END', array_to_string(v_groupNames,','), v_nbTblSeq || ' tables/sequences processed'); RETURN v_nbTblSeq; END; $emaj_start_groups$; COMMENT ON FUNCTION emaj.emaj_start_groups(TEXT[],TEXT, BOOLEAN) IS $$Starts several E-Maj groups.$$; CREATE or REPLACE FUNCTION emaj._start_groups(v_groupNames TEXT[], v_mark TEXT, v_multiGroup BOOLEAN, v_resetLog BOOLEAN) RETURNS INT LANGUAGE plpgsql SECURITY DEFINER AS $_start_groups$ -- This function activates the log triggers of all the tables for one or several groups and set a first mark -- It also delete oldest rows in emaj_hist table -- Input: array of group names, name of the mark to set, boolean indicating whether the function is called by a multi group function, boolean indicating whether the function must reset the group at start time -- Output: number of processed tables -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of application tables and sequences. DECLARE v_pgVersion TEXT := emaj._pg_version(); v_i INT; v_groupState TEXT; v_nbTb INT := 0; v_markName TEXT; v_logTableName TEXT; v_fullTableName TEXT; v_logTriggerName TEXT; v_truncTriggerName TEXT; v_cpt BIGINT; r_tblsq RECORD; BEGIN -- purge the emaj history, if needed PERFORM emaj._purge_hist(); -- if the group names array is null, immediately return 0 IF v_groupNames IS NULL THEN RETURN 0; END IF; -- check that each group is recorded in emaj_group table FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupNames[v_i] FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION '_start_group: group % has not been created.', v_groupNames[v_i]; END IF; -- ... and is in IDLE (i.e. not in a LOGGING) state IF v_groupState <> 'IDLE' THEN RAISE EXCEPTION '_start_group: The group % cannot be started because it is not in idle state. An emaj_stop_group function must be previously executed.', v_groupNames[v_i]; END IF; -- ... and is not damaged PERFORM 0 FROM emaj._verify_group(v_groupNames[v_i], true); END LOOP; -- for each group, FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP if v_resetLog THEN -- ... if requested by the user, call the emaj_reset_group function to erase remaining traces from previous logs SELECT emaj._rst_group(v_groupNames[v_i]) INTO v_nbTb; END IF; -- ... and check foreign keys with tables outside the group PERFORM emaj._check_fk_groups(array[v_groupNames[v_i]]); END LOOP; -- check and process the supplied mark name SELECT emaj._check_new_mark(v_mark, v_groupNames) INTO v_markName; -- OK, lock all tables to get a stable point ... -- (the ALTER TABLE statements will also set EXCLUSIVE locks, but doing this for all tables at the beginning of the operation decreases the risk for deadlock) PERFORM emaj._lock_groups(v_groupNames,'',v_multiGroup); -- ... and enable all log triggers for the groups v_nbTb = 0; -- for each relation of the group, FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, enable the emaj log and truncate triggers v_fullTableName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); v_logTriggerName := quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_log_trg'); v_truncTriggerName := quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_trunc_trg'); EXECUTE 'ALTER TABLE ' || v_fullTableName || ' ENABLE TRIGGER ' || v_logTriggerName; IF v_pgVersion >= '8.4' THEN EXECUTE 'ALTER TABLE ' || v_fullTableName || ' ENABLE TRIGGER ' || v_truncTriggerName; END IF; ELSEIF r_tblsq.rel_kind = 'S' THEN -- if it is a sequence, nothing to do END IF; v_nbTb = v_nbTb + 1; END LOOP; -- update the state of the group row from the emaj_group table UPDATE emaj.emaj_group SET group_state = 'LOGGING' WHERE group_name = ANY (v_groupNames); -- Set the first mark for each group PERFORM emaj._set_mark_groups(v_groupNames, v_markName, v_multiGroup, true); -- RETURN v_nbTb; END; $_start_groups$; CREATE or REPLACE FUNCTION emaj.emaj_stop_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_stop_group$ -- This function de-activates the log triggers of all the tables for a group. -- Execute several emaj_stop_group functions for the same group doesn't produce any error. -- Input: group name -- Output: number of processed tables and sequences BEGIN -- just call the common _stop_groups function RETURN emaj._stop_groups(array[v_groupName], 'STOP_%', false); END; $emaj_stop_group$; COMMENT ON FUNCTION emaj.emaj_stop_group(TEXT) IS $$Stops an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_stop_groups(v_groupNames TEXT[]) RETURNS INT LANGUAGE plpgsql AS $emaj_stop_groups$ -- This function de-activates the log triggers of all the tables for a groups array. -- Groups already in IDDLE state are simply not processed. -- Input: array of group names -- Output: number of processed tables and sequences BEGIN -- just call the common _stop_groups function RETURN emaj._stop_groups(emaj._check_group_names_array(v_groupNames), 'STOP_%', true); END; $emaj_stop_groups$; COMMENT ON FUNCTION emaj.emaj_stop_groups(TEXT[]) IS $$Stops several E-Maj groups.$$; CREATE or REPLACE FUNCTION emaj._stop_groups(v_groupNames TEXT[], v_mark TEXT, v_multiGroup BOOLEAN) RETURNS INT LANGUAGE plpgsql SECURITY DEFINER AS $_stop_groups$ -- This function effectively de-activates the log triggers of all the tables for a group. -- Input: array of group names, a mark name to set, and a boolean indicating if the function is called by a multi group function -- Output: number of processed tables and sequences -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of application tables and sequences. DECLARE v_pgVersion TEXT := emaj._pg_version(); v_validGroupNames TEXT[]; v_i INT; v_groupState TEXT; v_nbTb INT := 0; v_markName TEXT; v_fullTableName TEXT; v_logTriggerName TEXT; v_truncTriggerName TEXT; r_tblsq RECORD; BEGIN -- if the group names array is null, immediately return 0 IF v_groupNames IS NULL THEN RETURN 0; END IF; -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object) VALUES (CASE WHEN v_multiGroup THEN 'STOP_GROUPS' ELSE 'STOP_GROUP' END, 'BEGIN', array_to_string(v_groupNames,',')); -- for each group of the array, FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP -- ... check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupNames[v_i] FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION '_stop_group: group % has not been created.', v_groupNames[v_i]; END IF; -- ... check that the group is in LOGGING state IF v_groupState <> 'LOGGING' THEN RAISE WARNING '_stop_group: Group % cannot be stopped because it is not in logging state.', v_groupNames[v_i]; ELSE -- ... if OK, add the group into the array of groups to process v_validGroupNames = v_validGroupNames || array[v_groupNames[v_i]]; END IF; END LOOP; -- check and process the supplied mark name SELECT emaj._check_new_mark(v_mark, v_groupNames) INTO v_markName; -- IF v_validGroupNames IS NOT NULL THEN -- OK, lock all tables to get a stable point ... -- (the ALTER TABLE statements will also set EXCLUSIVE locks, but doing this for all tables at the beginning of the operation decreases the risk for deadlock) PERFORM emaj._lock_groups(v_validGroupNames,'',v_multiGroup); -- for each relation of the groups to process, FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = ANY (v_validGroupNames) ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, disable the emaj log and truncate triggers v_fullTableName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); v_logTriggerName := quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_log_trg'); v_truncTriggerName := quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_emaj_trunc_trg'); EXECUTE 'ALTER TABLE ' || v_fullTableName || ' DISABLE TRIGGER ' || v_logTriggerName; IF v_pgVersion >= '8.4' THEN EXECUTE 'ALTER TABLE ' || v_fullTableName || ' DISABLE TRIGGER ' || v_truncTriggerName; END IF; ELSEIF r_tblsq.rel_kind = 'S' THEN -- if it is a sequence, nothing to do END IF; v_nbTb = v_nbTb + 1; END LOOP; -- record the number of log rows for the old last mark of each group UPDATE emaj.emaj_mark m SET mark_log_rows_before_next = (SELECT sum(stat_rows) FROM emaj.emaj_log_stat_group(m.mark_group,'EMAJ_LAST_MARK',NULL)) WHERE mark_group = ANY (v_groupNames) AND (mark_group, mark_id) IN -- select only last mark of each concerned group (SELECT mark_group, MAX(mark_id) FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_state = 'ACTIVE' GROUP BY mark_group); -- set all marks for the groups from the emaj_mark table in 'DELETED' state to avoid any further rollback UPDATE emaj.emaj_mark SET mark_state = 'DELETED' WHERE mark_group = ANY (v_validGroupNames) AND mark_state <> 'DELETED'; -- update the state of the groups rows from the emaj_group table UPDATE emaj.emaj_group SET group_state = 'IDLE' WHERE group_name = ANY (v_validGroupNames); END IF; -- Set the stop mark for each group PERFORM emaj._set_mark_groups(v_groupNames, v_markName, v_multiGroup, true); -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'STOP_GROUPS' ELSE 'STOP_GROUP' END, 'END', array_to_string(v_groupNames,','), v_nbTb || ' tables/sequences processed'); RETURN v_nbTb; END; $_stop_groups$; CREATE or REPLACE FUNCTION emaj.emaj_set_mark_group(v_groupName TEXT, v_mark TEXT) RETURNS int LANGUAGE plpgsql AS $emaj_set_mark_group$ -- This function inserts a mark in the emaj_mark table and takes an image of the sequences definitions for the group -- Input: group name, mark to set -- '%' wild characters in mark name are transformed into a characters sequence built from the current timestamp -- a null or '' mark is transformed into 'MARK_%' -- Output: number of processed tables and sequences DECLARE v_groupState TEXT; v_markName TEXT; v_nbTb INT; BEGIN -- insert begin into the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUP', 'BEGIN', v_groupName, v_markName); -- check that the group is recorded in emaj_group table -- (the SELECT is coded FOR UPDATE to lock the accessed group, avoiding any operation on this group at the same time) SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_set_mark_group: group % has not been created.', v_groupName; END IF; -- check that the group is in LOGGING state IF v_groupState <> 'LOGGING' THEN RAISE EXCEPTION 'emaj_set_mark_group: A mark cannot be set for group % because it is not in logging state. An emaj_start_group function must be previously executed.', v_groupName; END IF; -- check if the emaj group is OK PERFORM 0 FROM emaj._verify_group(v_groupName, true); -- check and process the supplied mark name SELECT emaj._check_new_mark(v_mark, array[v_groupName]) INTO v_markName; -- OK, lock all tables to get a stable point ... -- use a ROW EXCLUSIVE lock mode, preventing for a transaction currently updating data, but not conflicting with simple read access or vacuum operation. PERFORM emaj._lock_groups(array[v_groupName],'ROW EXCLUSIVE',false); -- Effectively set the mark using the internal _set_mark_groups() function SELECT emaj._set_mark_groups(array[v_groupName], v_markName, false, false) into v_nbTb; -- insert end into the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUP', 'END', v_groupName, v_markName); -- RETURN v_nbTb; END; $emaj_set_mark_group$; COMMENT ON FUNCTION emaj.emaj_set_mark_group(TEXT,TEXT) IS $$Sets a mark on an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_set_mark_groups(v_groupNames TEXT[], v_mark TEXT) RETURNS int LANGUAGE plpgsql AS $emaj_set_mark_groups$ -- This function inserts a mark in the emaj_mark table and takes an image of the sequences definitions for several groups at a time -- Input: array of group names, mark to set -- '%' wild characters in mark name are transformed into a characters sequence built from the current timestamp -- a null or '' mark is transformed into 'MARK_%' -- Output: number of processed tables and sequences DECLARE v_validGroupNames TEXT[]; v_groupState TEXT; v_markName TEXT; v_nbTb INT; BEGIN -- validate the group names array v_validGroupNames=emaj._check_group_names_array(v_groupNames); -- if the group names array is null, immediately return 0 IF v_validGroupNames IS NULL THEN INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUPS', 'BEGIN', NULL, v_mark); INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUPS', 'END', NULL, v_mark); RETURN 0; END IF; -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUPS', 'BEGIN', array_to_string(v_groupNames,','), v_mark); -- for each group... FOR v_i in 1 .. array_upper(v_validGroupNames,1) LOOP -- ... check that the group is recorded in emaj_group table -- (the SELECT is coded FOR UPDATE to lock the accessed group, avoiding any operation on this group at the same time) SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_validGroupNames[v_i] FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_set_mark_groups: group % has not been created.', v_validGroupNames[v_i]; END IF; -- ... check that the group is in LOGGING state IF v_groupState <> 'LOGGING' THEN RAISE EXCEPTION 'emaj_set_mark_groups: A mark cannot be set for group % because it is not in logging state. An emaj_start_group function must be previously executed.', v_validGroupNames[v_i]; END IF; -- ... check if the group is OK PERFORM 0 FROM emaj._verify_group(v_validGroupNames[v_i], true); END LOOP; -- check and process the supplied mark name SELECT emaj._check_new_mark(v_mark, v_validGroupNames) INTO v_markName; -- OK, lock all tables to get a stable point ... -- use a ROW EXCLUSIVE lock mode, preventing for a transaction currently updating data, but not conflicting with simple read access or vacuum operation. PERFORM emaj._lock_groups(v_validGroupNames,'ROW EXCLUSIVE',true); -- Effectively set the mark using the internal _set_mark_groups() function SELECT emaj._set_mark_groups(v_validGroupNames, v_markName, true, false) into v_nbTb; -- insert end into the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SET_MARK_GROUPS', 'END', array_to_string(v_groupNames,','), v_mark); -- RETURN v_nbTb; END; $emaj_set_mark_groups$; COMMENT ON FUNCTION emaj.emaj_set_mark_groups(TEXT[],TEXT) IS $$Sets a mark on several E-Maj groups.$$; CREATE or REPLACE FUNCTION emaj._set_mark_groups(v_groupNames TEXT[], v_mark TEXT, v_multiGroup BOOLEAN, v_eventToRecord BOOLEAN) RETURNS int LANGUAGE plpgsql AS $_set_mark_groups$ -- This function effectively inserts a mark in the emaj_mark table and takes an image of the sequences definitions for the array of groups. -- It also updates the previous mark of each group to setup the mark_log_rows_before_next column with the number of rows recorded into all log tables between this previous mark and the new mark. -- It is called by emaj_set_mark_group and emaj_set_mark_groups functions but also by other functions that set internal marks, like functions that start or rollback groups. -- Input: group names array, mark to set, -- boolean indicating whether the function is called by a multi group function -- boolean indicating whether the event has to be recorded into the emaj_hist table -- Output: number of processed tables and sequences -- The insertion of the corresponding event in the emaj_hist table is performed by callers. DECLARE v_pgVersion TEXT := emaj._pg_version(); v_emajSchema TEXT := 'emaj'; v_nbTb INT := 0; v_timestamp TIMESTAMPTZ; v_lastSequenceId BIGINT; v_lastSeqHoleId BIGINT; v_lastGlobalSeq BIGINT; v_fullSeqName TEXT; v_seqName TEXT; v_stmt TEXT; r_tblsq RECORD; BEGIN -- if requested, record the set mark begin in emaj_hist IF v_eventToRecord THEN INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'SET_MARK_GROUPS' ELSE 'SET_MARK_GROUP' END, 'BEGIN', array_to_string(v_groupNames,','), v_mark); END IF; -- look at the clock to get the 'official' timestamp representing the mark v_timestamp = clock_timestamp(); -- record the number of log rows for the old last mark of each group -- the statement returns no row in case of emaj_start_group(s) UPDATE emaj.emaj_mark m SET mark_log_rows_before_next = coalesce( (SELECT sum(stat_rows) FROM emaj.emaj_log_stat_group(m.mark_group,'EMAJ_LAST_MARK',NULL)) ,0) WHERE mark_group = ANY (v_groupNames) AND (mark_group, mark_id) IN -- select only last mark of each concerned group (SELECT mark_group, MAX(mark_id) FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_state = 'ACTIVE' GROUP BY mark_group); -- for each member of the groups, ... FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- ... if it is a table, record the associated sequence parameters in the emaj sequence table v_seqName := emaj._build_log_seq_name(r_tblsq.rel_schema, r_tblsq.rel_tblseq); v_fullSeqName := quote_ident(v_emajSchema) || '.' || quote_ident(v_seqName); v_stmt = 'INSERT INTO emaj.emaj_sequence (' || 'sequ_schema, sequ_name, sequ_datetime, sequ_mark, sequ_last_val, sequ_start_val, ' || 'sequ_increment, sequ_max_val, sequ_min_val, sequ_cache_val, sequ_is_cycled, sequ_is_called ' || ') SELECT '|| quote_literal(v_emajSchema) || ', ' || quote_literal(v_seqName) || ', ' || quote_literal(v_timestamp) || ', ' || quote_literal(v_mark) || ', ' || 'last_value, '; IF v_pgVersion <= '8.3' THEN v_stmt = v_stmt || '0, '; ELSE v_stmt = v_stmt || 'start_value, '; END IF; v_stmt = v_stmt || 'increment_by, max_value, min_value, cache_value, is_cycled, is_called ' || 'FROM ' || v_fullSeqName; ELSEIF r_tblsq.rel_kind = 'S' THEN -- ... if it is a sequence, record the sequence parameters in the emaj sequence table v_fullSeqName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); v_stmt = 'INSERT INTO emaj.emaj_sequence (' || 'sequ_schema, sequ_name, sequ_datetime, sequ_mark, sequ_last_val, sequ_start_val, ' || 'sequ_increment, sequ_max_val, sequ_min_val, sequ_cache_val, sequ_is_cycled, sequ_is_called ' || ') SELECT ' || quote_literal(r_tblsq.rel_schema) || ', ' || quote_literal(r_tblsq.rel_tblseq) || ', ' || quote_literal(v_timestamp) || ', ' || quote_literal(v_mark) || ', last_value, '; IF v_pgVersion <= '8.3' THEN v_stmt = v_stmt || '0, '; ELSE v_stmt = v_stmt || 'start_value, '; END IF; v_stmt = v_stmt || 'increment_by, max_value, min_value, cache_value, is_cycled, is_called ' || 'FROM ' || v_fullSeqName; END IF; EXECUTE v_stmt; v_nbTb = v_nbTb + 1; END LOOP; -- record the marks -- get the last id for emaj_sequence and emaj_seq_hole tables, and the last value for emaj_global_seq SELECT CASE WHEN is_called THEN last_value ELSE last_value - increment_by END INTO v_lastSequenceId FROM emaj.emaj_sequence_sequ_id_seq; SELECT CASE WHEN is_called THEN last_value ELSE last_value - increment_by END INTO v_lastSeqHoleId FROM emaj.emaj_seq_hole_sqhl_id_seq; SELECT CASE WHEN is_called THEN last_value ELSE last_value - increment_by END INTO v_lastGlobalSeq FROM emaj.emaj_global_seq; -- insert the marks into the emaj_mark table FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP INSERT INTO emaj.emaj_mark (mark_group, mark_name, mark_datetime, mark_global_seq, mark_state, mark_last_sequence_id, mark_last_seq_hole_id) VALUES (v_groupNames[v_i], v_mark, v_timestamp, v_lastGlobalSeq, 'ACTIVE', v_lastSequenceId, v_lastSeqHoleId); END LOOP; -- if requested, record the set mark end in emaj_hist IF v_eventToRecord THEN INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'SET_MARK_GROUPS' ELSE 'SET_MARK_GROUP' END, 'END', array_to_string(v_groupNames,','), v_mark); END IF; -- RETURN v_nbTb; END; $_set_mark_groups$; CREATE or REPLACE FUNCTION emaj.emaj_comment_mark_group(v_groupName TEXT, v_mark TEXT, v_comment TEXT) RETURNS void LANGUAGE plpgsql AS $emaj_comment_mark_group$ -- This function sets or modifies a comment on a mark by updating the mark_comment of the emaj_mark table. -- Input: group name, mark to comment, comment -- The keyword 'EMAJ_LAST_MARK' can be used as mark to delete to specify the last set mark. -- To reset an existing comment for a mark, the supplied comment can be NULL. DECLARE v_groupState TEXT; v_realMark TEXT; BEGIN -- check that the group is recorded in emaj_group table PERFORM 1 FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_comment_mark_group: group % has not been created.', v_groupName; END IF; -- retrieve and check the mark name SELECT emaj._get_mark_name(v_groupName,v_mark) INTO v_realMark; IF v_realMark IS NULL THEN RAISE EXCEPTION 'emaj_comment_mark_group: % is not a known mark for group %.', v_mark, v_groupName; END IF; -- OK, update the mark_comment from emaj_mark table UPDATE emaj.emaj_mark SET mark_comment = v_comment WHERE mark_group = v_groupName AND mark_name = v_realMark; -- insert event in the history INSERT INTO emaj.emaj_hist (hist_function, hist_object, hist_wording) VALUES ('COMMENT_MARK_GROUP', v_groupName, 'Mark ' || v_realMark); RETURN; END; $emaj_comment_mark_group$; COMMENT ON FUNCTION emaj.emaj_comment_mark_group(TEXT,TEXT,TEXT) IS $$Sets a comment on a mark for an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_get_previous_mark_group(v_groupName TEXT, v_datetime TIMESTAMPTZ) RETURNS text LANGUAGE plpgsql AS $emaj_get_previous_mark_group$ -- This function returns the name of the mark that immediately precedes a given date and time. -- It may return unpredictable result in case of system date or time change. -- The function can be called by both emaj_adm and emaj_viewer roles. -- Input: group name, date and time -- Output: mark name, or NULL if there is no mark before the given date and time DECLARE v_groupState TEXT; v_markName TEXT; BEGIN -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_get_previous_mark_group: group % has not been created.', v_groupName; END IF; -- find the requested mark SELECT mark_name INTO v_markName FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_datetime < v_datetime ORDER BY mark_datetime DESC LIMIT 1; IF NOT FOUND THEN RETURN NULL; ELSE RETURN v_markName; END IF; END; $emaj_get_previous_mark_group$; COMMENT ON FUNCTION emaj.emaj_get_previous_mark_group(TEXT,TIMESTAMPTZ) IS $$Returns the latest mark name preceeding a point in time.$$; CREATE or REPLACE FUNCTION emaj.emaj_delete_mark_group(v_groupName TEXT, v_mark TEXT) RETURNS integer LANGUAGE plpgsql AS $emaj_delete_mark_group$ -- This function deletes all traces from a previous set_mark_group(s) function. -- Then, any rollback on the deleted mark will not be possible. -- It deletes rows corresponding to the mark to delete from emaj_mark and emaj_sequence -- If this mark is the first mark, it also deletes rows from all concerned log tables and holes from emaj_seq_hole. -- The statistical mark_log_rows_before_next column's content of the previous mark is also maintained -- At least one mark must remain after the operation (otherwise it is not worth having a group in LOGGING state !). -- Input: group name, mark to delete -- The keyword 'EMAJ_LAST_MARK' can be used as mark to delete to specify the last set mark. -- Output: number of deleted marks, i.e. 1 DECLARE v_groupState TEXT; v_realMark TEXT; v_markId BIGINT; v_datetimeMark TIMESTAMPTZ; v_idNewMin BIGINT; v_markNewMin TEXT; v_datetimeNewMin TIMESTAMPTZ; v_cpt INT; v_previousMark TEXT; v_nextMark TEXT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('DELETE_MARK_GROUP', 'BEGIN', v_groupName, v_mark); -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_delete_mark_group: group % has not been created.', v_groupName; END IF; -- retrieve and check the mark name SELECT emaj._get_mark_name(v_groupName,v_mark) INTO v_realMark; IF v_realMark IS NULL THEN RAISE EXCEPTION 'emaj_delete_mark_group: % is not a known mark for group %.', v_mark, v_groupName; END IF; -- count the number of mark in the group SELECT count(*) INTO v_cpt FROM emaj.emaj_mark WHERE mark_group = v_groupName; -- and check there are at least 2 marks for the group IF v_cpt < 2 THEN RAISE EXCEPTION 'emaj_delete_mark_group: % is the only mark. It cannot be deleted.', v_mark; END IF; -- OK, now get the id and timestamp of the mark to delete SELECT mark_id, mark_datetime INTO v_markId, v_datetimeMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realMark; -- ... and the id and timestamp of the future first mark SELECT mark_id, mark_name, mark_datetime INTO v_idNewMin, v_markNewMin, v_datetimeNewMin FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name <> v_realMark ORDER BY mark_id LIMIT 1; IF v_markId < v_idNewMin THEN -- if the mark to delete is the first one, -- ... process its deletion with _delete_before_mark_group(), as the first rows of log tables become useless PERFORM emaj._delete_before_mark_group(v_groupName, v_markNewMin); ELSE -- otherwise, -- ... the sequences related to the mark to delete can be suppressed -- Delete first application sequences related data for the group DELETE FROM emaj.emaj_sequence USING emaj.emaj_relation WHERE sequ_mark = v_realMark AND sequ_datetime = v_datetimeMark AND rel_group = v_groupName AND rel_kind = 'S' AND sequ_schema = rel_schema AND sequ_name = rel_tblseq; -- Delete then emaj sequences related data for the group DELETE FROM emaj.emaj_sequence USING emaj.emaj_relation WHERE sequ_mark = v_realMark AND sequ_datetime = v_datetimeMark AND rel_group = v_groupName AND rel_kind = 'r' AND sequ_schema = 'emaj' AND sequ_name = emaj._build_log_seq_name(rel_schema,rel_tblseq); -- ... the mark to delete can be physicaly deleted DELETE FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realMark; -- ... adjust the mark_log_rows_before_next column of the previous mark -- get the name of the mark immediately preceeding the mark to delete SELECT mark_name INTO v_previousMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_id < v_markId ORDER BY mark_id DESC LIMIT 1; -- get the name of the first mark succeeding the mark to delete SELECT mark_name INTO v_nextMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_id > v_markId ORDER BY mark_id LIMIT 1; IF NOT FOUND THEN -- no next mark, so update the previous mark with NULL UPDATE emaj.emaj_mark SET mark_log_rows_before_next = NULL WHERE mark_group = v_groupName AND mark_name = v_previousMark; ELSE -- update the previous mark with the emaj_log_stat_group() call's result UPDATE emaj.emaj_mark SET mark_log_rows_before_next = (SELECT sum(stat_rows) FROM emaj.emaj_log_stat_group(v_groupName, v_previousMark, v_nextMark)) WHERE mark_group = v_groupName AND mark_name = v_previousMark; END IF; END IF; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('DELETE_MARK_GROUP', 'END', v_groupName, v_mark); RETURN 1; END; $emaj_delete_mark_group$; COMMENT ON FUNCTION emaj.emaj_delete_mark_group(TEXT,TEXT) IS $$Deletes a mark for an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_delete_before_mark_group(v_groupName TEXT, v_mark TEXT) RETURNS integer LANGUAGE plpgsql AS $emaj_delete_before_mark_group$ -- This function deletes all marks set before a given mark. -- Then, any rollback on the deleted marks will not be possible. -- It deletes rows corresponding to the marks to delete from emaj_mark, emaj_sequence, emaj_seq_hole. -- It also deletes rows from all concerned log tables. -- Input: group name, name of the new first mark -- The keyword 'EMAJ_LAST_MARK' can be used as mark name. -- Output: number of deleted marks -- or NULL if the provided mark name is NULL DECLARE v_groupState TEXT; v_realMark TEXT; v_markId BIGINT; v_datetimeMark TIMESTAMPTZ; v_nbMark INT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('DELETE_BEFORE_MARK_GROUP', 'BEGIN', v_groupName, v_mark); -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_delete_before_mark_group: group % has not been created.', v_groupName; END IF; -- return NULL if mark name is NULL IF v_mark IS NULL THEN RETURN NULL; END IF; -- retrieve and check the mark name SELECT emaj._get_mark_name(v_groupName,v_mark) INTO v_realMark; IF v_realMark IS NULL THEN RAISE EXCEPTION 'emaj_delete_before_mark_group: % is not a known mark for group %.', v_mark, v_groupName; END IF; -- effectively delete all marks before the supplied mark SELECT emaj._delete_before_mark_group(v_groupName, v_realMark) INTO v_nbMark; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('DELETE_BEFORE_MARK_GROUP', 'END', v_groupName, v_nbMark || ' marks deleted'); RETURN v_nbMark; END; $emaj_delete_before_mark_group$; COMMENT ON FUNCTION emaj.emaj_delete_before_mark_group(TEXT,TEXT) IS $$Deletes all marks preceeding a given mark for an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj._delete_before_mark_group(v_groupName TEXT, v_mark TEXT) RETURNS integer LANGUAGE plpgsql AS $_delete_before_mark_group$ -- This function effectively deletes all marks set before a given mark. -- It deletes rows corresponding to the marks to delete from emaj_mark, emaj_sequence, emaj_seq_hole. -- It also deletes rows from all concerned log tables. -- Input: group name, name of the new first mark -- Output: number of deleted marks DECLARE v_markId BIGINT; v_markGlobalSeq BIGINT; v_datetimeMark TIMESTAMPTZ; v_emajSchema TEXT := 'emaj'; v_logTableName TEXT; v_nbMark INT; r_tblsq RECORD; BEGIN -- retrieve the id and datetime of the new first mark SELECT mark_id, mark_global_seq, mark_datetime INTO v_markId, v_markGlobalSeq, v_datetimeMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_mark; -- delete rows from all log tables -- loop on all tables of the group FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq FROM emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'r' ORDER BY rel_priority, rel_schema, rel_tblseq LOOP v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_log'); -- delete log rows prior to the new first mark EXECUTE 'DELETE FROM ' || v_logTableName || ' WHERE emaj_gid <= ' || v_markGlobalSeq; END LOOP; -- delete all sequence holes that are prior the new first mark for the tables of the group DELETE FROM emaj.emaj_seq_hole USING emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'r' AND rel_schema = sqhl_schema AND rel_tblseq = sqhl_table AND sqhl_id <= (SELECT mark_last_seq_hole_id FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_mark); -- now the sequences related to the mark to delete can be suppressed -- Delete first application sequences related data for the group DELETE FROM emaj.emaj_sequence USING emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'S' AND sequ_schema = rel_schema AND sequ_name = rel_tblseq AND (sequ_mark, sequ_datetime) IN (SELECT mark_name, mark_datetime FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_id < v_markId); -- Delete then emaj sequences related data for the group DELETE FROM emaj.emaj_sequence USING emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'r' AND sequ_schema = 'emaj' AND sequ_name = emaj._build_log_seq_name(rel_schema,rel_tblseq) AND (sequ_mark, sequ_datetime) IN (SELECT mark_name, mark_datetime FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_id < v_markId); -- and finaly delete marks DELETE FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_id < v_markId; GET DIAGNOSTICS v_nbMark = ROW_COUNT; RETURN v_nbMark; END; $_delete_before_mark_group$; CREATE or REPLACE FUNCTION emaj.emaj_rename_mark_group(v_groupName TEXT, v_mark TEXT, v_newName TEXT) RETURNS void LANGUAGE plpgsql AS $emaj_rename_mark_group$ -- This function renames an existing mark. -- The group can be either in LOGGING or IDLE state. -- Rows from emaj_mark and emaj_sequence tables are updated accordingly. -- Input: group name, mark to rename, new name for the mark -- The keyword 'EMAJ_LAST_MARK' can be used as mark to rename to specify the last set mark. DECLARE v_realMark TEXT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('RENAME_MARK_GROUP', 'BEGIN', v_groupName, v_mark); -- check that the group is recorded in emaj_group table PERFORM 1 FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_rename_mark_group: group % has not been created.', v_groupName; END IF; -- retrieve and check the mark name SELECT emaj._get_mark_name(v_groupName,v_mark) INTO v_realMark; IF v_realMark IS NULL THEN RAISE EXCEPTION 'emaj_rename_mark_group: mark % doesn''t exist for group %.', v_mark, v_groupName; END IF; -- check the new mark name is not 'EMAJ_LAST_MARK' IF v_newName = 'EMAJ_LAST_MARK' THEN RAISE EXCEPTION 'emaj_rename_mark_group: % is not an allowed name for a new mark.', v_newName; END IF; -- check if the new mark name doesn't exist for the group PERFORM 1 FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_newName LIMIT 1; IF FOUND THEN RAISE EXCEPTION 'emaj_rename_mark_group: a mark % already exists for group %.', v_newName, v_groupName; END IF; -- OK, update the sequences table as well UPDATE emaj.emaj_sequence SET sequ_mark = v_newName WHERE sequ_datetime = (SELECT mark_datetime FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realMark); -- and then update the emaj_mark table UPDATE emaj.emaj_mark SET mark_name = v_newName WHERE mark_group = v_groupName AND mark_name = v_realMark; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('RENAME_MARK_GROUP', 'END', v_groupName, v_realMark || ' renamed ' || v_newName); RETURN; END; $emaj_rename_mark_group$; COMMENT ON FUNCTION emaj.emaj_rename_mark_group(TEXT,TEXT,TEXT) IS $$Renames a mark for an E-Maj group.$$; CREATE or REPLACE FUNCTION emaj.emaj_rollback_group(v_groupName TEXT, v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_rollback_group$ -- The function rollbacks all tables and sequences of a group up to a mark in the history -- Input: group name, mark in the history, as it is inserted by emaj.emaj_set_mark_group(s) -- Output: number of processed tables and sequences BEGIN -- just (unlogged) rollback the group, with log table deletion -- (with boolean: unloggedRlbk = true, deleteLog = true, multiGroup = false) return emaj._rlbk_groups(array[v_groupName], v_mark, true, true, false); END; $emaj_rollback_group$; COMMENT ON FUNCTION emaj.emaj_rollback_group(TEXT,TEXT) IS $$Rollbacks an E-Maj group to a given mark.$$; CREATE or REPLACE FUNCTION emaj.emaj_rollback_groups(v_groupNames TEXT[], v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_rollback_groups$ -- The function rollbacks all tables and sequences of a group array up to a mark in the history -- Input: array of group names, mark in the history, as it is inserted by emaj.emaj_set_mark_group(s) -- Output: number of processed tables and sequences BEGIN -- just (unlogged) rollback the groups, with log table deletion -- (with boolean: unloggedRlbk = true, deleteLog = true, multiGroup = true) return emaj._rlbk_groups(emaj._check_group_names_array(v_groupNames), v_mark, true, true, true); END; $emaj_rollback_groups$; COMMENT ON FUNCTION emaj.emaj_rollback_groups(TEXT[],TEXT) IS $$Rollbacks an set of E-Maj groups to a given mark.$$; CREATE or REPLACE FUNCTION emaj.emaj_logged_rollback_group(v_groupName TEXT, v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_logged_rollback_group$ -- The function performs a logged rollback of all tables and sequences of a group up to a mark in the history. -- A logged rollback is a rollback which can be later rollbacked! To achieve this: -- - log triggers are not disabled at rollback time, -- - a mark is automaticaly set at the beginning and at the end of the rollback operation, -- - rollbacked log rows and any marks inside the rollback time frame are kept. -- Input: group name, mark in the history, as it is inserted by emaj.emaj_set_mark_group(s) -- Output: number of processed tables and sequences BEGIN -- just "logged-rollback" the group, with log table deletion -- (with boolean: unloggedRlbk = false, deleteLog = false, multiGroup = false) return emaj._rlbk_groups(array[v_groupName], v_mark, false, false, false); END; $emaj_logged_rollback_group$; COMMENT ON FUNCTION emaj.emaj_logged_rollback_group(TEXT,TEXT) IS $$Performs a logged (cancellable) rollbacks of an E-Maj group to a given mark.$$; CREATE or REPLACE FUNCTION emaj.emaj_logged_rollback_groups(v_groupNames TEXT[], v_mark TEXT) RETURNS INT LANGUAGE plpgsql AS $emaj_logged_rollback_groups$ -- The function performs a logged rollback of all tables and sequences of a groups array up to a mark in the history. -- A logged rollback is a rollback which can be later rollbacked! To achieve this: -- - log triggers are not disabled at rollback time, -- - a mark is automaticaly set at the beginning and at the end of the rollback operation, -- - rollbacked log rows and any marks inside the rollback time frame are kept. -- Input: array of group names, mark in the history, as it is inserted by emaj.emaj_set_mark_group(s) -- Output: number of processed tables and sequences BEGIN -- just "logged-rollback" the groups, with log table deletion -- (with boolean: unloggedRlbk = false, deleteLog = false, multiGroup = true) return emaj._rlbk_groups(emaj._check_group_names_array(v_groupNames), v_mark, false, false, true); END; $emaj_logged_rollback_groups$; COMMENT ON FUNCTION emaj.emaj_logged_rollback_groups(TEXT[],TEXT) IS $$Performs a logged (cancellable) rollbacks for a set of E-Maj groups to a given mark.$$; CREATE or REPLACE FUNCTION emaj._rlbk_groups(v_groupNames TEXT[], v_mark TEXT, v_unloggedRlbk BOOLEAN, v_deleteLog BOOLEAN, v_multiGroup BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $_rlbk_groups$ -- The function rollbacks all tables and sequences of a groups array up to a mark in the history. -- It is called by emaj_rollback_group. -- It effectively manages the rollback operation for each table or sequence, deleting rows from log tables -- only when asked by the calling functions. -- Its activity is split into 6 smaller functions that are also called by the parallel restore php function -- Input: group name, mark in the history, as it is inserted by emaj.emaj_set_mark_group -- and a boolean saying if the log rows must be deleted from log table or not -- Output: number of tables and sequences effectively processed DECLARE v_nbTbl INT; v_nbTblInGroup INT; v_nbSeq INT; BEGIN -- if the group names array is null, immediately return 0 IF v_groupNames IS NULL THEN RETURN 0; END IF; -- Step 1: prepare the rollback operation SELECT emaj._rlbk_groups_step1(v_groupNames, v_mark, v_unloggedRlbk, 1, v_multiGroup) INTO v_nbTblInGroup; -- Step 2: lock all tables PERFORM emaj._rlbk_groups_step2(v_groupNames, 1, v_multiGroup); -- Step 3: set a rollback start mark if logged rollback PERFORM emaj._rlbk_groups_step3(v_groupNames, v_mark, v_unloggedRlbk, v_multiGroup); -- Step 4: record and drop foreign keys PERFORM emaj._rlbk_groups_step4(v_groupNames, 1); -- Step 5: effectively rollback tables SELECT emaj._rlbk_groups_step5(v_groupNames, v_mark, 1, v_unloggedRlbk, v_deleteLog) INTO v_nbTbl; -- checks that we have the expected number of processed tables IF v_nbTbl <> v_nbTblInGroup THEN RAISE EXCEPTION '_rlbk_group: Internal error 1 (%,%).',v_nbTbl,v_nbTblInGroup; END IF; -- Step 6: recreate foreign keys PERFORM emaj._rlbk_groups_step6(v_groupNames, 1); -- Step 7: process sequences and complete the rollback operation record SELECT emaj._rlbk_groups_step7(v_groupNames, v_mark, v_nbTbl, v_unloggedRlbk, v_deleteLog, v_multiGroup) INTO v_nbSeq; RETURN v_nbTbl + v_nbSeq; END; $_rlbk_groups$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step1(v_groupNames TEXT[], v_mark TEXT, v_unloggedRlbk BOOLEAN, v_nbSession INT, v_multiGroup BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $_rlbk_groups_step1$ -- This is the first step of a rollback group processing. -- It tests the environment, the supplied parameters and the foreign key constraints. -- It builds the requested number of sessions with the list of tables to process, trying to spread the load over all sessions. -- It finaly inserts into the history the event about the rollback start DECLARE v_i INT; v_groupState TEXT; v_isRollbackable BOOLEAN; v_markName TEXT; v_markState TEXT; v_cpt INT; v_nbTblInGroup INT; v_nbUnchangedTbl INT; v_timestampMark TIMESTAMPTZ; v_session INT; v_sessionLoad INT []; v_minSession INT; v_minRows INT; v_fullTableName TEXT; v_msg TEXT; r_tbl RECORD; r_tbl2 RECORD; BEGIN -- check that each group ... -- ...is recorded in emaj_group table FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP SELECT group_state, group_is_rollbackable INTO v_groupState, v_isRollbackable FROM emaj.emaj_group WHERE group_name = v_groupNames[v_i] FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION '_rlbk_groups_step1: group % has not been created.', v_groupNames[v_i]; END IF; -- ... is in LOGGING state IF v_groupState <> 'LOGGING' THEN RAISE EXCEPTION '_rlbk_groups_step1: Group % cannot be rollbacked because it is not in logging state.', v_groupNames[v_i]; END IF; -- ... is ROLLBACKABLE IF NOT v_isRollbackable THEN RAISE EXCEPTION '_rlbk_groups_step1: Group % has been created for audit only purpose. It cannot be rollbacked.', v_groupNames[v_i]; END IF; -- ... is not damaged PERFORM 0 FROM emaj._verify_group(v_groupNames[v_i],true); -- ... owns the requested mark SELECT emaj._get_mark_name(v_groupNames[v_i],v_mark) INTO v_markName; IF NOT FOUND OR v_markName IS NULL THEN RAISE EXCEPTION '_rlbk_groups_step1: No mark % exists for group %.', v_mark, v_groupNames[v_i]; END IF; -- ... and this mark is ACTIVE SELECT mark_state INTO v_markState FROM emaj.emaj_mark WHERE mark_group = v_groupNames[v_i] AND mark_name = v_markName; IF v_markState <> 'ACTIVE' THEN RAISE EXCEPTION '_rlbk_groups_step1: mark % for group % is not in ACTIVE state.', v_markName, v_groupNames[v_i]; END IF; END LOOP; -- get the mark timestamp and check it is the same for all groups of the array SELECT count(distinct emaj._get_mark_datetime(group_name,v_mark)) INTO v_cpt FROM emaj.emaj_group WHERE group_name = ANY (v_groupNames); IF v_cpt > 1 THEN RAISE EXCEPTION '_rlbk_groups_step1: Mark % does not represent the same point in time for all groups.', v_mark; END IF; -- get the mark timestamp for the 1st group (as we know this timestamp is the same for all groups of the array) SELECT emaj._get_mark_datetime(v_groupNames[1],v_mark) INTO v_timestampMark; -- insert begin in the history IF v_unloggedRlbk THEN v_msg = 'Unlogged'; ELSE v_msg = 'Logged'; END IF; INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'ROLLBACK_GROUPS' ELSE 'ROLLBACK_GROUP' END, 'BEGIN', array_to_string(v_groupNames,','), v_msg || ' rollback to mark ' || v_mark || ' [' || v_timestampMark || ']'); -- get the total number of tables for these groups SELECT sum(group_nb_table) INTO v_nbTblInGroup FROM emaj.emaj_group WHERE group_name = ANY (v_groupNames) ; -- issue warnings in case of foreign keys with tables outside the groups PERFORM emaj._check_fk_groups(v_groupNames); -- create sessions, using the number of sessions requested by the caller -- session id for sequences will remain NULL -- initialisation -- accumulated counters of number of log rows to rollback for each parallel session FOR v_session IN 1 .. v_nbSession LOOP v_sessionLoad [v_session] = 0; END LOOP; FOR v_i in 1 .. array_upper(v_groupNames,1) LOOP -- fkey table DELETE FROM emaj.emaj_fk WHERE v_groupNames[v_i] = ANY (fk_groups); -- relation table: for each group, session set to NULL and -- numbers of log rows computed by emaj_log_stat_group function UPDATE emaj.emaj_relation SET rel_session = NULL, rel_rows = stat_rows FROM emaj.emaj_log_stat_group (v_groupNames[v_i], v_mark, NULL) stat WHERE rel_group = v_groupNames[v_i] AND rel_group = stat_group AND rel_schema = stat_schema AND rel_tblseq = stat_table; END LOOP; -- count the number of tables that have no update to rollback SELECT count(*) INTO v_nbUnchangedTbl FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_rows = 0; -- allocate tables with rows to rollback to sessions starting with the heaviest to rollback tables -- as reported by emaj_log_stat_group function FOR r_tbl IN SELECT * FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_kind = 'r' ORDER BY rel_rows DESC LOOP -- is the table already allocated to a session (it may have been already allocated because of a fkey link) ? PERFORM 1 FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_schema = r_tbl.rel_schema AND rel_tblseq = r_tbl.rel_tblseq AND rel_session IS NULL; -- no, IF FOUND THEN -- compute the least loaded session v_minSession=1; v_minRows = v_sessionLoad [1]; FOR v_session IN 2 .. v_nbSession LOOP IF v_sessionLoad [v_session] < v_minRows THEN v_minSession = v_session; v_minRows = v_sessionLoad [v_session]; END IF; END LOOP; -- allocate the table to the session, with all other tables linked by foreign key constraints v_sessionLoad [v_minSession] = v_sessionLoad [v_minSession] + emaj._rlbk_groups_set_session(v_groupNames, r_tbl.rel_schema, r_tbl.rel_tblseq, v_minSession, r_tbl.rel_rows); END IF; END LOOP; RETURN v_nbTblInGroup - v_nbUnchangedTbl; END; $_rlbk_groups_step1$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_set_session(v_groupNames TEXT[], v_schema TEXT, v_table TEXT, v_session INT, v_rows BIGINT) RETURNS BIGINT LANGUAGE plpgsql AS $_rlbk_groups_set_session$ -- This function updates the emaj_relation table and set the predefined session id for one table. -- It also looks for all tables that are linked to this table by foreign keys to force them to be allocated to the same session. -- As those linked table can also be linked to other tables by other foreign keys, the function has to be recursiley called. -- The function returns the accumulated number of rows contained into all log tables of these linked by foreign keys tables. DECLARE v_cumRows BIGINT; -- accumulate the number of rows of the related log tables ; will be returned by the function v_fullTableName TEXT; r_tbl RECORD; BEGIN v_cumRows=v_rows; -- first set the session of the emaj_relation table for this application table UPDATE emaj.emaj_relation SET rel_session = v_session WHERE rel_group = ANY (v_groupNames) AND rel_schema = v_schema AND rel_tblseq = v_table; -- then look for other application tables linked by foreign key relationships v_fullTableName := quote_ident(v_schema) || '.' || quote_ident(v_table); FOR r_tbl IN SELECT rel_schema, rel_tblseq, rel_rows FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_session IS NULL -- not yet allocated AND (rel_schema, rel_tblseq) IN ( -- list of (schema,table) linked to the original table by foreign keys SELECT nspname, relname FROM pg_constraint, pg_class t, pg_namespace n WHERE contype = 'f' AND confrelid = v_fullTableName::regclass AND t.oid = conrelid AND relnamespace = n.oid UNION SELECT nspname, relname FROM pg_constraint, pg_class t, pg_namespace n WHERE contype = 'f' AND conrelid = v_fullTableName::regclass AND t.oid = confrelid AND relnamespace = n.oid ) LOOP -- recursive call to allocate these linked tables to the same session id and get the accumulated number of rows to rollback SELECT v_cumRows + emaj._rlbk_groups_set_session(v_groupNames, r_tbl.rel_schema, r_tbl.rel_tblseq, v_session, r_tbl.rel_rows) INTO v_cumRows; END LOOP; RETURN v_cumRows; END; $_rlbk_groups_set_session$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step2(v_groupNames TEXT[], v_session INT, v_multiGroup BOOLEAN) RETURNS void LANGUAGE plpgsql AS $_rlbk_groups_step2$ -- This is the second step of a rollback group processing. It just locks the tables for a session. DECLARE v_nbRetry SMALLINT := 0; v_ok BOOLEAN := false; v_nbTbl INT; r_tblsq RECORD; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'LOCK_SESSIONS' ELSE 'LOCK_SESSION' END, 'BEGIN', array_to_string(v_groupNames,','), 'Session #' || v_session); -- acquire lock on all tables -- in case of deadlock, retry up to 5 times WHILE NOT v_ok AND v_nbRetry < 5 LOOP BEGIN v_nbTbl = 0; -- scan all tables of the session FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_session = v_session AND rel_kind = 'r' ORDER BY rel_priority, rel_schema, rel_tblseq LOOP -- lock each table EXECUTE 'LOCK TABLE ' || quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); v_nbTbl = v_nbTbl + 1; END LOOP; -- ok, all tables locked v_ok = true; EXCEPTION WHEN deadlock_detected THEN v_nbRetry = v_nbRetry + 1; RAISE NOTICE '_rlbk_group_step2: a deadlock has been trapped while locking tables for groups %.', array_to_string(v_groupNames,','); END; END LOOP; IF NOT v_ok THEN RAISE EXCEPTION '_rlbk_group_step2: too many (5) deadlocks encountered while locking tables of group %.',v_groupName; END IF; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'LOCK_SESSIONS' ELSE 'LOCK_SESSION' END, 'END', array_to_string(v_groupNames,','), 'Session #' || v_session || ' : ' || v_nbTbl || ' tables locked, ' || v_nbRetry || ' deadlock(s)'); RETURN; END; $_rlbk_groups_step2$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step3(v_groupNames TEXT[], v_mark TEXT, v_unloggedRlbk BOOLEAN, v_multiGroup BOOLEAN) RETURNS VOID LANGUAGE plpgsql AS $_rlbk_groups_step3$ -- This is the third step of a rollback group processing. -- For logged rollback, it sets a mark that materialize the point in time just before the tables rollback. -- All concerned tables are already locked. DECLARE v_realMark TEXT; v_markName TEXT; BEGIN IF NOT v_unloggedRlbk THEN -- If rollback is "logged" rollback, build a mark name with the pattern: -- 'RLBK__%_START', where % represents the current time -- Get the real mark name (using first supplied group name, and check that all groups -- can use the same name has already been done at step 1) SELECT emaj._get_mark_name(v_groupNames[1],v_mark) INTO v_realMark; IF v_realMark IS NULL THEN RAISE EXCEPTION '_rlbk_groups_step3: Internal error - mark % not found for group %.', v_mark, v_groupNames[1]; END IF; -- compute the generated mark name v_markName = 'RLBK_' || v_realMark || '_' || to_char(current_timestamp, 'HH24.MI.SS.MS') || '_START'; -- ... and set it PERFORM emaj._set_mark_groups(v_groupNames, v_markName, v_multiGroup, true); END IF; RETURN; END; $_rlbk_groups_step3$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step4(v_groupNames TEXT[], v_session INT) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_rlbk_groups_step4$ -- This is the fourth step of a rollback group processing. It drops all foreign keys involved in a rollback session. -- Before dropping, it records them to be able to recreate them at step 5. -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of application tables. DECLARE r_fk RECORD; BEGIN -- record and drop the foreign keys involved in all tables of the group, if any INSERT INTO emaj.emaj_fk (fk_groups, fk_session, fk_name, fk_schema, fk_table, fk_def) -- record the foreign keys of the session's tables SELECT v_groupNames, v_session, c.conname, n.nspname, t.relname, pg_get_constraintdef(c.oid) FROM pg_constraint c, pg_namespace n, pg_class t, emaj.emaj_relation r WHERE c.contype = 'f' -- FK constraints only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- joins for table and namespace AND n.nspname = r.rel_schema AND t.relname = r.rel_tblseq -- join on group table AND r.rel_group = ANY (v_groupNames) AND r.rel_session = v_session UNION -- and the foreign keys referencing the session's tables SELECT v_groupNames, v_session, c.conname, n.nspname, t.relname, pg_get_constraintdef(c.oid) FROM pg_constraint c, pg_namespace n, pg_class t, pg_namespace rn, pg_class rt, emaj.emaj_relation r WHERE c.contype = 'f' -- FK constraints only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- joins for table and namespace AND c.confrelid = rt.oid AND rt.relnamespace = rn.oid -- joins for referenced table and namespace AND rn.nspname = r.rel_schema AND rt.relname = r.rel_tblseq -- join on group table AND r.rel_group = ANY (v_groupNames) AND r.rel_session = v_session; -- and drop all these foreign keys FOR r_fk IN SELECT fk_schema, fk_table, fk_name FROM emaj.emaj_fk WHERE fk_groups = v_groupNames AND fk_session = v_session ORDER BY fk_schema, fk_table, fk_name LOOP RAISE NOTICE '_rlbk_groups_step4: groups (%), session #% -> foreign key constraint % dropped for table %.%.', array_to_string(v_groupNames,','), v_session, r_fk.fk_name, r_fk.fk_schema, r_fk.fk_table; EXECUTE 'ALTER TABLE ' || quote_ident(r_fk.fk_schema) || '.' || quote_ident(r_fk.fk_table) || ' DROP CONSTRAINT ' || quote_ident(r_fk.fk_name); END LOOP; END; $_rlbk_groups_step4$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step5(v_groupNames TEXT[], v_mark TEXT, v_session INT, v_unloggedRlbk BOOLEAN, v_deleteLog BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $_rlbk_groups_step5$ -- This is the fifth step of a rollback group processing. It performs the rollback of all tables of a session. DECLARE v_nbTbl INT := 0; v_timestampMark TIMESTAMPTZ; v_lastGlobalSeq BIGINT; v_lastSequenceId BIGINT; v_lastSeqHoleId BIGINT; BEGIN -- fetch the timestamp mark again SELECT emaj._get_mark_datetime(v_groupNames[1],v_mark) INTO v_timestampMark; IF v_timestampMark IS NULL THEN RAISE EXCEPTION '_rlbk_groups_step5: Internal error - mark % not found for group %.', v_mark, v_groupNames[1]; END IF; -- fetch the last global sequence and the last id values of emaj_sequence and emaj_seq_hole tables at set mark time SELECT mark_global_seq, mark_last_sequence_id, mark_last_seq_hole_id INTO v_lastGlobalSeq, v_lastSequenceId, v_lastSeqHoleId FROM emaj.emaj_mark WHERE mark_group = v_groupNames[1] AND mark_name = emaj._get_mark_name(v_groupNames[1],v_mark); -- rollback all tables of the session, having rows to rollback, in priority order (sequences are processed later) -- (for the _rlbk_tbl() function call, the disableTrigger boolean always equals unloggedRlbk boolean) PERFORM emaj._rlbk_tbl(rel_schema, rel_tblseq, v_lastGlobalSeq, v_timestampMark, v_unloggedRlbk, v_deleteLog, v_lastSequenceId, v_lastSeqHoleId) FROM (SELECT rel_priority, rel_schema, rel_tblseq FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_session = v_session AND rel_kind = 'r' AND rel_rows > 0 ORDER BY rel_priority, rel_schema, rel_tblseq) as t; -- and return the number of processed tables GET DIAGNOSTICS v_nbTbl = ROW_COUNT; RETURN v_nbTbl; END; $_rlbk_groups_step5$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step6(v_groupNames TEXT[], v_session INT) RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $_rlbk_groups_step6$ -- This is the sixth step of a rollback group processing. It recreates the previously deleted foreign keys. -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of application tables. DECLARE v_ts_start TIMESTAMP; v_ts_end TIMESTAMP; r_fk RECORD; BEGIN FOR r_fk IN -- get all recorded fk plus the number of rows of the related table as estimated by postgresql (pg_class.reltuples) SELECT fk_schema, fk_table, fk_name, fk_def, pg_class.reltuples FROM emaj.emaj_fk, pg_namespace, pg_class WHERE fk_groups = v_groupNames AND fk_session = v_session AND -- restrictions pg_namespace.oid = relnamespace AND relname = fk_table AND nspname = fk_schema -- joins ORDER BY fk_schema, fk_table, fk_name LOOP -- record the time at the alter table start SELECT clock_timestamp() INTO v_ts_start; -- recreate the foreign key EXECUTE 'ALTER TABLE ' || quote_ident(r_fk.fk_schema) || '.' || quote_ident(r_fk.fk_table) || ' ADD CONSTRAINT ' || quote_ident(r_fk.fk_name) || ' ' || r_fk.fk_def; -- record the time after the alter table and insert FK creation duration into the emaj_rlbk_stat table SELECT clock_timestamp() INTO v_ts_end; INSERT INTO emaj.emaj_rlbk_stat (rlbk_operation, rlbk_schema, rlbk_tbl_fk, rlbk_datetime, rlbk_nb_rows, rlbk_duration) VALUES ('add_fk', r_fk.fk_schema, r_fk.fk_name, v_ts_start, r_fk.reltuples, v_ts_end - v_ts_start); -- send a message about the fk creation completion RAISE NOTICE '_rlbk_group_step6: groups (%), session #% -> foreign key constraint % recreated for table %.%.', array_to_string(v_groupNames,','), v_session, r_fk.fk_name, r_fk.fk_schema, r_fk.fk_table; END LOOP; RETURN; END; $_rlbk_groups_step6$; CREATE or REPLACE FUNCTION emaj._rlbk_groups_step7(v_groupNames TEXT[], v_mark TEXT, v_nbTb INT, v_unloggedRlbk BOOLEAN, v_deleteLog BOOLEAN, v_multiGroup BOOLEAN) RETURNS INT LANGUAGE plpgsql AS $_rlbk_groups_step7$ -- This is the last step of a rollback group processing. It : -- - deletes the marks that are no longer available, -- - rollbacks all sequences of the group. -- It returns the number of processed sequences. DECLARE v_realMark TEXT; v_markId BIGINT; v_timestampMark TIMESTAMPTZ; v_lastSequenceId BIGINT; v_nbSeq INT; v_markName TEXT; BEGIN -- get the real mark name SELECT emaj._get_mark_name(v_groupNames[1],v_mark) INTO v_realMark; IF NOT FOUND OR v_realMark IS NULL THEN RAISE EXCEPTION '_rlbk_groups_step7: Internal error - mark % not found for group %.', v_mark, v_groupNames[1]; END IF; -- if "unlogged" rollback, delete all marks later than the now rollbacked mark IF v_unloggedRlbk THEN -- get the highest mark id of the mark used for rollback, for all groups SELECT max(mark_id) INTO v_markId FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_name = v_realMark; -- log in the history the name of all marks that must be deleted due to the rollback INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) SELECT CASE WHEN v_multiGroup THEN 'ROLLBACK_GROUPS' ELSE 'ROLLBACK_GROUP' END, 'MARK DELETED', mark_group, 'mark ' || mark_name || ' has been deleted' FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_id > v_markId ORDER BY mark_id; -- delete these useless marks (the related sequences have been already deleted by rollback functions) DELETE FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_id > v_markId; -- and finaly reset the mark_log_rows_before_next column for the new last mark UPDATE emaj.emaj_mark set mark_log_rows_before_next = NULL WHERE mark_group = ANY (v_groupNames) AND (mark_group, mark_id) IN -- select only last mark of each concerned group (SELECT mark_group, MAX(mark_id) FROM emaj.emaj_mark WHERE mark_group = ANY (v_groupNames) AND mark_state = 'ACTIVE' GROUP BY mark_group); END IF; -- rollback the application sequences belonging to the groups -- warning, this operation is not transaction safe (that's why it is placed at the end of the operation)! -- get the mark timestamp and last sequence id for the 1st group SELECT mark_datetime, mark_last_sequence_id INTO v_timestampMark, v_lastSequenceId FROM emaj.emaj_mark WHERE mark_group = v_groupNames[1] AND mark_name = v_realMark; -- and rollback PERFORM emaj._rlbk_seq(rel_schema, rel_tblseq, v_timestampMark, v_deleteLog, v_lastSequenceId) FROM emaj.emaj_relation WHERE rel_group = ANY (v_groupNames) AND rel_kind = 'S' ORDER BY rel_priority, rel_schema, rel_tblseq; GET DIAGNOSTICS v_nbSeq = ROW_COUNT; -- if rollback is "logged" rollback, automaticaly set a mark representing the tables state just after the rollback. -- this mark is named 'RLBK__%_DONE', where % represents the current time IF NOT v_unloggedRlbk THEN -- get the mark name set at the beginning of the rollback operation (i.e. the last set mark) SELECT mark_name INTO v_markName FROM emaj.emaj_mark WHERE mark_group = v_groupNames[1] ORDER BY mark_id DESC LIMIT 1; IF NOT FOUND OR v_markName NOT LIKE 'RLBK%START' THEN RAISE EXCEPTION '_rlbk_groups_step7: Internal error - rollback start mark not found for group %.', v_groupNames[1]; END IF; -- compute the mark name that ends the rollback operation, replacing the '_START' suffix of the rollback start mark by '_DONE' v_markName = substring(v_markName FROM '(.*)_START$') || '_DONE'; -- ... and set it PERFORM emaj._set_mark_groups(v_groupNames, v_markName, v_multiGroup, true); END IF; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES (CASE WHEN v_multiGroup THEN 'ROLLBACK_GROUPS' ELSE 'ROLLBACK_GROUP' END, 'END', array_to_string(v_groupNames,','), v_nbTb || ' tables and ' || v_nbSeq || ' sequences effectively processed'); RETURN v_nbSeq; END; $_rlbk_groups_step7$; CREATE or REPLACE FUNCTION emaj.emaj_reset_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql SECURITY DEFINER AS $emaj_reset_group$ -- This function empties the log tables for all tables of a group and deletes the sequences saves -- It calls the emaj_rst_group function to do the job -- Input: group name -- Output: number of processed tables -- The function is defined as SECURITY DEFINER so that emaj_adm role can use it even if he is not the owner of application tables. DECLARE v_groupState TEXT; v_nbTb INT := 0; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object) VALUES ('RESET_GROUP', 'BEGIN', v_groupName); -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName FOR UPDATE; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_reset_group: group % has not been created.', v_groupName; END IF; -- check that the group is in IDLE state (i.e. not in LOGGING) state IF v_groupState <> 'IDLE' THEN RAISE EXCEPTION 'emaj_reset_group: Group % cannot be reset because it is not in idle state. An emaj_stop_group function must be previously executed.', v_groupName; END IF; -- perform the reset operation SELECT emaj._rst_group(v_groupName) INTO v_nbTb; -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('RESET_GROUP', 'END', v_groupName, v_nbTb || ' tables/sequences processed'); RETURN v_nbTb; END; $emaj_reset_group$; COMMENT ON FUNCTION emaj.emaj_reset_group(TEXT) IS $$Resets all log tables content of a stopped E-Maj group.$$; CREATE or REPLACE FUNCTION emaj._rst_group(v_groupName TEXT) RETURNS INT LANGUAGE plpgsql AS $_rst_group$ -- This function empties the log tables for all tables of a group, using a TRUNCATE, and deletes the sequences saves -- It is called by both emaj_reset_group and emaj_start_group functions -- Input: group name -- Output: number of processed tables -- There is no check of the group state DECLARE v_nbTb INT := 0; v_emajSchema TEXT := 'emaj'; v_logTableName TEXT; v_seqName TEXT; r_tblsq RECORD; BEGIN -- delete all marks for the group from the emaj_mark table DELETE FROM emaj.emaj_mark WHERE mark_group = v_groupName; -- delete all sequence holes for the tables of the group DELETE FROM emaj.emaj_seq_hole USING emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'r' AND rel_schema = sqhl_schema AND rel_tblseq = sqhl_table; -- then, truncate log tables FOR r_tblsq IN SELECT rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = v_groupName ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, -- truncate the related log table v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_log'); EXECUTE 'TRUNCATE ' || v_logTableName; -- delete rows from emaj_sequence related to the associated log sequence v_seqName := emaj._build_log_seq_name(r_tblsq.rel_schema, r_tblsq.rel_tblseq); DELETE FROM emaj.emaj_sequence WHERE sequ_name = v_seqName; -- and reset the log sequence PERFORM setval(quote_ident(v_emajSchema) || '.' || quote_ident(v_seqName), 1, false); ELSEIF r_tblsq.rel_kind = 'S' THEN -- if it is a sequence, delete all related data from emaj_sequence table PERFORM emaj._drop_seq (r_tblsq.rel_schema, r_tblsq.rel_tblseq); END IF; v_nbTb = v_nbTb + 1; END LOOP; IF v_nbTb = 0 THEN RAISE EXCEPTION '_rst_group: Internal error (Group % is empty).', v_groupName; END IF; RETURN v_nbTb; END; $_rst_group$; CREATE or REPLACE FUNCTION emaj.emaj_log_stat_group(v_groupName TEXT, v_firstMark TEXT, v_lastMark TEXT) RETURNS SETOF emaj.emaj_log_stat_type LANGUAGE plpgsql AS $emaj_log_stat_group$ -- This function returns statistics on row updates executed between 2 marks or between a mark and the current situation. -- It is used to quickly get simple statistics of updates logged between 2 marks (i.e. for one or several processing) -- It is also used to estimate the cost of a rollback to a specified mark -- These statistics are computed using the serial id of log tables and holes is sequences recorded into emaj_seq_hole at rollback time -- Input: group name, the 2 mark names defining a range -- a NULL value or an empty string as first_mark indicates the first recorded mark -- a NULL value or an empty string as last_mark indicates the current situation -- Use a NULL or an empty string as last_mark to know the number of rows to rollback to reach the mark specified by the first_mark parameter. -- The keyword 'EMAJ_LAST_MARK' can be used as first or last mark to specify the last set mark. -- Output: table of log rows by table (including tables with 0 rows to rollback) DECLARE v_groupState TEXT; v_emajSchema TEXT := 'emaj'; v_realFirstMark TEXT; v_realLastMark TEXT; v_firstMarkId BIGINT; v_lastMarkId BIGINT; v_tsFirstMark TIMESTAMPTZ; v_tsLastMark TIMESTAMPTZ; v_firstLastSeqHoleId BIGINT; v_lastLastSeqHoleId BIGINT; v_fullSeqName TEXT; v_beginLastValue BIGINT; v_endLastValue BIGINT; v_sumHole BIGINT; r_tblsq RECORD; r_stat RECORD; BEGIN -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_log_stat_group: group % has not been created.', v_groupName; END IF; -- if first mark is NULL or empty, retrieve the name, timestamp and last sequ_hole id of the first recorded mark for the group IF v_firstMark IS NULL OR v_firstMark = '' THEN SELECT mark_id, mark_name, mark_datetime, mark_last_seq_hole_id INTO v_firstMarkId, v_realFirstMark, v_tsFirstMark, v_firstLastSeqHoleId FROM emaj.emaj_mark WHERE mark_group = v_groupName ORDER BY mark_id LIMIT 1; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_log_stat_group: No initial mark can be found for group %.', v_groupName; END IF; ELSE -- else, check and retrieve the name, timestamp and last sequ_hole id of the supplied first mark for the group SELECT emaj._get_mark_name(v_groupName,v_firstMark) INTO v_realFirstMark; IF v_realFirstMark IS NULL THEN RAISE EXCEPTION 'emaj_log_stat_group: Start mark % is unknown for group %.', v_firstMark, v_groupName; END IF; SELECT mark_id, mark_datetime, mark_last_seq_hole_id INTO v_firstMarkId, v_tsFirstMark, v_firstLastSeqHoleId FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realFirstMark; END IF; -- if last mark is NULL or empty, there is no timestamp to register IF v_lastMark IS NULL OR v_lastMark = '' THEN v_lastMarkId = NULL; v_tsLastMark = NULL; v_lastLastSeqHoleId = NULL; ELSE -- else, check and retrieve the name, timestamp and last sequ_hole id of the supplied end mark for the group SELECT emaj._get_mark_name(v_groupName,v_lastMark) INTO v_realLastMark; IF v_realLastMark IS NULL THEN RAISE EXCEPTION 'emaj_log_stat_group: End mark % is unknown for group %.', v_lastMark, v_groupName; END IF; SELECT mark_id, mark_datetime, mark_last_seq_hole_id INTO v_lastMarkId, v_tsLastMark, v_lastLastSeqHoleId FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realLastMark; END IF; -- check that the first_mark < end_mark IF v_lastMarkId IS NOT NULL AND v_firstMarkId > v_lastMarkId THEN RAISE EXCEPTION 'emaj_log_stat_group: mark id for % (% = %) is greater than mark id for % (% = %).', v_firstMark, v_firstMarkId, v_tsFirstMark, v_lastMark, v_lastMarkId, v_tsLastMark; END IF; -- for each table of the emaj_relation table, get the number of log rows and return the statistic FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, emaj._log_stat_table(rel_schema, rel_tblseq, v_tsFirstMark, v_tsLastMark, v_firstLastSeqHoleId, v_lastLastSeqHoleId) AS nb_rows FROM emaj.emaj_relation WHERE rel_group = v_groupName AND rel_kind = 'r' ORDER BY rel_priority, rel_schema, rel_tblseq LOOP SELECT v_groupName, r_tblsq.rel_schema, r_tblsq.rel_tblseq, r_tblsq.nb_rows INTO r_stat; RETURN NEXT r_stat; END LOOP; RETURN; END; $emaj_log_stat_group$; COMMENT ON FUNCTION emaj.emaj_log_stat_group(TEXT,TEXT,TEXT) IS $$Returns global statistics about logged events for an E-Maj group between 2 marks.$$; CREATE or REPLACE FUNCTION emaj.emaj_detailed_log_stat_group(v_groupName TEXT, v_firstMark TEXT, v_lastMark TEXT) RETURNS SETOF emaj.emaj_detailed_log_stat_type LANGUAGE plpgsql AS $emaj_detailed_log_stat_group$ -- This function returns statistics on row updates executed between 2 marks as viewed through the log tables -- It provides more information than emaj_log_stat_group but it needs to scan log tables in order to provide these data. -- So the response time may be much longer. -- Input: group name, the 2 marks names defining a range -- a NULL value or an empty string as first_mark indicates the first recorded mark -- a NULL value or an empty string as last_mark indicates the current situation -- The keyword 'EMAJ_LAST_MARK' can be used as first or last mark to specify the last set mark. -- Output: table of updates by user and table DECLARE v_groupState TEXT; v_emajSchema TEXT := 'emaj'; v_realFirstMark TEXT; v_realLastMark TEXT; v_firstMarkId BIGINT; v_lastMarkId BIGINT; v_tsFirstMark TIMESTAMPTZ; v_tsLastMark TIMESTAMPTZ; v_firstEmajGid BIGINT; v_lastEmajGid BIGINT; v_logTableName TEXT; v_stmt TEXT; r_tblsq RECORD; r_stat RECORD; BEGIN -- check that the group is recorded in emaj_group table SELECT group_state INTO v_groupState FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_detailed_log_stat_group: group % has not been created.', v_groupName; END IF; -- catch the timestamp of the first mark IF v_firstMark IS NOT NULL AND v_firstMark <> '' THEN -- check and retrieve the global sequence value and the timestamp of the start mark for the group SELECT emaj._get_mark_name(v_groupName,v_firstMark) INTO v_realFirstMark; IF v_realFirstMark IS NULL THEN RAISE EXCEPTION 'emaj_detailed_log_stat_group: Start mark % is unknown for group %.', v_firstMark, v_groupName; END IF; SELECT mark_id, mark_global_seq, mark_datetime INTO v_firstMarkId, v_firstEmajGid, v_tsFirstMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realFirstMark; END IF; -- catch the timestamp of the last mark IF v_lastMark IS NOT NULL AND v_lastMark <> '' THEN -- else, check and retrieve the global sequence value and the timestamp of the end mark for the group SELECT emaj._get_mark_name(v_groupName,v_lastMark) INTO v_realLastMark; IF v_realLastMark IS NULL THEN RAISE EXCEPTION 'emaj_detailed_log_stat_group: End mark % is unknown for group %.', v_lastMark, v_groupName; END IF; SELECT mark_id, mark_global_seq, mark_datetime INTO v_lastMarkId, v_lastEmajGid, v_tsLastMark FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_realLastMark; END IF; -- check that the first_mark < end_mark IF v_realFirstMark IS NOT NULL AND v_realLastMark IS NOT NULL AND v_firstMarkId > v_lastMarkId THEN RAISE EXCEPTION 'emaj_detailed_log_stat_group: mark id for % (% = %) is greater than mark id for % (% = %).', v_realFirstMark, v_firstMarkId, v_tsFirstMark, v_realLastMark, v_lastMarkId, v_tsLastMark; END IF; -- for each table of the emaj_relation table FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = v_groupName ORDER BY rel_priority, rel_schema, rel_tblseq LOOP IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, count the number of operations per type (INSERT, UPDATE and DELETE) and role -- compute the log table name and its sequence name for this table v_logTableName := quote_ident(v_emajSchema) || '.' || quote_ident(r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '_log'); -- prepare and execute the statement v_stmt= 'SELECT ' || quote_literal(v_groupName) || '::TEXT as emaj_group,' || ' ' || quote_literal(r_tblsq.rel_schema) || '::TEXT as emaj_schema,' || ' ' || quote_literal(r_tblsq.rel_tblseq) || '::TEXT as emaj_table,' || ' emaj_user,' || ' CASE WHEN emaj_verb = ''INS'' THEN ''INSERT''' || ' WHEN emaj_verb = ''UPD'' THEN ''UPDATE''' || ' WHEN emaj_verb = ''DEL'' THEN ''DELETE''' || ' ELSE ''?'' END::VARCHAR(6) as emaj_verb,' || ' count(*) as emaj_rows' || ' FROM ' || v_logTableName || ' WHERE NOT (emaj_verb = ''UPD'' AND emaj_tuple = ''OLD'')'; IF v_firstMark IS NOT NULL AND v_firstMark <> '' THEN v_stmt = v_stmt || ' AND emaj_gid > '|| v_firstEmajGid ; END IF; IF v_lastMark IS NOT NULL AND v_lastMark <> '' THEN v_stmt = v_stmt || ' AND emaj_gid <= '|| v_lastEmajGid ; END IF; v_stmt = v_stmt || ' GROUP BY emaj_group, emaj_schema, emaj_table, emaj_user, emaj_verb' || ' ORDER BY emaj_user, emaj_verb'; FOR r_stat IN EXECUTE v_stmt LOOP RETURN NEXT r_stat; END LOOP; END IF; END LOOP; RETURN; END; $emaj_detailed_log_stat_group$; COMMENT ON FUNCTION emaj.emaj_detailed_log_stat_group(TEXT,TEXT,TEXT) IS $$Returns detailed statistics about logged events for an E-Maj group between 2 marks.$$; CREATE or REPLACE FUNCTION emaj.emaj_estimate_rollback_duration(v_groupName TEXT, v_mark TEXT) RETURNS interval LANGUAGE plpgsql AS $emaj_estimate_rollback_duration$ -- This function computes an approximate duration of a rollback to a predefined mark for a group. -- It takes into account the content of emaj_rollback_stat table filled by previous rollback operations. -- It also uses several parameters from emaj_param table. -- "Logged" and "Unlogged" rollback durations are estimated with the same algorithm. (the cost of log insertion -- for logged rollback balances the cost of log deletion of unlogged rollback) -- Input: group name, the mark name of the rollback operation -- Output: the approximate duration that the rollback would need as time interval DECLARE v_nbTblSeq INTEGER; v_markName TEXT; v_markState TEXT; v_estim_duration INTERVAL; v_avg_row_rlbk INTERVAL; v_avg_row_del_log INTERVAL; v_fixed_table_rlbk INTERVAL; v_fixed_table_with_rlbk INTERVAL; v_estim INTERVAL; r_tblsq RECORD; r_fkey RECORD; BEGIN -- check that the group is recorded in emaj_group table and get the number of tables and sequences SELECT group_nb_table + group_nb_sequence INTO v_nbTblSeq FROM emaj.emaj_group WHERE group_name = v_groupName and group_state = 'LOGGING'; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_estimate_rollback_duration: group % has not been created or is not in LOGGING state.', v_groupName; END IF; -- check the mark exists SELECT emaj._get_mark_name(v_groupName,v_mark) INTO v_markName; IF NOT FOUND OR v_markName IS NULL THEN RAISE EXCEPTION 'emaj_estimate_rollback_duration: no mark % exists for group %.', v_mark, v_groupName; END IF; -- check the mark is ACTIVE SELECT mark_state INTO v_markState FROM emaj.emaj_mark WHERE mark_group = v_groupName AND mark_name = v_markName; IF v_markState <> 'ACTIVE' THEN RAISE EXCEPTION 'emaj_estimate_rollback_duration: mark % for group % is not in ACTIVE state.', v_markName, v_groupName; END IF; -- get all needed duration parameters from emaj_param table, -- or get default values for rows that are not present in emaj_param table SELECT coalesce ((SELECT param_value_interval FROM emaj.emaj_param WHERE param_key = 'avg_row_rollback_duration'),'100 microsecond'::interval), coalesce ((SELECT param_value_interval FROM emaj.emaj_param WHERE param_key = 'avg_row_delete_log_duration'),'10 microsecond'::interval), coalesce ((SELECT param_value_interval FROM emaj.emaj_param WHERE param_key = 'fixed_table_rollback_duration'),'5 millisecond'::interval), coalesce ((SELECT param_value_interval FROM emaj.emaj_param WHERE param_key = 'fixed_table_with_rollback_duration'),'2.5 millisecond'::interval) INTO v_avg_row_rlbk, v_avg_row_del_log, v_fixed_table_rlbk, v_fixed_table_with_rlbk; -- compute the fixed cost for the group v_estim_duration = v_nbTblSeq * v_fixed_table_rlbk; -- -- walk through the list of tables with their number of rows to rollback as returned by the emaj_log_stat_group function -- -- for each table with content to rollback FOR r_tblsq IN SELECT stat_schema, stat_table, stat_rows FROM emaj.emaj_log_stat_group(v_groupName, v_mark, NULL) WHERE stat_rows > 0 LOOP -- -- compute the rollback duration estimate for the table -- -- first look at the previous rollback durations for the table and with similar rollback volume (same order of magnitude) SELECT sum(rlbk_duration) * r_tblsq.stat_rows / sum(rlbk_nb_rows) INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'rlbk' AND rlbk_nb_rows > 0 AND rlbk_schema = r_tblsq.stat_schema AND rlbk_tbl_fk = r_tblsq.stat_table AND rlbk_nb_rows / r_tblsq.stat_rows < 10 AND r_tblsq.stat_rows / rlbk_nb_rows < 10; IF v_estim IS NULL THEN -- if there is no previous rollback operation with similar volume, take statistics for the table with all available volumes SELECT sum(rlbk_duration) * r_tblsq.stat_rows / sum(rlbk_nb_rows) INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'rlbk' AND rlbk_nb_rows > 0 AND rlbk_schema = r_tblsq.stat_schema AND rlbk_tbl_fk = r_tblsq.stat_table; IF v_estim IS NULL THEN -- if there is no previous rollback operation, use the avg_row_rollback_duration from the emaj_param table v_estim = v_avg_row_rlbk * r_tblsq.stat_rows; END IF; END IF; v_estim_duration = v_estim_duration + v_fixed_table_with_rlbk + v_estim; -- -- compute the log rows delete duration for the table -- -- first look at the previous rollback durations for the table and with similar rollback volume (same order of magnitude) SELECT sum(rlbk_duration) * r_tblsq.stat_rows / sum(rlbk_nb_rows) INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'del_log' AND rlbk_nb_rows > 0 AND rlbk_schema = r_tblsq.stat_schema AND rlbk_tbl_fk = r_tblsq.stat_table AND rlbk_nb_rows / r_tblsq.stat_rows < 10 AND r_tblsq.stat_rows / rlbk_nb_rows < 10; IF v_estim IS NULL THEN -- if there is no previous rollback operation with similar volume, take statistics for the table with all available volumes SELECT sum(rlbk_duration) * r_tblsq.stat_rows / sum(rlbk_nb_rows) INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'del_log' AND rlbk_nb_rows > 0 AND rlbk_schema = r_tblsq.stat_schema AND rlbk_tbl_fk = r_tblsq.stat_table; IF v_estim IS NULL THEN -- if there is no previous rollback operation, use the avg_row_rollback_duration from the emaj_param table v_estim = v_avg_row_del_log * r_tblsq.stat_rows; END IF; END IF; v_estim_duration = v_estim_duration + v_estim; END LOOP; -- -- walk through the list of foreign key constraints concerned by the estimated rollback -- -- for each foreign key FOR r_fkey IN SELECT c.conname, n.nspname, t.relname, t.reltuples FROM pg_constraint c, pg_namespace n, pg_class t, emaj.emaj_log_stat_group(v_groupName, v_mark, NULL) s WHERE c.contype = 'f' -- FK constraints only AND stat_rows > 0 -- table to effectively rollback only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- joins for table and namespace AND n.nspname = s.stat_schema AND t.relname = s.stat_table -- join on group tables to effectively rollback UNION -- and the foreign keys referenced by tables that are concerned by the rollback operation SELECT c.conname, n.nspname, t.relname, t.reltuples FROM pg_constraint c, pg_namespace n, pg_class t, pg_namespace rn, pg_class rt, emaj.emaj_log_stat_group(v_groupName, v_mark, NULL) s WHERE c.contype = 'f' -- FK constraints only AND stat_rows > 0 -- table to effectively rollback only AND c.conrelid = t.oid AND t.relnamespace = n.oid -- joins for table and namespace AND c.confrelid = rt.oid AND rt.relnamespace = rn.oid -- joins for referenced table and namespace AND rn.nspname = s.stat_schema AND rt.relname = s.stat_table -- join on group tables to effectively rollback LOOP -- estimate the recreation duration of a fkey IF r_fkey.reltuples = 0 THEN -- empty table (or table not analyzed) => duration = 0 v_estim = 0; ELSE -- non empty table and statistics (with at least one row) are available SELECT sum(rlbk_duration) * r_fkey.reltuples / sum(rlbk_nb_rows) INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'add_fk' AND rlbk_nb_rows > 0 AND rlbk_schema = r_fkey.nspname AND rlbk_tbl_fk = r_fkey.conname; IF v_estim IS NULL THEN -- non empty table, but no statistics with at least one row are available => take the last duration for this fkey, if any SELECT rlbk_duration INTO v_estim FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'add_fk' AND rlbk_schema = r_fkey.nspname AND rlbk_tbl_fk = r_fkey.conname AND rlbk_datetime = (SELECT max(rlbk_datetime) FROM emaj.emaj_rlbk_stat WHERE rlbk_operation = 'add_fk' AND rlbk_schema = r_fkey.nspname AND rlbk_tbl_fk = r_fkey.conname); IF v_estim IS NULL THEN -- definitely no statistics available v_estim = 0; END IF; END IF; END IF; v_estim_duration = v_estim_duration + v_estim; END LOOP; RETURN v_estim_duration; END; $emaj_estimate_rollback_duration$; COMMENT ON FUNCTION emaj.emaj_estimate_rollback_duration(TEXT,TEXT) IS $$Estimates the duration of a potential rollback of an E-Maj group to a given mark.$$; CREATE or REPLACE FUNCTION emaj.emaj_snap_group(v_groupName TEXT, v_dir TEXT, v_copyOptions TEXT) RETURNS INT LANGUAGE plpgsql SECURITY DEFINER AS $emaj_snap_group$ -- This function creates a file for each table and sequence belonging to the group. -- For tables, these files contain all rows sorted on primary key. -- For sequences, they contain a single row describing the sequence. -- To do its job, the function performs COPY TO statement, with all default parameters. -- For table without primary key, rows are sorted on all columns. -- There is no need for the group to be in IDLE state. -- As all COPY statements are executed inside a single transaction: -- - the function can be called while other transactions are running, -- - the snap files will present a coherent state of tables. -- It's users responsability : -- - to create the directory (with proper permissions allowing the cluster to write into) before -- emaj_snap_group function call, and -- - maintain its content outside E-maj. -- Input: group name, the absolute pathname of the directory where the files are to be created and the options to used in the COPY TO statements -- Output: number of processed tables and sequences -- The function is defined as SECURITY DEFINER so that emaj_adm role can use. DECLARE v_pgVersion TEXT := emaj._pg_version(); v_emajSchema TEXT := 'emaj'; v_nbTb INT := 0; r_tblsq RECORD; v_fullTableName TEXT; r_col RECORD; v_colList TEXT; v_fileName TEXT; v_stmt TEXT; v_seqCol TEXT; BEGIN -- insert begin in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SNAP_GROUP', 'BEGIN', v_groupName, v_dir); -- check that the group is recorded in emaj_group table PERFORM 0 FROM emaj.emaj_group WHERE group_name = v_groupName; IF NOT FOUND THEN RAISE EXCEPTION 'emaj_snap_group: group % has not been created.', v_groupName; END IF; -- check the supplied directory is not null IF v_dir IS NULL THEN RAISE EXCEPTION 'emaj_snap_group: directory parameter cannot be NULL'; END IF; -- for each table/sequence of the emaj_relation table FOR r_tblsq IN SELECT rel_priority, rel_schema, rel_tblseq, rel_kind FROM emaj.emaj_relation WHERE rel_group = v_groupName ORDER BY rel_priority, rel_schema, rel_tblseq LOOP v_fileName := v_dir || '/' || r_tblsq.rel_schema || '_' || r_tblsq.rel_tblseq || '.snap'; v_fullTableName := quote_ident(r_tblsq.rel_schema) || '.' || quote_ident(r_tblsq.rel_tblseq); IF r_tblsq.rel_kind = 'r' THEN -- if it is a table, -- first build the order by column list v_colList := ''; PERFORM 0 FROM pg_class, pg_namespace, pg_constraint WHERE relnamespace = pg_namespace.oid AND connamespace = pg_namespace.oid AND conrelid = pg_class.oid AND contype = 'p' AND nspname = r_tblsq.rel_schema AND relname = r_tblsq.rel_tblseq; IF FOUND THEN -- the table has a pkey, FOR r_col IN SELECT attname FROM pg_attribute, pg_index WHERE pg_attribute.attrelid = pg_index.indrelid AND attnum = ANY (indkey) AND indrelid = v_fullTableName::regclass AND indisprimary AND attnum > 0 AND attisdropped = false LOOP IF v_colList = '' THEN v_colList := quote_ident(r_col.attname); ELSE v_colList := v_colList || ',' || quote_ident(r_col.attname); END IF; END LOOP; ELSE -- the table has no pkey FOR r_col IN SELECT attname FROM pg_attribute WHERE attrelid = v_fullTableName::regclass AND attnum > 0 AND attisdropped = false LOOP IF v_colList = '' THEN v_colList := quote_ident(r_col.attname); ELSE v_colList := v_colList || ',' || quote_ident(r_col.attname); END IF; END LOOP; END IF; -- prepare the COPY statement v_stmt= 'COPY (SELECT * FROM ' || v_fullTableName || ' ORDER BY ' || v_colList || ') TO ' || quote_literal(v_fileName) || ' ' || coalesce (v_copyOptions, ''); ELSEIF r_tblsq.rel_kind = 'S' THEN -- if it is a sequence, the statement has no order by IF v_pgVersion <= '8.3' THEN v_seqCol = 'sequence_name, last_value, 0, increment_by, max_value, min_value, cache_value, is_cycled, is_called'; ELSE v_seqCol = 'sequence_name, last_value, start_value, increment_by, max_value, min_value, cache_value, is_cycled, is_called'; END IF; v_stmt= 'COPY (SELECT ' || v_seqCol || ' FROM ' || v_fullTableName || ') TO ' || quote_literal(v_fileName) || ' ' || coalesce (v_copyOptions, ''); END IF; -- and finaly perform the COPY -- raise notice 'emaj_snap_group: Executing %',v_stmt; EXECUTE v_stmt; v_nbTb = v_nbTb + 1; END LOOP; -- create the _INFO file to keep general information about the snap operation EXECUTE 'COPY (SELECT ' || quote_literal('E-Maj snap of tables group ' || v_groupName || ' at ' || transaction_timestamp()) || ') TO ' || quote_literal(v_dir || '/_INFO'); -- insert end in the history INSERT INTO emaj.emaj_hist (hist_function, hist_event, hist_object, hist_wording) VALUES ('SNAP_GROUP', 'END', v_groupName, v_nbTb || ' tables/sequences processed'); RETURN v_nbTb; END; $emaj_snap_group$; COMMENT ON FUNCTION emaj.emaj_snap_group(TEXT,TEXT,TEXT) IS $$Snaps all application tables and sequences of an E-Maj group into a given directory.$$; CREATE or REPLACE FUNCTION emaj.emaj_snap_log_group(v_groupName TEXT, v_firstMark TEXT, v_lastMark TEXT, v_dir TEXT, v_copyOptions TEXT) RETURNS INT LANGUAGE plpgsql SECURITY DEFINER AS $emaj_snap_log_group$ -- This function creates a file for each log table belonging to the group. -- It also creates 2 files containing the state of sequences respectively at start mark and end mark -- For log tables, files contain all rows related to the time frame, sorted on emaj_gid. -- For sequences, files are names _sequences_at_, or _sequences_at_