-- ----------------------------------------------------------------------
-- provsql 1.9.0 -> 1.10.0
--
-- New SQL surface since 1.9.0:
--   * exact two-terminal network reliability on bounded-treewidth data,
--     compiled along a tree decomposition of the data graph.  The
--     user-facing route is the ordinary WITH RECURSIVE reachability
--     query under provsql.boolean_provenance, which the query rewriter
--     lowers through the new eval_reachability driver (with fallback to
--     eval_recursive); the surface here is that driver, the
--     reachability_materialize / reachability_materialize_hops /
--     reachability_evaluate / reachability_compile_stats columnar
--     internals, and the gather_reachability_edges helper.
--   * a canonical-address probe in provenance_plus, serving the
--     pre-created within-bound gates of the bounded-hop route, and its
--     provenance_times twin, serving the pre-created
--     all-members-reachable gates of self-join conjunctions
--     (plant_reach_cover / reachability_materialize_cover).
--   * the labelled assumption marker: gate type 'assumed_boolean' is
--     renamed 'assumed' with the assumption kind in the gate's extra
--     label; provenance_assume(token, assumption) is the constructor,
--     assume_boolean(token) kept as a compatibility wrapper.
-- ----------------------------------------------------------------------

-- Unqualified names in the statements below (the provenance_gate type,
-- the functions being created) resolve in the provsql schema.
SET search_path TO provsql;

-- The Boolean-assumption marker generalises to a labelled assumption
-- marker: the gate type is renamed 'assumed' (ordinality preserved) and
-- the assumption kind ('boolean' / 'absorptive') lives in the gate's
-- extra label, absent on pre-existing gates and then defaulting to the
-- historical 'boolean'.
ALTER TYPE provenance_gate RENAME VALUE 'assumed_boolean' TO 'assumed';

-- Conditioning marker for the | / cond operator (see the conditioning
-- block at the end of this script): a three-child gate evaluated only in
-- the measure interpretation.  The value added by ADD VALUE is not used
-- within this transaction (the cond function below only references it as
-- a literal / in create_gate, never materialising the value), so it is
-- upgrade-safe.
ALTER TYPE provenance_gate ADD VALUE IF NOT EXISTS 'conditioned';

/**
 * @brief Wrap @p token in a fresh @c gate_assumed carrying @p assumption
 * as its label, and return the wrapper's UUID.
 *
 * Public primitive callable from any rewrite or driver that needs to
 * flag a sub-circuit as sound only under an evaluation assumption:
 *
 * - @c 'boolean' -- the sub-circuit only preserves the Boolean function
 *   of the lineage (e.g. the safe-query rewrite collapses derivation
 *   multiplicities); transparent for semirings admitting a homomorphism
 *   from Boolean functions.
 * - @c 'absorptive' -- the sub-circuit was truncated at the absorptive
 *   value fixpoint (cyclic recursive query); transparent for absorptive
 *   semirings (probability, boolean, min-plus over nonnegative
 *   costs...), fatal for the rest (counting, why-provenance).
 *
 * Incompatible evaluators raise a @c CircuitException.  Always kept as
 * an explicit node in PROV-XML export.
 *
 * The wrapper UUID is content-derived via @c uuid_generate_v5 on the
 * assumption and the child, so identical children always wrap to the
 * same outer UUID per assumption.  No-op (returns NULL) on a NULL
 * input.
*/
-- Argument handling of provenance_assume:
--   * a NULL token is a no-op and yields NULL;
--   * the assumption must be exactly 'boolean' or 'absorptive'; anything
--     else -- including NULL -- raises an exception.  The NULL case is
--     tested explicitly because "NULL NOT IN (...)" evaluates to NULL,
--     which an IF treats as false and would let an unlabelled marker
--     through unvalidated.
CREATE OR REPLACE FUNCTION provenance_assume(token UUID, assumption TEXT)
  RETURNS UUID AS
$$
DECLARE
  wrapped uuid;
BEGIN
  IF token IS NULL THEN
    RETURN NULL;
  END IF;

  IF assumption IS NULL OR assumption NOT IN ('boolean', 'absorptive') THEN
    RAISE EXCEPTION 'provenance_assume: unknown assumption %', assumption;
  END IF;

  -- Content-derived address: same (assumption, child) pair, same wrapper.
  wrapped := public.uuid_generate_v5(uuid_ns_provsql(),
                                     concat('assumed', assumption, token));
  PERFORM create_gate(wrapped, 'assumed', ARRAY[token]);
  -- The assumption kind is stored as the gate's extra label.
  PERFORM set_extra(wrapped, assumption);
  RETURN wrapped;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

/**
 * @brief Wrap @p token in a Boolean-assumption marker (compatibility
 * name; see @c provenance_assume).
 */
CREATE OR REPLACE FUNCTION assume_boolean(token UUID)
  RETURNS UUID AS
$$
  SELECT provsql.provenance_assume(token, 'boolean');
$$ LANGUAGE sql SECURITY DEFINER PARALLEL SAFE;

-- eval_recursive: cyclic data now stops at the absorptive value fixpoint
-- under the 'absorptive'/'boolean' provenance classes, with the resulting
-- tokens wrapped in the 'absorptive' assumption marker.

/**
 * @brief Driver for provenance over recursive queries (WITH RECURSIVE).
 *
 * Invoked by the planner hook (@c lower_recursive_cte in @c provsql.c) when it
 * lowers a recursive CTE whose body touches provenance-tracked relations.  The
 * hook deparses the CTE body to SQL and calls this function, which runs naive
 * bottom-up (fixpoint) evaluation: each round re-evaluates the body
 * @c base @c UNION @c recursive over a tracked working table until the
 * provenance tokens stop changing.  Every round goes through ProvSQL's normal
 * rewriting, so the recursive join yields @c times gates, the untracked base
 * branch yields @c gate_one, and the @c UNION yields the @c plus merge of
 * alternative derivations -- no provenance is plumbed by hand here.  The result
 * is left in a tracked temp table named @p work_name, which the hook then scans
 * in place of the CTE.
*
 * The working tables (@p work_name and a scratch @c _new) are created once and
 * reused across rounds (TRUNCATE + INSERT), so the round count never
 * accumulates relation locks.  Because content-addressed gate UUIDs make
 * structurally identical sub-circuits share, the fixpoint test is an exact
 * relational @c EXCEPT and the circuit stays the shared (polynomial) form.
 *
 * The initial cleanup only ever drops *temporary* tables (the DROPs are
 * @c pg_temp -qualified): since this function runs on the caller's search
 * path, an unqualified DROP could otherwise remove a permanent relation of
 * the caller that happens to share the CTE name or the scratch name.
 *
 * Scope: UNION (set) recursion.  On *acyclic* input the structural fixpoint is
 * reached and the resulting circuit is the universal provenance, sound for any
 * semiring.  On *cyclic* input the circuit never stabilises structurally; when
 * the session's provenance class (@c provsql.provenance) is @c 'absorptive' or
 * @c 'boolean' we instead stop at the value-fixpoint bound (number of
 * derivable tuples) -- every minimal, tuple-repetition-free derivation is then
 * covered, and the longer ones are absorbed in any absorptive semiring (after
 * Deutch, Milo, Roy & Tannen, ICDT 2014) -- and wrap the resulting tokens in
 * the @c 'absorptive' assumption marker, so that non-absorptive semiring
 * evaluations (counting, why-provenance: genuinely infinite on cyclic data)
 * refuse them while probability, Boolean, formula-as-circuit and min-plus
 * evaluations proceed.  Under the general classes, cyclic input trips the
 * @p max_iter guard.
 *
 * This function has no @c SET @c search_path on purpose: @p body_sql is the
 * caller's deparsed query and must resolve relation names in the caller's path.
 *
 * @param body_sql   the recursive CTE body, e.g.
 *                   @c 'SELECT 1 UNION SELECT e.dst FROM edge e JOIN reach r ON e.src=r.node'
 * @param work_name  the working relation name @p body_sql references (the CTE name)
 * @param colnames   comma-separated user columns, e.g. @c 'node'
 * @param coldef     column definitions for the working table, e.g.
 *                   @c 'node integer'
 * @param max_iter   safety bound on fixpoint rounds (non-termination guard)
 */
CREATE OR REPLACE FUNCTION eval_recursive(
  body_sql text, work_name text, colnames text, coldef text,
  max_iter int DEFAULT 1000)
  RETURNS void AS
$$
DECLARE
  changed boolean;      -- circuit changed structurally this round
  set_stable boolean;   -- user-column tuple set unchanged this round
  iters int := 0;
  new_count int;        -- rows in _new this round (INSERT ROW_COUNT)
  -- Under an absorptive semiring the provenance *value* converges on cyclic
  -- data even though the circuit keeps growing structurally.  A minimal
  -- derivation cannot repeat a tuple, so it has depth <= (number of derivable
  -- tuples); after that many naive rounds the value equals the least fixpoint,
  -- and the surplus (longer, cyclic) derivations are absorbed at evaluation
  -- time.  We learn that bound from the tuple-set fixpoint, stop there, and
  -- mark the resulting tokens with the 'absorptive' assumption so evaluation
  -- under a non-absorptive semiring refuses rather than silently returning a
  -- truncated value.
  absorptive_mode boolean :=
    coalesce(current_setting('provsql.provenance', true), 'semiring')
      IN ('absorptive', 'boolean');
  truncated boolean := false;  -- exited at the value fixpoint (cyclic data)
  ntuples int := NULL;         -- the bound above, set once the tuple set stabilises
BEGIN
  -- Clear leftovers of a previous invocation.  Qualified with pg_temp so
  -- that only temporary tables can be dropped: with no temp table of that
  -- name yet, an unqualified name would resolve along the caller's search
  -- path and could drop a permanent table named like the CTE (or "_new").
  EXECUTE format('DROP TABLE IF EXISTS pg_temp.%I', work_name);
  DROP TABLE IF EXISTS pg_temp._new;

  -- Tracked working table (carries provsql), initially empty, plus a scratch
  -- table of the same shape; both reused across rounds.
  EXECUTE format('CREATE TEMP TABLE %I (%s, provsql uuid)', work_name, coldef);
  EXECUTE format('CREATE TEMP TABLE _new (LIKE %I)', work_name);

  LOOP
    iters := iters + 1;
    -- Hard safety bound (also catches genuinely unbounded recursion, e.g. an
    -- unbounded counter, where even the tuple set never stabilises).
    IF iters > max_iter THEN
      RAISE EXCEPTION 'eval_recursive: no fixpoint after % rounds (cyclic data?)',
        max_iter;
    END IF;

    -- One round of naive evaluation: re-run the CTE body over the current
    -- working table.  INSERT targets a tracked table, so ProvSQL fills provsql.
    -- Take the row count from the INSERT itself (counting _new directly would be
    -- an aggregate over a provenance-tracked table -> an agg_token).
    EXECUTE 'TRUNCATE _new';
    EXECUTE format('INSERT INTO _new(%s) %s', colnames, body_sql);
    GET DIAGNOSTICS new_count = ROW_COUNT;

    -- Exact structural fixpoint test (content-addressed tokens => set equality).
    EXECUTE format(
      'SELECT EXISTS((TABLE _new EXCEPT TABLE %1$I) UNION ALL (TABLE %1$I EXCEPT TABLE _new))',
      work_name) INTO changed;

    -- In an absorptive class, learn the round bound from the tuple-set
    -- fixpoint (the set always stabilises after finitely many rounds, even on
    -- cyclic data).
    IF absorptive_mode AND ntuples IS NULL THEN
      EXECUTE format(
        'SELECT NOT EXISTS(' ||
        '(SELECT %2$s FROM _new EXCEPT SELECT %2$s FROM %1$I) UNION ALL ' ||
        '(SELECT %2$s FROM %1$I EXCEPT SELECT %2$s FROM _new))',
        work_name, colnames) INTO set_stable;
      IF set_stable THEN
        ntuples := new_count;
      END IF;
    END IF;

    -- Copy _new into the working table (tracked -> tracked carries the tokens).
    EXECUTE format('TRUNCATE %I', work_name);
    EXECUTE format('INSERT INTO %1$I(%2$s) SELECT %2$s FROM _new', work_name, colnames);

    -- Structural fixpoint: done (acyclic / fully converged) -- sound for any
    -- semiring.
    EXIT WHEN NOT changed;

    -- Absorptive class on cyclic data: once the value-fixpoint bound is
    -- reached (plus one confirming round, so that acyclic circuits whose
    -- token depth lags the tuple-set saturation still exit through the
    -- structural test above, untagged) we stop, even though the circuit
    -- is not structurally stable.
    IF absorptive_mode AND ntuples IS NOT NULL AND iters >= ntuples + 1 THEN
      truncated := true;
      EXIT;
    END IF;
  END LOOP;

  -- Tokens of a truncated (cyclic) fixpoint are sound only under absorptive
  -- evaluation: record that in the circuit itself.  Every row of the working
  -- table is rewrapped, hence no WHERE clause.
  IF truncated THEN
    EXECUTE format(
      'UPDATE %I SET provsql = provsql.provenance_assume(provsql, ''absorptive'')',
      work_name);
  END IF;
END
$$ LANGUAGE plpgsql SET client_min_messages = warning;

-- sr_tropical gains the 'nonnegative' flag (absorptive min-plus,
-- accepting cyclic-recursion tokens); the two-argument form is
-- subsumed by the default.
DROP FUNCTION sr_tropical(ANYELEMENT, regclass);

/** @brief Evaluate provenance over the tropical (min-plus) m-semiring
 *
 * Inputs are read as %float8 cost values; the additive identity
 * is 'Infinity'::%float8 and the multiplicative identity is 0.
 * Returns the cost of the cheapest derivation.
 *
 * With @p nonnegative, input costs are checked nonnegative and the
 * semiring is *absorptive*: evaluation then also accepts circuits
 * carrying the @c 'absorptive' assumption marker -- notably cyclic
 * recursive queries truncated at the absorptive value fixpoint, giving
 * exact min-cost reachability on cyclic data.
 */
CREATE FUNCTION sr_tropical(token ANYELEMENT, token2value regclass, nonnegative BOOLEAN = false)
  RETURNS FLOAT AS
$$
BEGIN
  RETURN provsql.provenance_evaluate_compiled(
    token,
    token2value,
    CASE WHEN nonnegative THEN 'tropical_nonneg' ELSE 'tropical' END,
    0::FLOAT
  );
END
$$ LANGUAGE plpgsql STRICT PARALLEL SAFE STABLE;

/**
 * @brief Create a plus (sum) gate from an array of provenance tokens
 *
 * Filters out NULL and zero-gates; returns gate_zero() if all tokens
 * are trivial, or a single token if only one remains.
* Before creating a gate, probes the *canonical* address of the
 * multiset -- a dedicated v5 recipe namespace over the sorted tokens
 * (plus is commutative), in which this function never creates anything,
 * so a gate found there is always a deliberate pre-creation computing
 * the same sum.  That is the bounded-hop reachability route's hook: it
 * plants, at the canonical address of a vertex's per-length tokens, a
 * certified gate over its native within-bound circuit, keeping the
 * natural hop-discarding query on the linear evaluation route.  Absent
 * a canonical gate, the historical order-dependent recipe is used
 * unchanged, so ordinary plus gates (and their formula rendering) are
 * untouched.
 *
 * An input with no non-trivial token (empty array, or only NULL /
 * gate_zero() entries) returns gate_zero() and creates no gate.
 */
CREATE OR REPLACE FUNCTION provenance_plus(tokens uuid[])
  RETURNS UUID AS
$$
DECLARE
  c INTEGER;
  plus_token uuid;
  filtered_tokens uuid[];
  canonical uuid;
BEGIN
  SELECT array_agg(t)
  FROM unnest(tokens) t
  WHERE t IS NOT NULL AND t <> gate_zero()
  INTO filtered_tokens;

  -- array_agg over zero rows yields NULL and array_length of a NULL (or
  -- empty) array is NULL, never 0: normalise it, otherwise the
  -- "all tokens trivial" branch below can never be taken.
  c := coalesce(array_length(filtered_tokens, 1), 0);

  IF c = 0 THEN
    plus_token := gate_zero();
  ELSIF c = 1 THEN
    plus_token := filtered_tokens[1];
  ELSE
    -- Computed separately from the filtering aggregate above: an ORDER
    -- BY aggregate there would make the planner feed *both* aggregates
    -- sorted input, scrambling the stored (aggregation-order) children.
    SELECT uuid_generate_v5(uuid_ns_provsql(),
                            concat('plus-canonical', array_agg(t ORDER BY t)))
    FROM unnest(filtered_tokens) t
    INTO canonical;

    IF get_gate_type(canonical) = 'plus' THEN
      -- A deliberate pre-creation at the canonical address: same
      -- children, same sum.
      plus_token := canonical;
    ELSE
      plus_token := uuid_generate_v5(
        uuid_ns_provsql(),
        concat('plus', filtered_tokens));
      PERFORM create_gate(plus_token, 'plus', filtered_tokens);
    END IF;
  END IF;

  RETURN plus_token;
END
-- NOTE(review): declared IMMUTABLE although the body probes and creates
-- gates; volatility is kept exactly as in the original -- confirm intent.
$$ LANGUAGE plpgsql STRICT SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE IMMUTABLE;

/**
 * @brief Exact reachability probability over bounded-treewidth data
 *        (columnar form)
 *
 * Computes the probability that @p target is reachable from @p source in
 * the probabilistic graph given by the parallel edge arrays
 * (two-terminal network reliability).  Unlike
 * @c probability_evaluate(), which compiles the provenance circuit
 * built along the relational query plan, this compiles the query
 * along a tree decomposition of the *data* graph (in the spirit of the
 * provenance refinement of Courcelle's theorem), producing a d-DNNF
 * whose size is linear in the number of edges for data of bounded
 * treewidth.  Exact, and linear-time, on cyclic data as well -- where
 * the recursive-query fixpoint cannot terminate structurally.
 *
 * Edges are independent events.  Two array positions may share a token
 * only if they are mutual reverses (the natural encoding of an
 * undirected edge in a directed edge relation); they are then treated
 * as a single bidirectional edge.  This is an internal/testing surface:
 * the user-facing route is a plain @c WITH @c RECURSIVE reachability
 * query under the 'absorptive' (or 'boolean') provenance class, which
 * the query rewriter compiles through @c eval_reachability() /
 * @c reachability_materialize().
*
 * @param sources        source vertex of each edge (dense integer IDs)
 * @param destinations   destination vertex of each edge
 * @param tokens         provenance token of each edge tuple
 * @param probabilities  probability of each edge tuple
 * @param source         the vertex reachability starts from
 * @param target         the vertex whose reachability is evaluated
 * @param directed       if false, each edge can be traversed both ways
 */
-- Bound to the C symbol reachability_evaluate of the provsql library.
CREATE OR REPLACE FUNCTION reachability_evaluate(
  sources INT[],
  destinations INT[],
  tokens UUID[],
  probabilities DOUBLE PRECISION[],
  source INT,
  target INT,
  directed BOOLEAN
)
  RETURNS DOUBLE PRECISION
  LANGUAGE C
  IMMUTABLE
  PARALLEL SAFE
  AS 'provsql', 'reachability_evaluate';

/**
 * @brief Reachability probability plus compilation statistics
 *        (columnar form)
 *
 * Same compilation as @c reachability_evaluate(), returning the
 * probability together with the structural statistics that
 * substantiate the bounded-treewidth guarantee: the treewidth of the
 * min-fill decomposition of the data graph, its number of bags, the
 * maximum number of dynamic-programming states at any decomposition
 * node, and the size of the emitted d-DNNF (linear in the number of
 * edges for fixed data treewidth).
*
 * @param sources        source vertex of each edge (dense integer IDs)
 * @param destinations   destination vertex of each edge
 * @param tokens         provenance token of each edge tuple
 * @param probabilities  probability of each edge tuple
 * @param source         the vertex reachability starts from
 * @param target         the vertex whose reachability is evaluated
 * @param directed       if false, each edge can be traversed both ways
 */
-- Inputs first, then the OUT columns of the single result row.
CREATE OR REPLACE FUNCTION reachability_compile_stats(
  IN sources INT[],
  IN destinations INT[],
  IN tokens UUID[],
  IN probabilities DOUBLE PRECISION[],
  IN source INT,
  IN target INT,
  IN directed BOOLEAN,
  OUT probability DOUBLE PRECISION,
  OUT data_treewidth INT,
  OUT nb_bags BIGINT,
  OUT max_states BIGINT,
  OUT nb_gates BIGINT,
  OUT nb_variables BIGINT
)
  LANGUAGE C
  IMMUTABLE
  PARALLEL SAFE
  AS 'provsql', 'reachability_compile_stats';

/**
 * @brief Compile and materialise the reachability provenance of every
 *        vertex (columnar form, internal)
 *
 * All-targets variant of @c reachability_evaluate(): compiles, along a
 * tree decomposition of the data graph, one certified provenance
 * circuit per vertex reachable from some source in the all-edges-present
 * world, materialises the (shared, linear-size) circuits in the
 * provenance store -- @c plus / @c times gates carrying the d-DNNF
 * certificate, negated edges as @c monus(one, edge) -- and returns one
 * @c (vertex, token) row per such vertex.  Sources form a possibly
 * *probabilistic source set*: each source arc is gated by the source
 * tuple's token, the nil UUID marking a certain (always present)
 * source.  This is the engine behind the rewriter's
 * recursive-reachability route; the returned tokens are ordinary
 * provenance tokens usable with the whole evaluation surface, wrapped
 * in the 'absorptive' assumption marker (the compiled circuit is the
 * exact Boolean lineage but only the absorptive quotient of the
 * infinite recursive semiring provenance: probability and absorptive
 * semiring evaluations -- e.g.
* nonnegative min-plus -- are exact,
 * counting and why-provenance refuse).
 *
 * @param sources               source vertex of each edge (dense integer IDs)
 * @param destinations          destination vertex of each edge
 * @param tokens                provenance token of each edge tuple
 * @param probabilities         probability of each edge tuple
 * @param block_keys            per-edge BID key variable (nil UUID = independent
 *                              tuple; alternatives sharing a key are mutually
 *                              exclusive, e.g. from repair_key)
 * @param block_indices         per-edge outcome index within its block
 * @param source_vertices       the source vertices
 * @param source_tokens         per-source provenance token (nil UUID = certain)
 * @param source_probabilities  per-source probability
 * @param directed              if false, each edge can be traversed both ways
 */
-- Set-returning: one (vertex, token) row per result, via the OUT columns.
CREATE OR REPLACE FUNCTION reachability_materialize(
  IN sources INT[],
  IN destinations INT[],
  IN tokens UUID[],
  IN probabilities DOUBLE PRECISION[],
  IN block_keys UUID[],
  IN block_indices INT[],
  IN source_vertices INT[],
  IN source_tokens UUID[],
  IN source_probabilities DOUBLE PRECISION[],
  IN directed BOOLEAN,
  OUT vertex INT,
  OUT token UUID
)
  RETURNS SETOF record
  LANGUAGE C
  VOLATILE
  AS 'provsql', 'reachability_materialize';

/**
 * @brief Bounded-hop variant of @c reachability_materialize() (internal)
 *
 * Compiles, along a tree decomposition of the data graph, one certified
 * provenance circuit per (vertex, walk length) pair achievable within
 * @p hop_bound edges -- the rows a hop-counting recursive CTE derives,
 * row @c (v,h) meaning "some *walk* of exactly @c h edges connects a
 * present source to @c v" -- and returns them as @c (vertex, hops,
 * token) with @p hop_seed added to the lengths (the CTE base arm's hop
 * constant).  Also pre-creates, per vertex, the certified gate that a
 * hop-discarding query's deduplication will address, wired to the
 * compilation's native within-bound root, so the natural "within k
 * hops" probability evaluates through the linear certified route.
*
 * @param sources               source vertex of each edge (dense integer IDs)
 * @param destinations          destination vertex of each edge
 * @param tokens                provenance token of each edge tuple
 * @param probabilities         probability of each edge tuple
 * @param block_keys            per-edge BID key variable (nil UUID = independent)
 * @param block_indices         per-edge outcome index within its block
 * @param source_vertices       the source vertices
 * @param source_tokens         per-source provenance token (nil UUID = certain)
 * @param source_probabilities  per-source probability
 * @param directed              if false, each edge can be traversed both ways
 * @param hop_bound             maximum walk length
 * @param hop_seed              hop value of the base arm (added to reported lengths)
 */
-- Set-returning: one (vertex, hops, token) row per result, via the OUT columns.
CREATE OR REPLACE FUNCTION reachability_materialize_hops(
  IN sources INT[],
  IN destinations INT[],
  IN tokens UUID[],
  IN probabilities DOUBLE PRECISION[],
  IN block_keys UUID[],
  IN block_indices INT[],
  IN source_vertices INT[],
  IN source_tokens UUID[],
  IN source_probabilities DOUBLE PRECISION[],
  IN directed BOOLEAN,
  IN hop_bound INT,
  IN hop_seed INT,
  OUT vertex INT,
  OUT hops INT,
  OUT token UUID
)
  RETURNS SETOF record
  LANGUAGE C
  VOLATILE
  AS 'provsql', 'reachability_materialize_hops';

/**
 * @brief Per-group "some member reachable" compilation (columnar form,
 *        internal)
 *
 * For each distinct group in the parallel @p group_ids /
 * @p member_vertices arrays, compiles the certified circuit of "some
 * member vertex is reachable from a present source" along the data
 * decomposition -- the disjunction over the group's *correlated*
 * per-vertex reachability events, deterministic by construction
 * through the set-reachability state bit -- materialises it, and
 * returns one @c (group_id, token) row per group.  Engine behind the
 * rewriter's cross-vertex aggregation planting.
*
 * @param sources               source vertex of each edge (dense integer IDs)
 * @param destinations          destination vertex of each edge
 * @param tokens                provenance token of each edge tuple
 * @param probabilities         probability of each edge tuple
 * @param block_keys            per-edge BID key variable (nil UUID = independent)
 * @param block_indices         per-edge outcome index within its block
 * @param source_vertices       the source vertices
 * @param source_tokens         per-source provenance token (nil UUID = certain)
 * @param source_probabilities  per-source probability
 * @param directed              if false, each edge can be traversed both ways
 * @param group_ids             group identifier of each member row
 * @param member_vertices       member vertex of each member row
 */
-- Set-returning: one (group_id, token) row per result, via the OUT columns.
CREATE OR REPLACE FUNCTION reachability_materialize_any(
  IN sources INT[],
  IN destinations INT[],
  IN tokens UUID[],
  IN probabilities DOUBLE PRECISION[],
  IN block_keys UUID[],
  IN block_indices INT[],
  IN source_vertices INT[],
  IN source_tokens UUID[],
  IN source_probabilities DOUBLE PRECISION[],
  IN directed BOOLEAN,
  IN group_ids INT[],
  IN member_vertices INT[],
  OUT group_id INT,
  OUT token UUID
)
  RETURNS SETOF record
  LANGUAGE C
  VOLATILE
  AS 'provsql', 'reachability_materialize_any';

/**
 * @brief Plant certified any-member-reachable gates for a grouped
 *        reachability aggregation (internal)
 *
 * Called (at plan time, over SPI) by the recursive-CTE lowering when
 * the outer query aggregates a reachability working table by a column
 * of a joined, untracked member relation: @c GROUP @c BY collapses
 * each group's per-vertex reach tokens with @c provenance_plus, whose
 * disjuncts are correlated (they share edges) and would otherwise
 * leave the certified route.  For each multi-member group this
 * pre-creates, at the canonical address of the group's token multiset,
 * a certified single-child plus over the group's native
 * any-member-reachable circuit (@c reachability_materialize_any), so
 * the natural aggregation stays on the linear evaluation route.
* Best-effort: any failure leaves the generic path untouched (a notice
 * is raised when provsql.verbose_level >= 10; a successful planting is
 * reported when it is >= 20).
 *
 * @param work_name the lowered CTE's working table
 * @param node_attribute its vertex column
 * @param member_rel the joined member relation (must be untracked)
 * @param member_attribute the member relation's join column
 * @param group_attribute the member relation's grouping column
 * @param edge_rel the tracked edge relation (as for eval_reachability)
 * @param source_attribute name of the source-vertex column
 * @param destination_attribute name of the destination-vertex column
 * @param source_value the base arm's constant, as text
 * @param directed if false, each edge can be traversed both ways
 * @param edge_quals optional deterministic filter over edge columns
 * @param source_rel source relation of a multi-source base arm
 * @param source_rel_attribute the source relation's vertex column
 * @param edge_sql deparsed edge subquery (join-defined edges)
 * @param member_quals optional deterministic filter over the member
 *        relation's columns (table-qualified as @c t.), restricting
 *        which members participate in each group
 */
CREATE OR REPLACE FUNCTION plant_reach_any_groups(
  work_name text, node_attribute text, member_rel regclass,
  member_attribute text, group_attribute text, edge_rel regclass,
  source_attribute text, destination_attribute text,
  source_value text, directed boolean,
  edge_quals text DEFAULT NULL, source_rel regclass DEFAULT NULL,
  source_rel_attribute text DEFAULT NULL, edge_sql text DEFAULT NULL,
  member_quals text DEFAULT NULL)
  RETURNS void AS
$$
DECLARE
  e record;                          -- result of gather_reachability_edges
  grp record;                        -- loop row: one group
  m record;                          -- loop row: one distinct member value
  sv text[];                         -- source vertex values (as text)
  st uuid[];                         -- per-source tokens
  sp double precision[];             -- per-source probabilities
  gids int[] := ARRAY[]::int[];      -- group id of each (group, member) pair
  mids int[] := ARRAY[]::int[];      -- dense vertex id of each pair (parallel to gids)
  vid int;
  canonical uuid;
  verbosity int := coalesce(current_setting('provsql.verbose_level', true)::int, 0);
BEGIN
  -- Inner block: its EXCEPTION handler below turns any failure into a
  -- (possibly reported) no-op instead of an error.
  BEGIN
    -- A tracked member relation would make the aggregated tokens
    -- per-row products, not the bare reach tokens: nothing to plant.
    IF EXISTS (SELECT 1 FROM pg_attribute
               WHERE attrelid = member_rel AND attname = 'provsql'
                 AND atttypid = 'uuid'::regtype AND NOT attisdropped) THEN
      RETURN;
    END IF;

    -- Source set: gathered from source_rel when given, otherwise the single
    -- constant source_value with the nil token and probability 1.
    IF source_rel IS NOT NULL THEN
      SELECT g.source_values, g.source_tokens, g.source_probabilities
        INTO sv, st, sp
        FROM provsql.gather_reachability_sources(source_rel, source_rel_attribute) g;
      IF sv IS NULL THEN
        sv := ARRAY[]::text[];
        st := ARRAY[]::uuid[];
        sp := ARRAY[]::float8[];
      END IF;
    ELSE
      sv := ARRAY[source_value];
      st := ARRAY['00000000-0000-0000-0000-000000000000'::uuid];
      sp := ARRAY[1.0::float8];
    END IF;

    -- The source values are passed as the extra vertices, so their dense
    -- ids are available as e.extra_ids further down.
    e := provsql.gather_reachability_edges(edge_rel, source_attribute, destination_attribute, sv, edge_quals, edge_sql);

    -- The groups, replicating the user's join semantics: per group, the
    -- member vertices and the multiset of their reach tokens (with the
    -- multiplicity the join produces).  Single-member groups need no
    -- planting (provenance_plus passes a single token through).
    -- Two steps: materialise the joined rows with their per-row tokens
    -- (tracked CTAS, then strip the automatic provsql column), and only
    -- then aggregate the now-plain table -- aggregating provenance()
    -- inside a grouped tracked query would be rewritten as a
    -- provenance-aware aggregation, which is not what the planting
    -- needs.
    DROP TABLE IF EXISTS provsql_reach_any_flat_tmp;
    EXECUTE format(
      'CREATE TEMP TABLE provsql_reach_any_flat_tmp AS ' ||
      'SELECT w.%1$I::text AS node_val, provsql.provenance() AS tok, ' ||
      ' t.%5$I AS grp_key ' ||
      'FROM %2$I w JOIN %3$s t ON w.%1$I = t.%4$I'
      -- The member-relation filter restricts which members participate
      -- (deparsed table-qualified as t.); the working table side
      -- carries no provenance distinction here.
      || coalesce(' WHERE ' || member_quals, ''),
      node_attribute, work_name, member_rel::text, member_attribute, group_attribute);
    PERFORM provsql.remove_provenance('provsql_reach_any_flat_tmp');

    -- One row per group of at least two joined rows, numbered by gid.
    DROP TABLE IF EXISTS provsql_reach_any_groups_tmp;
    CREATE TEMP TABLE provsql_reach_any_groups_tmp AS
      SELECT (row_number() OVER ())::int AS gid, members, toks
      FROM (
        SELECT array_agg(node_val) AS members, array_agg(tok) AS toks
        FROM provsql_reach_any_flat_tmp
        GROUP BY grp_key
        HAVING count(*) >= 2) g;
    DROP TABLE provsql_reach_any_flat_tmp;

    -- Flatten to parallel (group id, dense vertex id) arrays; member values
    -- absent from e.vertices are skipped.
    FOR grp IN SELECT gid, members FROM provsql_reach_any_groups_tmp LOOP
      FOR m IN SELECT DISTINCT unnest(grp.members) AS val LOOP
        vid := array_position(e.vertices, m.val);
        IF vid IS NOT NULL THEN
          gids := gids || grp.gid;
          mids := mids || vid;
        END IF;
      END LOOP;
    END LOOP;

    -- Nothing to compile: clean up and leave.
    IF cardinality(gids) = 0 THEN
      DROP TABLE provsql_reach_any_groups_tmp;
      RETURN;
    END IF;

    -- Compile the per-group circuits and plant, at the canonical address of
    -- each group's sorted token multiset, a single-child plus gate over the
    -- compiled token.
    FOR grp IN
      SELECT a.group_id, a.token AS any_token, t.toks
      FROM provsql.reachability_materialize_any(
             e.sources, e.destinations, e.tokens, e.probabilities,
             e.block_keys, e.block_indices, e.extra_ids, st, sp, directed,
             gids, mids) a
      JOIN provsql_reach_any_groups_tmp t ON t.gid = a.group_id
    LOOP
      canonical := public.uuid_generate_v5(
        provsql.uuid_ns_provsql(),
        concat('plus-canonical',
               (SELECT array_agg(tok ORDER BY tok) FROM unnest(grp.toks) tok)));
      PERFORM provsql.create_gate(canonical, 'plus', ARRAY[grp.any_token]);
      -- NOTE(review): presumably flags the gate as certified; the meaning of
      -- the value 1 is not visible here -- confirm against set_infos.
      PERFORM provsql.set_infos(canonical, 1);
    END LOOP;
    DROP TABLE provsql_reach_any_groups_tmp;

    IF verbosity >= 20 THEN
      -- Lift the function-level client_min_messages = warning for the
      -- one RAISE; the function-level SET restores the caller's value.
      PERFORM set_config('client_min_messages', 'notice', true);
      RAISE NOTICE 'ProvSQL: certified any-member gates planted for the aggregation of "%" by %.%', work_name, member_rel, group_attribute;
      PERFORM set_config('client_min_messages', 'warning', true);
    END IF;
  EXCEPTION WHEN OTHERS THEN
    -- Best-effort: swallow the error, reporting it only when verbose enough.
    IF verbosity >= 10 THEN
      PERFORM set_config('client_min_messages', 'notice', true);
      RAISE NOTICE 'ProvSQL: any-member planting for "%" skipped (%)', work_name, SQLERRM;
      PERFORM set_config('client_min_messages', 'warning', true);
    END IF;
  END;
END
-- No SET search_path: the deparsed edge subquery must resolve against
-- the caller's path; ProvSQL internals are schema-qualified.
$$ LANGUAGE plpgsql SET client_min_messages = warning;

/**
 * @brief Input leaves of a conjunction-shaped provenance token (internal)
 *
 * Descends a token's circuit through the conjunctive gate types
 * (@c times, and the pass-through @c project / @c eq where-provenance
 * wrappers) down to @c input leaves.  Returns the distinct leaves, or
 * NULL when the circuit contains any other gate type (a disjunctive or
 * aggregate shape, which is not a conjunction of independent tuples).
 * Used by the reachability gathering to accept join-defined edges:
 * a derived edge whose token is a pure conjunction of base tuples.
*
 * @param token the provenance token
 */
CREATE OR REPLACE FUNCTION token_conjunctive_leaves(token uuid)
  RETURNS uuid[]
  LANGUAGE sql
  STABLE
AS $$
  WITH RECURSIVE walk(g) AS (
    -- Start at the token itself ...
    SELECT token
    UNION
    -- ... and expand only through the conjunctive / pass-through gates.
    SELECT child.id
    FROM walk AS parent
    CROSS JOIN LATERAL unnest(provsql.get_children(parent.g)) AS child(id)
    WHERE provsql.get_gate_type(parent.g) IN ('times', 'project', 'eq')
  ),
  -- Every visited gate together with its type.
  typed AS (
    SELECT g, provsql.get_gate_type(g) AS kind
    FROM walk
  )
  -- All gates of an allowed type: return the distinct input leaves;
  -- otherwise the CASE falls through to NULL.
  SELECT CASE
           WHEN bool_and(kind IN ('times', 'project', 'eq', 'input'))
             THEN array_agg(DISTINCT g) FILTER (WHERE kind = 'input')
         END
  FROM typed;
$$;

/**
 * @brief Gather the edges of a tracked relation in the columnar form
 *        expected by reachability_evaluate (internal)
 *
 * Materializes the edge relation with its provenance tokens and
 * probabilities, maps arbitrary vertex values (compared as text) onto
 * dense integer IDs, and checks that every edge tuple carries a base
 * input token (independent tuples): reachability compilation along the
 * data is only correct when the edges are independent events, so views
 * or query results with derived provenance are rejected.
 *
 * @param rel the provenance-tracked edge relation
 * @param source_attribute name of the source-vertex column
 * @param destination_attribute name of the destination-vertex column
 * @param extra_vertices vertex values (as text) that must be part of
 *        the dense ID space even when they touch no edge -- the source
 *        set in particular; their IDs come back in @c extra_ids
 *        (aligned with the input)
 * @param edge_quals optional deterministic filter over the edge
 *        relation's columns (SQL text, deparsed by the rewriter from
 *        the recursive arm's WHERE clause), restricting which edges
 *        participate
 * @param rel_sql deparsed edge subquery to gather from instead of
 *        @p rel (join-defined edges); the tokens are then conjunctions
 *        of base tuples, validated for shape and disjoint supports
 *
 * The @c vertices output maps the dense IDs back to the original
 * vertex values (as text, 1-indexed), for callers that need to label
 * per-vertex results.
*/
CREATE OR REPLACE FUNCTION gather_reachability_edges(
  IN rel regclass,
  IN source_attribute TEXT,
  IN destination_attribute TEXT,
  IN extra_vertices TEXT[],
  IN edge_quals TEXT DEFAULT NULL,
  IN rel_sql TEXT DEFAULT NULL,
  OUT sources INT[],
  OUT destinations INT[],
  OUT tokens UUID[],
  OUT probabilities DOUBLE PRECISION[],
  OUT block_keys UUID[],
  OUT block_indices INT[],
  OUT extra_ids INT[],
  OUT vertices TEXT[]) AS
$$
DECLARE
  tkind text;      -- registry kind of rel: 'tid', 'bid', or NULL (dynamic path)
  bkey_expr text;  -- SQL expression concatenating the BID key columns as text
  sel_probs text;  -- per-kind SQL expression: probability of an edge row
  sel_bkeys text;  -- per-kind SQL expression: block key of an edge row
  sel_bidx text;   -- per-kind SQL expression: block index of an edge row
  verbosity int := coalesce(current_setting('provsql.verbose_level', true)::int, 0);
BEGIN
  -- Consult the per-table characterisation registry (TID / BID / OPAQUE,
  -- maintained by add_provenance / repair_key and the CTAS lineage hook):
  -- a TID relation is certified all-independent-inputs, a BID relation
  -- holds input or mulinput rows with the block structure given by the
  -- registry's key columns. Derived (OPAQUE), unregistered, or
  -- subquery-defined edges take the fully dynamic per-token path.
  IF rel IS NOT NULL AND rel_sql IS NULL THEN
    tkind := (provsql.get_table_info(rel::oid)).kind;
  END IF;
  IF tkind NOT IN ('tid', 'bid') THEN
    tkind := NULL;
  END IF;

  IF tkind = 'bid' THEN
    SELECT string_agg(quote_ident(a.attname) || '::text', ' || '','' || ' ORDER BY k.ord)
      INTO bkey_expr
      FROM unnest((provsql.get_table_info(rel::oid)).block_key)
             WITH ORDINALITY AS k(attnum, ord)
      JOIN pg_attribute a ON a.attrelid = rel AND a.attnum = k.attnum;
    -- An empty registry key means the whole table is one block.
    bkey_expr := coalesce(bkey_expr, quote_literal(''));
  END IF;

  IF tkind IS NOT NULL AND verbosity >= 20 THEN
    -- The function-level client_min_messages = warning (which silences
    -- the CTAS / DROP TABLE chatter) would also swallow this notice;
    -- lift it for the one RAISE. The function-level SET restores the
    -- caller's value at exit regardless.
    PERFORM set_config('client_min_messages', 'notice', true);
    RAISE NOTICE 'ProvSQL: catalog characterises % as %', rel, upper(tkind);
    PERFORM set_config('client_min_messages', 'warning', true);
  END IF;

  -- Materialize the edges with their tokens; the planner hook resolves
  -- provenance() over the tracked relation, and remove_provenance strips
  -- the automatic provsql column so the later aggregation is plain SQL.
  -- For a BID relation the synthetic per-block key (a v5 UUID over the
  -- registry key columns' values) is computed here, while the columns
  -- are in scope.
  DROP TABLE IF EXISTS provsql_reachability_edges_tmp;
  EXECUTE format(
    'CREATE TEMP TABLE provsql_reachability_edges_tmp AS ' ||
    'SELECT %1$I::text AS u, %2$I::text AS v, provsql.provenance() AS token%5$s ' ||
    'FROM %3$s WHERE %1$I IS NOT NULL AND %2$I IS NOT NULL%4$s',
    source_attribute,
    destination_attribute,
    CASE WHEN rel_sql IS NULL THEN rel::text
         ELSE '(' || rel_sql || ') AS provsql_edge_subquery' END,
    CASE WHEN edge_quals IS NULL THEN ''
         ELSE ' AND (' || edge_quals || ')' END,
    CASE WHEN tkind = 'bid'
         THEN ', public.uuid_generate_v5(provsql.uuid_ns_provsql(), '
              || quote_literal('bidblock' || rel::text || ':')
              || ' || ' || bkey_expr || ') AS bkey'
         ELSE ', NULL::uuid AS bkey' END);
  PERFORM provsql.remove_provenance('provsql_reachability_edges_tmp');

  DROP TABLE IF EXISTS provsql_reachability_support_tmp;
  IF tkind IS NULL THEN
    -- Dynamic path: validate the token shapes and, for conjunction-shaped
    -- (join-defined) tokens, the pairwise disjointness of their supports.
    IF EXISTS (SELECT 1 FROM provsql_reachability_edges_tmp
               WHERE provsql.get_gate_type(token)
                       NOT IN ('input', 'mulinput', 'times', 'project', 'eq')) THEN
      DROP TABLE provsql_reachability_edges_tmp;
      RAISE EXCEPTION 'reachability: the provenance of % must consist of base input, repair_key, or conjunctive join tokens',
        coalesce(rel::text, 'the edge query');
    END IF;

    -- One (token, leaf) row per base leaf of each conjunction-shaped token.
    CREATE TEMP TABLE provsql_reachability_support_tmp AS
      SELECT t.token, l.leaf
      FROM (SELECT DISTINCT token
            FROM provsql_reachability_edges_tmp
            WHERE provsql.get_gate_type(token) IN ('times', 'project', 'eq')) t,
           LATERAL unnest(provsql.token_conjunctive_leaves(t.token)) AS l(leaf);

    -- token_conjunctive_leaves yields NULL for a non-conjunctive circuit.
    IF EXISTS (SELECT 1
               FROM (SELECT DISTINCT token FROM provsql_reachability_edges_tmp) t
               WHERE provsql.get_gate_type(t.token) IN ('times', 'project', 'eq')
                 AND provsql.token_conjunctive_leaves(t.token) IS NULL) THEN
      DROP TABLE provsql_reachability_support_tmp;
      DROP TABLE provsql_reachability_edges_tmp;
      RAISE EXCEPTION 'reachability: a join-defined edge token is not a pure conjunction of base tuples';
    END IF;

    -- A base leaf seen more than once (across conjunction supports and
    -- plain input edge tokens) means two edges share a base tuple.
    IF EXISTS (SELECT 1
               FROM (
                 SELECT leaf FROM provsql_reachability_support_tmp
                 UNION ALL
                 SELECT DISTINCT token FROM provsql_reachability_edges_tmp
                 WHERE provsql.get_gate_type(token) = 'input'
               ) all_leaves
               GROUP BY leaf
               HAVING count(*) > 1) THEN
      DROP TABLE provsql_reachability_support_tmp;
      DROP TABLE provsql_reachability_edges_tmp;
      RAISE EXCEPTION 'reachability: join-defined edges share base tuples (their supports overlap), so they are not independent';
    END IF;
  END IF;

  -- Per-kind classification expressions for the final aggregation: a TID
  -- relation needs no per-row gate introspection at all; a BID relation
  -- one get_gate_type per row (the input/mulinput split), block keys from
  -- the precomputed column-derived key and indices by numbering within
  -- the block; the dynamic path reads the gates.
  IF tkind = 'tid' THEN
    sel_probs := 'coalesce(provsql.get_prob(e.token), 1.0)';
    sel_bkeys := $sql$'00000000-0000-0000-0000-000000000000'::uuid$sql$;
    sel_bidx := '0';
  ELSIF tkind = 'bid' THEN
    sel_probs := 'coalesce(provsql.get_prob(e.token), 1.0)';
    sel_bkeys := $sql$CASE WHEN provsql.get_gate_type(e.token) = 'mulinput'
                           THEN e.bkey
                           ELSE '00000000-0000-0000-0000-000000000000'::uuid END$sql$;
    sel_bidx := 'e.bidx';
  ELSE
    -- Probability of a conjunction = product of its leaves' probabilities,
    -- computed as exp(sum(ln(p))).  A CASE cannot prevent the evaluation
    -- of an aggregate's argument (aggregates are computed before the CASE
    -- is considered), so a zero-probability leaf would make ln() raise
    -- even though the bool_or branch is the one selected.  NULLIF turns a
    -- zero into NULL, which ln() propagates and sum() ignores; the
    -- bool_or branch still returns 0.0 for such a conjunction.
    sel_probs := $sql$CASE WHEN provsql.get_gate_type(e.token) IN ('times','project','eq')
                           THEN (SELECT CASE WHEN bool_or(coalesce(provsql.get_prob(s.leaf),1.0) = 0)
                                             THEN 0.0
                                             ELSE exp(sum(ln(NULLIF(coalesce(provsql.get_prob(s.leaf),1.0), 0))))
                                        END
                                 FROM provsql_reachability_support_tmp s
                                 WHERE s.token = e.token)
                           ELSE coalesce(provsql.get_prob(e.token), 1.0) END$sql$;
    sel_bkeys := $sql$CASE WHEN provsql.get_gate_type(e.token) = 'mulinput'
                           THEN (provsql.get_children(e.token))[1]
                           ELSE '00000000-0000-0000-0000-000000000000'::uuid END$sql$;
    sel_bidx := $sql$CASE WHEN provsql.get_gate_type(e.token) = 'mulinput'
                          THEN (provsql.get_infos(e.token)).info1
                          ELSE 0 END$sql$;
  END IF;

  -- Dense vertex IDs (1-based, ordered by the text value) over every edge
  -- endpoint plus the extra vertices, then one aggregation producing all
  -- the output arrays.
  EXECUTE format(
    $sql$
    WITH verts AS (
      SELECT u AS x FROM provsql_reachability_edges_tmp
      UNION
      SELECT v FROM provsql_reachability_edges_tmp
      UNION
      SELECT unnest($1)),
    ids AS (
      SELECT x, (row_number() OVER (ORDER BY x))::int AS id FROM verts)
    SELECT array_agg(iu.id),
           array_agg(iv.id),
           array_agg(e.token),
           array_agg(%s),
           array_agg(%s),
           array_agg(%s),
           (SELECT array_agg(i.id ORDER BY ev.ord)
              FROM unnest($1) WITH ORDINALITY AS ev(x, ord)
              JOIN ids i ON i.x = ev.x),
           (SELECT array_agg(x ORDER BY id) FROM ids)
    FROM (SELECT t.*, (row_number() OVER (PARTITION BY t.bkey))::int AS bidx
            FROM provsql_reachability_edges_tmp t) e
    JOIN ids iu ON iu.x = e.u
    JOIN ids iv ON iv.x = e.v
    $sql$,
    sel_probs, sel_bkeys, sel_bidx)
  INTO sources, destinations, tokens, probabilities,
       block_keys, block_indices, extra_ids, vertices
  USING extra_vertices;

  DROP TABLE provsql_reachability_edges_tmp;
  DROP TABLE IF EXISTS provsql_reachability_support_tmp;
END
-- No SET search_path: the deparsed edge subquery (and the regclass
-- rendering) must resolve against the caller's search_path; the ProvSQL
-- calls above are schema-qualified instead.
$$ LANGUAGE plpgsql SET client_min_messages = warning;

/**
 * @brief Gather a source relation's vertices, tokens and probabilities
 * (internal)
 *
 * For a provenance-tracked source relation, every tuple must carry a
 * base @c input token (a *probabilistic source set*); for an untracked
 * relation the sources are certain and the tokens come back as the nil
 * UUID. Vertex values are returned as text, for the shared dense-ID
 * mapping of @c gather_reachability_edges().
 *
 * @param rel the source relation
 * @param source_attribute name of the vertex column
 */
CREATE OR REPLACE FUNCTION gather_reachability_sources(
  IN rel regclass,
  IN source_attribute TEXT,
  OUT source_values TEXT[],
  OUT source_tokens UUID[],
  OUT source_probabilities DOUBLE PRECISION[]) AS
$$
DECLARE
  tracked boolean;  -- true when rel has a live uuid column named provsql
  tkind text;       -- registry kind of a tracked rel
BEGIN
  SELECT EXISTS (
    SELECT 1 FROM pg_attribute
    WHERE attrelid = rel
      AND attname = 'provsql'
      AND atttypid = 'uuid'::regtype
      AND NOT attisdropped)
  INTO tracked;

  -- Registry consultation: a TID source relation is certified
  -- all-base-input, so the per-row gate check can be skipped; a BID one
  -- holds block-correlated tuples, which a probabilistic source set
  -- cannot model -- reject it before gathering anything.
  IF tracked THEN
    tkind := (get_table_info(rel::oid)).kind;
    IF tkind = 'bid' THEN
      RAISE EXCEPTION 'reachability: % is block-independent (repair_key); block-correlated source sets are not supported', rel;
    END IF;
  END IF;

  DROP TABLE IF EXISTS provsql_reachability_sources_tmp;
  IF tracked THEN
    EXECUTE format(
      'CREATE TEMP TABLE provsql_reachability_sources_tmp AS ' ||
      'SELECT %1$I::text AS x, provenance() AS token ' ||
      'FROM %2$s WHERE %1$I IS NOT NULL',
      source_attribute, rel);
    PERFORM remove_provenance('provsql_reachability_sources_tmp');

    -- Anything not certified TID gets the per-row base-input check.
    IF tkind IS DISTINCT FROM 'tid'
       AND EXISTS (SELECT 1 FROM provsql_reachability_sources_tmp
                   WHERE get_gate_type(token) <> 'input') THEN
      DROP TABLE provsql_reachability_sources_tmp;
      RAISE EXCEPTION 'reachability: the provenance of % must consist of base input tokens (independent tuples); views or query results are not supported', rel;
    END IF;

    SELECT array_agg(x), array_agg(token), array_agg(coalesce(get_prob(token), 1.0))
      INTO source_values, source_tokens, source_probabilities
      FROM provsql_reachability_sources_tmp;
    DROP TABLE provsql_reachability_sources_tmp;
  ELSE
    -- Untracked relation: distinct certain sources (nil token, probability 1).
    EXECUTE format(
      'CREATE TEMP TABLE provsql_reachability_sources_tmp AS ' ||
      'SELECT DISTINCT %1$I::text AS x FROM %2$s WHERE %1$I IS NOT NULL',
      source_attribute, rel);
    SELECT array_agg(x),
           array_agg('00000000-0000-0000-0000-000000000000'::uuid),
           array_agg(1.0::float8)
      INTO source_values, source_tokens, source_probabilities
      FROM provsql_reachability_sources_tmp;
    DROP TABLE provsql_reachability_sources_tmp;
  END IF;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SET client_min_messages = warning;

/**
 * @brief Fixpoint driver for the recursive reachability shape:
 * decomposition-aligned compilation with fallback to eval_recursive
 *
 * Called (at plan time, over SPI) by the recursive-CTE lowering when
 * the provenance class is 'absorptive' or 'boolean'
 * (@c provsql.provenance) and the CTE matches the linear
 * reachability shape over a tracked base edge relation.
Attempts the
 * decomposition-aligned route -- gather the edges, compile every
 * reachable vertex's certified provenance circuit along a tree
 * decomposition of the data graph, materialise them, and fill the
 * working table with one tokenised row per reachable vertex. On any
 * failure (data treewidth above the cap, per-node state bound, edges
 * that are not independent base tuples...), falls back to the generic
 * @c eval_recursive() fixpoint, preserving its behaviour exactly.
 *
 * @param edge_rel the provenance-tracked edge relation
 * @param source_attribute name of the source-vertex column
 * @param destination_attribute name of the destination-vertex column
 * @param source_value the base arm's constant, as text
 * @param directed if false, each edge can be traversed both ways
 * @param work_name name of the working temp table (the CTE name)
 * @param colnames comma-separated user column names (for the fallback)
 * @param coldef column definitions of the working table
 * @param coltype type of the CTE's single column
 * @param body_sql deparsed CTE body (for the fallback)
 * @param edge_quals optional deterministic filter over edge columns
 *   (deparsed from the recursive arm's WHERE clause)
 * @param source_rel source relation of a multi-source base arm
 *   (@c SELECT col FROM sources), NULL for the constant form;
 *   tracked sources form a probabilistic source set, untracked
 *   ones are certain
 * @param source_rel_attribute the source relation's vertex column
 * @param edge_sql deparsed edge subquery when the recursive arm joins a
 *   derived (join-defined) edge relation instead of a base one;
 *   NULL for the regclass form
 * @param hop_bound maximum number of recursive steps for the
 *   hop-counting CTE shape (NULL for plain reachability)
 * @param hop_seed the base arm's hop constant (hop-counting shape)
 * @param hops_position 1-based position of the hop column among the
 *   CTE's two columns (hop-counting shape)
 */
CREATE OR REPLACE FUNCTION eval_reachability(
  edge_rel regclass,
  source_attribute text,
  destination_attribute text,
  source_value text,
  directed boolean,
  work_name text,
  colnames text,
  coldef text,
  coltype text,
  body_sql text,
  edge_quals text DEFAULT NULL,
  source_rel regclass DEFAULT NULL,
  source_rel_attribute text DEFAULT NULL,
  edge_sql text DEFAULT NULL,
  hop_bound int DEFAULT NULL,
  hop_seed int DEFAULT NULL,
  hops_position int DEFAULT NULL)
  RETURNS void AS
$$
DECLARE
  e record;               -- columnar edge data from gather_reachability_edges
  sv text[];              -- source vertex values
  st uuid[];              -- per-source tokens (nil UUID = certain)
  sp double precision[];  -- per-source probabilities
  verbosity int := coalesce(current_setting('provsql.verbose_level', true)::int, 0);
BEGIN
  -- The inner block is the attempt; its EXCEPTION clause is the fallback.
  BEGIN
    IF source_rel IS NOT NULL THEN
      -- Multi-source: gather the source relation (probabilistic when
      -- tracked, certain otherwise).
      SELECT g.source_values, g.source_tokens, g.source_probabilities
        INTO sv, st, sp
        FROM provsql.gather_reachability_sources(source_rel, source_rel_attribute) g;
      IF sv IS NULL THEN
        sv := ARRAY[]::text[];
        st := ARRAY[]::uuid[];
        sp := ARRAY[]::float8[];
      END IF;
    ELSE
      -- Constant base arm: one certain source.
      sv := ARRAY[source_value];
      st := ARRAY['00000000-0000-0000-0000-000000000000'::uuid];
      sp := ARRAY[1.0::float8];
    END IF;

    e := provsql.gather_reachability_edges(edge_rel, source_attribute,
                                           destination_attribute, sv,
                                           edge_quals, edge_sql);

    -- Drop a leftover working table before recreating it.  The probe and
    -- the DROP both address the *temporary* relation and both quote the
    -- name as an identifier: an unquoted to_regclass(work_name) would
    -- case-fold the name (missing a leftover "MixedCase" table, so the
    -- CREATE below fails) and would resolve through the whole search_path
    -- (so a permanent relation sharing the CTE's name could be dropped).
    -- The explicit probe, rather than DROP ... IF EXISTS, keeps the
    -- "does not exist, skipping" notice away from the client.
    IF to_regclass('pg_temp.' || quote_ident(work_name)) IS NOT NULL THEN
      EXECUTE format('DROP TABLE pg_temp.%I', work_name);
    END IF;
    EXECUTE format('CREATE TEMP TABLE %I (%s, provsql uuid)', work_name, coldef);

    IF hop_bound IS NULL THEN
      -- Plain reachability: one row per vertex returned by
      -- reachability_materialize, its dense ID mapped back through
      -- e.vertices and cast to the CTE column type.
      EXECUTE format(
        'INSERT INTO %I SELECT ($1::text[])[m.vertex]::%s, m.token ' ||
        'FROM provsql.reachability_materialize($2, $3, $4, $5, $6, $7, $8, $9, $10, $11) m',
        work_name, coltype)
      USING e.vertices, e.sources, e.destinations, e.tokens, e.probabilities,
            e.block_keys, e.block_indices, e.extra_ids, st, sp, directed;
    ELSE
      -- Hop-counting shape: one row per (vertex, walk length), the hop
      -- column in its CTE position.
      EXECUTE format(
        'INSERT INTO %I SELECT %s, m.token ' ||
        'FROM provsql.reachability_materialize_hops($2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) m',
        work_name,
        CASE WHEN hops_position = 1
             THEN format('m.hops, ($1::text[])[m.vertex]::%s', coltype)
             ELSE format('($1::text[])[m.vertex]::%s, m.hops', coltype) END)
      USING e.vertices, e.sources, e.destinations, e.tokens, e.probabilities,
            e.block_keys, e.block_indices, e.extra_ids, st, sp, directed,
            hop_bound, hop_seed;
    END IF;

    IF verbosity >= 20 THEN
      RAISE NOTICE 'ProvSQL: recursive CTE "%" compiled along a tree decomposition of %',
        work_name, coalesce(edge_rel::text, 'the join-defined edge query');
    END IF;
  EXCEPTION WHEN OTHERS THEN
    -- Any failure of the attempt (its effects are rolled back with the
    -- block) hands over to the generic fixpoint.
    IF verbosity >= 10 THEN
      RAISE NOTICE 'ProvSQL: reachability route for "%" fell back to the generic fixpoint (%)',
        work_name, SQLERRM;
    END IF;
    PERFORM provsql.eval_recursive(body_sql, work_name, colnames, coldef);
  END;
END
$$ LANGUAGE plpgsql;

/**
 * @brief Create a times (product) gate from multiple provenance tokens
 *
 * Filters out NULL and one-gates; returns gate_one() if all tokens
 * are trivial, or a single token if only one remains.
 *
 * Before creating an ordinary gate, the *times-canonical* address of
 * the surviving multiset -- @c uuid5('times-canonical{sorted tokens}')
 * -- is probed: the reachability rewriter pre-creates there, for
 * self-join conjunctions of reachability tokens, a certified
 * equivalent (the all-members-reachable circuit; see
 * @c plant_reach_cover). Ordinary creation never writes under that
 * recipe, so a hit is always a deliberate plant; the historical
 * order-dependent recipe is used unchanged otherwise, so ordinary
 * times gates (and their formula rendering) are untouched.
*/
CREATE OR REPLACE FUNCTION provenance_times(VARIADIC tokens uuid[])
  RETURNS UUID AS
$$
DECLARE
  kept uuid[];     -- the non-trivial factors, in argument order
  kept_count int;  -- how many factors survived the filtering
  planted uuid;    -- times-canonical address of the surviving multiset
  product uuid;    -- address of the ordinary times gate
BEGIN
  SELECT array_agg(t)
    INTO kept
    FROM unnest(tokens) t
   WHERE t IS NOT NULL AND t <> gate_one();

  -- Decide on the number of SURVIVING factors, not on the number of
  -- arguments: [one, x] must collapse to x itself rather than to a
  -- one-child times gate around x.
  kept_count := coalesce(array_length(kept, 1), 0);

  IF kept_count = 0 THEN
    -- Nothing but NULLs and one-gates: the product is the identity.
    RETURN gate_one();
  ELSIF kept_count = 1 THEN
    -- A single factor is its own product.
    RETURN kept[1];
  END IF;

  -- The sorted aggregate is deliberately a separate query: placing an
  -- ORDER BY aggregate next to the filtering aggregate above would have
  -- the planner feed both of them sorted input, which would scramble
  -- the stored children order.
  SELECT uuid_generate_v5(uuid_ns_provsql(),
                          concat('times-canonical', array_agg(t ORDER BY t)))
    INTO planted
    FROM unnest(kept) t;

  IF get_gate_type(planted) = 'times' THEN
    -- A gate was deliberately pre-created at the canonical address
    -- (same children, same product): reuse it.
    RETURN planted;
  END IF;

  -- Ordinary case: the historical order-dependent address.
  product := uuid_generate_v5(uuid_ns_provsql(), concat('times', kept));
  PERFORM create_gate(product, 'times', ARRAY_AGG(t))
     FROM UNNEST(kept) AS t
    WHERE t IS NOT NULL;
  RETURN product;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

/**
 * @brief Compile and materialise the "every member vertex reachable"
 * (k-terminal / coverage) circuit (columnar form, internal)
 *
 * Arguments as @c reachability_materialize_any() with a single member
 * set: compiles the certified circuit of "every member vertex is
 * reachable from a present source" -- the conjunction over the
 * members' *correlated* per-vertex events, deterministic by
 * construction through the pending rescuer-set congruence --
 * materialises it, and returns its token, wrapped in the
 * @c 'absorptive' assumption marker. Probability evaluation gives the
 * k-terminal reliability; nonnegative min-plus the cost of the
 * cheapest covering subgraph (directed Steiner cost), shared edges
 * paid once. A member vertex absent from the graph is unreachable:
 * the circuit is then constant false.
*
 * @param sources source vertex of each edge (dense integer IDs)
 * @param destinations destination vertex of each edge
 * @param tokens provenance token of each edge tuple
 * @param probabilities probability of each edge tuple
 * @param block_keys per-edge BID key variable (nil UUID = independent)
 * @param block_indices per-edge outcome index within its block
 * @param source_vertices the source vertices
 * @param source_tokens per-source provenance token (nil UUID = certain)
 * @param source_probabilities per-source probability
 * @param directed if false, each edge can be traversed both ways
 * @param member_vertices the member vertices (dense IDs)
 * @return the UUID token of the materialised circuit
 *
 * Implemented in C (symbol @c reachability_materialize_cover of the
 * @c provsql library) and declared @c VOLATILE; this statement only
 * binds the SQL signature.
 */
CREATE OR REPLACE FUNCTION reachability_materialize_cover(
  sources INT[],
  destinations INT[],
  tokens UUID[],
  probabilities DOUBLE PRECISION[],
  block_keys UUID[],
  block_indices INT[],
  source_vertices INT[],
  source_tokens UUID[],
  source_probabilities DOUBLE PRECISION[],
  directed BOOLEAN,
  member_vertices INT[])
  RETURNS UUID AS
  'provsql','reachability_materialize_cover' LANGUAGE C VOLATILE;

/**
 * @brief Plant the certified all-members-reachable gate for a
 * reachability self-join conjunction (internal)
 *
 * Called (at plan time, over SPI) by the recursive-CTE lowering when
 * the outer query self-joins a reachability working table with one
 * constant node binding per reference -- "are these k vertices all
 * reachable" -- whose row provenance @c provenance_times() computes as
 * the product of *correlated* per-vertex reach tokens (they share
 * edges).
This pre-creates, at the times-canonical address of that
 * token multiset, a certified single-child times over the native
 * all-members-reachable circuit (@c reachability_materialize_cover),
 * so the natural conjunction stays on the linear certified route --
 * with the joint-worlds semantics: probability evaluation gives the
 * k-terminal reliability, and nonnegative min-plus the cost of the
 * cheapest covering subgraph (directed Steiner cost), shared edges
 * paid once where the raw product would pay them once per factor.
 * Best-effort: any failure leaves the generic path untouched (notice
 * under verbosity 10).
 *
 * @param work_name the lowered CTE's working table
 * @param node_attribute its vertex column
 * @param edge_rel the tracked edge relation (as for eval_reachability)
 * @param source_attribute name of the source-vertex column
 * @param destination_attribute name of the destination-vertex column
 * @param source_value the base arm's constant, as text
 * @param directed if false, each edge can be traversed both ways
 * @param node_values the constant node bindings, as text (multiset:
 *   one per self-join reference)
 * @param edge_quals optional deterministic filter over edge columns
 * @param source_rel source relation of a multi-source base arm
 * @param source_rel_attribute the source relation's vertex column
 * @param edge_sql deparsed edge subquery (join-defined edges)
 */
CREATE OR REPLACE FUNCTION plant_reach_cover(
  work_name text,
  node_attribute text,
  edge_rel regclass,
  source_attribute text,
  destination_attribute text,
  source_value text,
  directed boolean,
  node_values text[],
  edge_quals text DEFAULT NULL,
  source_rel regclass DEFAULT NULL,
  source_rel_attribute text DEFAULT NULL,
  edge_sql text DEFAULT NULL)
  RETURNS void AS
$$
DECLARE
  edge_data record;                         -- columnar edges and vertex map
  src_values text[];                        -- source vertex values
  src_tokens uuid[];                        -- per-source tokens
  src_probs double precision[];             -- per-source probabilities
  binding text;                             -- current constant node binding
  member_id int;                            -- its dense vertex ID
  member_ids int[] := ARRAY[]::int[];       -- dense IDs of all bindings
  reach_token uuid;                         -- its row token in the working table
  reach_tokens uuid[] := ARRAY[]::uuid[];   -- row tokens of all bindings
  cover_gate uuid;                          -- the all-members-reachable token
  planted_at uuid;                          -- times-canonical address
  verbose_level int := coalesce(current_setting('provsql.verbose_level', true)::int, 0);
BEGIN
  -- Best-effort: the whole planting runs in one guarded block.
  BEGIN
    IF source_rel IS NULL THEN
      -- Constant base arm: a single certain source.
      src_values := ARRAY[source_value];
      src_tokens := ARRAY['00000000-0000-0000-0000-000000000000'::uuid];
      src_probs := ARRAY[1.0::float8];
    ELSE
      SELECT g.source_values, g.source_tokens, g.source_probabilities
        INTO src_values, src_tokens, src_probs
        FROM provsql.gather_reachability_sources(source_rel, source_rel_attribute) g;
      IF src_values IS NULL THEN
        src_values := ARRAY[]::text[];
        src_tokens := ARRAY[]::uuid[];
        src_probs := ARRAY[]::float8[];
      END IF;
    END IF;

    edge_data := provsql.gather_reachability_edges(
                   edge_rel, source_attribute, destination_attribute,
                   src_values, edge_quals, edge_sql);

    -- Collect, with the multiplicity of the self-join, each bound
    -- vertex's dense ID and its reach token from the working table.
    -- A binding missing from the graph or from the working table makes
    -- the join empty: there will be no row, hence nothing to plant.
    FOREACH binding IN ARRAY node_values LOOP
      member_id := array_position(edge_data.vertices, binding);
      IF member_id IS NULL THEN
        RETURN;
      END IF;
      member_ids := array_append(member_ids, member_id);

      EXECUTE format('SELECT provsql FROM %I WHERE %I::text = $1',
                     work_name, node_attribute)
         INTO reach_token
        USING binding;
      IF reach_token IS NULL THEN
        RETURN;
      END IF;
      reach_tokens := array_append(reach_tokens, reach_token);
    END LOOP;

    cover_gate := provsql.reachability_materialize_cover(
                    edge_data.sources, edge_data.destinations,
                    edge_data.tokens, edge_data.probabilities,
                    edge_data.block_keys, edge_data.block_indices,
                    edge_data.extra_ids, src_tokens, src_probs,
                    directed, member_ids);

    -- The address provenance_times() probes for this token multiset.
    SELECT public.uuid_generate_v5(
             provsql.uuid_ns_provsql(),
             concat('times-canonical', array_agg(t ORDER BY t)))
      INTO planted_at
      FROM unnest(reach_tokens) t;

    PERFORM provsql.create_gate(planted_at, 'times', ARRAY[cover_gate]);
    PERFORM provsql.set_infos(planted_at, 1);

    IF verbose_level >= 20 THEN
      -- The function-level client_min_messages = warning would hide the
      -- notice: raise the level around this single RAISE (the
      -- function-level SET puts the caller's value back on exit).
      PERFORM set_config('client_min_messages', 'notice', true);
      RAISE NOTICE 'ProvSQL: certified all-members gate planted for the self-join of "%"', work_name;
      PERFORM set_config('client_min_messages', 'warning', true);
    END IF;
  EXCEPTION WHEN OTHERS THEN
    -- A failed planting is only reported; the generic path is untouched.
    IF verbose_level >= 10 THEN
      PERFORM set_config('client_min_messages', 'notice', true);
      RAISE NOTICE 'ProvSQL: all-members planting for "%" skipped (%)', work_name, SQLERRM;
      PERFORM set_config('client_min_messages', 'warning', true);
    END IF;
  END;
END
-- No SET search_path: the deparsed edge subquery must resolve against
-- the caller's path; ProvSQL internals are schema-qualified.
$$ LANGUAGE plpgsql SET client_min_messages = warning;

-- ----------------------------------------------------------------------
-- Conditioning: the | / cond operator (uuid carrier).
--
-- cond(target, evidence) builds the terminal gate_conditioned that the
-- measure evaluators read as P(target ∧ evidence) / P(evidence). The gate
-- stores [target, evidence, joint] with joint = times(target, evidence), so
-- probability_evaluate is the plain ratio P(joint)/P(evidence); content-
-- addressing makes shared base tuples the same input gate in both circuits,
-- so the conditional is exact and correlation-aware. Nested conditioning
-- folds (sequential Bayesian update (X|A)|B = X|(A∧B)); the token is
-- terminal (refused as a child of any semiring gate, and by every general
-- sr_* semiring at evaluation).
-- ----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION cond(target UUID, evidence UUID)
  RETURNS UUID AS
$$
DECLARE
  subject uuid;  -- the conditioned target (first child)
  seen uuid;     -- the accumulated evidence (second child)
  joint uuid;    -- target-and-evidence product (third child)
  parts uuid[];  -- children of an already-conditioned target
  gate_id uuid;  -- content-derived address of the new gate
BEGIN
  -- Conditioning on nothing, or on the one-gate, leaves the target as is.
  IF evidence IS NULL OR evidence = gate_one() THEN
    RETURN target;
  END IF;

  subject := coalesce(target, gate_one());

  IF get_gate_type(subject) = 'conditioned' THEN
    -- Nested conditioning: fold the new evidence into the stored
    -- evidence and joint, keeping the original target.
    parts := get_children(subject);
    subject := parts[1];
    seen := provenance_times(parts[2], evidence);
    joint := provenance_times(parts[3], evidence);
  ELSE
    seen := evidence;
    joint := provenance_times(subject, evidence);
  END IF;

  gate_id := public.uuid_generate_v5(
               uuid_ns_provsql(),
               concat('conditioned', subject, seen, joint));
  PERFORM create_gate(gate_id, 'conditioned', ARRAY[subject, seen, joint]);
  RETURN gate_id;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

CREATE OPERATOR | (LEFTARG=UUID, RIGHTARG=UUID, PROCEDURE=cond);

-- Whole-tuple output conditioning marker: given(c) / prefix | c. A consumed
-- select-list term the rewriter strips, conditioning each output row's
-- provenance on c. Identity at runtime when the rewriter is inactive.
CREATE OR REPLACE FUNCTION given(evidence UUID)
  RETURNS UUID AS
$$
  SELECT $1;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;

CREATE OPERATOR | (RIGHTARG=UUID, PROCEDURE=given);

-- ----------------------------------------------------------------------
-- Conditioning: the X | C operator for the random_variable carrier.
--
-- random_variable_cond builds a composable two-child gate_conditioned
-- [target, condition]; the moment / support dispatchers below unpack it
-- (rv_conditioned_target / rv_conditioned_prov) and route through the
-- existing conditional evaluator (rv_moment / rv_support).
-- ----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION random_variable_cond(rv random_variable, cond uuid)
  RETURNS random_variable AS
$$
DECLARE
  subject uuid;  -- the conditioned random variable (first child)
  evt uuid;      -- the accumulated condition (second child)
  parts uuid[];  -- children of an already-conditioned input
  gate_id uuid;  -- content-derived address of the new gate
BEGIN
  -- No condition, or the one-gate: the variable is returned untouched.
  IF cond IS NULL OR cond = gate_one() THEN
    RETURN rv;
  END IF;

  subject := (rv)::uuid;

  IF get_gate_type(subject) = 'conditioned'
     AND array_length(get_children(subject), 1) = 2 THEN
    -- (X|A)|B folds to X|(A∧B): the input is already the two-child
    -- [target, condition] shape, so conjoin the new event with the
    -- stored one and keep the original target.
    parts := get_children(subject);
    subject := parts[1];
    evt := provenance_times(parts[2], cond);
  ELSE
    evt := cond;
  END IF;

  gate_id := public.uuid_generate_v5(
               uuid_ns_provsql(),
               concat('conditioned', subject, evt));
  PERFORM create_gate(gate_id, 'conditioned', ARRAY[subject, evt]);
  RETURN (gate_id)::random_variable;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

CREATE OPERATOR | (
  LEFTARG = random_variable,
  RIGHTARG = uuid,
  PROCEDURE = random_variable_cond
);

/**
 * @brief Unpack the target of a random-variable conditioning gate.
 *
 * For a two-child @c gate_conditioned @c [target, condition] (the @c "X | C"
 * shape) returns @p target; for any other token returns it unchanged. Used
 * by the moment / support dispatchers to route a conditioned distribution
 * through the existing conditional evaluator.
 */
CREATE OR REPLACE FUNCTION rv_conditioned_target(token uuid)
  RETURNS uuid AS
$$
  SELECT CASE
           WHEN provsql.get_gate_type(token) = 'conditioned'
                AND array_length(provsql.get_children(token), 1) = 2
             THEN (provsql.get_children(token))[1]
           ELSE token
         END;
$$ LANGUAGE sql STABLE PARALLEL SAFE SET search_path=provsql,pg_temp,public;

/**
 * @brief Combine a conditioning gate's event with an explicit @p prov.
 *
 * For a two-child @c gate_conditioned @c [target, condition] returns
 * @c "condition ∧ prov"; otherwise returns @p prov unchanged.
Lets a stored
 * @c "X | C" be queried as @c expected(X|C) (prov defaulting to one) or have
 * an extra condition conjoined as @c expected(X|C, extra_prov).
 */
CREATE OR REPLACE FUNCTION rv_conditioned_prov(token uuid, prov uuid)
  RETURNS uuid AS
$$
  SELECT CASE
    WHEN provsql.get_gate_type(token) = 'conditioned'
         AND array_length(provsql.get_children(token), 1) = 2
    THEN provsql.provenance_times((provsql.get_children(token))[2], prov)
    ELSE prov
  END;
$$ LANGUAGE sql STABLE PARALLEL SAFE SET search_path=provsql,pg_temp,public;

-- variance(input, prov, method, arguments): variance of a random_variable
-- (through rv_moment) or of an agg_token (second raw moment minus the
-- squared first raw moment); any other input type raises an exception.
CREATE OR REPLACE FUNCTION variance(
  input ANYELEMENT,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
DECLARE
  m1 float8;  -- first raw moment (agg_token path)
  m2 float8;  -- second raw moment (agg_token path)
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL THEN
      RETURN NULL;
    END IF;
    -- Conditioning on prov is handled inside rv_moment: when prov
    -- resolves to gate_one() (the default, or load-time
    -- simplification of any always-true sub-circuit) the
    -- unconditional analytical path runs unchanged; otherwise the
    -- joint-circuit loader unifies shared gate_rv leaves between
    -- input and prov, and the conditional path runs either
    -- truncated-distribution closed form or MC rejection.
    RETURN provsql.rv_moment(
      rv_conditioned_target((input::random_variable)::uuid),
      2, true,
      rv_conditioned_prov((input::random_variable)::uuid, prov));
  END IF;

  IF pg_typeof(input) = 'agg_token'::regtype THEN
    IF input IS NULL THEN
      RETURN NULL;
    END IF;
    m1 := agg_raw_moment(input::agg_token, 1, prov, method, arguments);
    m2 := agg_raw_moment(input::agg_token, 2, prov, method, arguments);
    IF m1 IS NULL OR m2 IS NULL THEN
      RETURN NULL;
    END IF;
    RETURN m2 - m1 * m1;
  END IF;

  RAISE EXCEPTION 'variance() is not yet supported for input type %', pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql SECURITY DEFINER;

-- moment(input, k, prov, method, arguments): k-th raw moment of a
-- random_variable (through rv_moment) or of an agg_token (through
-- agg_raw_moment); any other input type raises an exception.
CREATE OR REPLACE FUNCTION moment(
  input ANYELEMENT,
  k integer,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL OR k IS NULL THEN
      RETURN NULL;
    END IF;
    -- See variance() above: rv_moment handles the conditional/unconditional
    -- dispatch internally based on the resolved prov gate type.
    RETURN provsql.rv_moment(
      rv_conditioned_target((input::random_variable)::uuid),
      k, false,
      rv_conditioned_prov((input::random_variable)::uuid, prov));
  END IF;

  IF pg_typeof(input) = 'agg_token'::regtype THEN
    RETURN agg_raw_moment(input::agg_token, k, prov, method, arguments);
  END IF;

  RAISE EXCEPTION 'moment() is not yet supported for input type %', pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql SECURITY DEFINER;

-- support(input, prov, method, arguments) -> (lo, hi): bounds of the
-- values the input can take.  A NULL input gives (NULL, NULL); a plain
-- number gives the point interval; a random_variable / uuid is delegated
-- to rv_support; an agg_token is bounded from its children for sum /
-- count / min / max.  Any other aggregation function or input type
-- raises an exception.
CREATE OR REPLACE FUNCTION support(
  input ANYELEMENT,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL,
  OUT lo float8,
  OUT hi float8) AS
$$
DECLARE
  aggregation_function VARCHAR;  -- pg_proc name of the agg gate's function
  child_pairs uuid[];            -- children of the agg gate
BEGIN
  IF input IS NULL THEN
    lo := NULL;
    hi := NULL;
    RETURN;
  END IF;

  -- Plain numeric: degenerate point support. Lets `support(2.5)` /
  -- `support(42)` / etc. return (2.5, 2.5) without making the user
  -- wrap in `as_random`.
  IF pg_typeof(input) IN (
       'smallint'::regtype, 'integer'::regtype, 'bigint'::regtype,
       'numeric'::regtype, 'real'::regtype,
       'double precision'::regtype) THEN
    lo := input::double precision;
    hi := input::double precision;
    RETURN;
  END IF;

  -- random_variable is binary-coercible to uuid (explicit cast
  -- below), so a single rv_support call covers both shapes.
  -- rv_support handles gate_value (point), gate_rv (distribution),
  -- gate_arith (propagated), and falls back to the conservative
  -- all-real interval for any other gate kind.
  IF pg_typeof(input) IN ('random_variable'::regtype, 'uuid'::regtype) THEN
    -- Conditional support: rv_support folds the AND-conjunct interval
    -- constraints from prov into the unconditional support. When
    -- prov is gate_one() the unconditional support is returned
    -- unchanged.  A stored "X | C" input is unpacked first, so its
    -- condition is conjoined with prov.
    SELECT r.lo, r.hi INTO lo, hi
      FROM provsql.rv_support(
             rv_conditioned_target(input::uuid),
             rv_conditioned_prov(input::uuid, prov)) r;
    RETURN;
  END IF;

  IF pg_typeof(input) = 'agg_token'::regtype THEN
    IF get_gate_type(input::agg_token) <> 'agg' THEN
      RAISE EXCEPTION USING MESSAGE='Wrong gate type for support computation';
    END IF;

    -- Schema-qualified on purpose: this function is SECURITY DEFINER with
    -- search_path=provsql, and pg_temp is searched first when it is not
    -- listed, so an unqualified pg_proc could resolve to a caller-created
    -- temporary table.
    SELECT pp.proname::varchar
      INTO aggregation_function
      FROM pg_catalog.pg_proc pp
     WHERE pp.oid = (get_infos(input::agg_token)).info1;

    child_pairs := get_children(input::agg_token);

    IF aggregation_function = 'sum' OR aggregation_function = 'count' THEN
      -- count(col) is a SUM of per-row 0/1 indicators (empty group = 0), so its
      -- support is computed like SUM; count(*) arrives as 'sum'.
      -- Empty agg_token: SUM is identically 0.
      IF COALESCE(array_length(child_pairs, 1), 0) = 0 THEN
        lo := 0;
        hi := 0;
        RETURN;
      END IF;
      -- Lower bound: sum of the negative values; upper bound: sum of the
      -- positive ones (each row may or may not be present).
      SELECT sum(LEAST(v, 0::float8)), sum(GREATEST(v, 0::float8))
        INTO lo, hi
        FROM (SELECT CAST(get_extra((get_children(c))[2]) AS float8) AS v
                FROM unnest(child_pairs) AS c) sub;
    ELSIF aggregation_function = 'min' OR aggregation_function = 'max' THEN
      -- MIN/MAX over the empty input world are NULL, not ±Infinity (matching the
      -- moment surface): the empty world carries no value, so the support is just
      -- the range of the per-row values [min(v), max(v)]. A structurally empty
      -- aggregate has no defined value at all -> NULL support.
      IF COALESCE(array_length(child_pairs, 1), 0) = 0 THEN
        lo := NULL;
        hi := NULL;
        RETURN;
      END IF;
      SELECT min(v), max(v)
        INTO lo, hi
        FROM (SELECT CAST(get_extra((get_children(c))[2]) AS float8) AS v
                FROM UNNEST(child_pairs) AS c) sub;
    ELSE
      -- coalesce: a function OID missing from pg_proc leaves the name NULL,
      -- and a NULL MESSAGE option is itself an error in RAISE.
      RAISE EXCEPTION USING MESSAGE=
        'Cannot compute support for aggregation function '
        || coalesce(aggregation_function, '(unknown)');
    END IF;
    RETURN;
  END IF;

  RAISE EXCEPTION 'support() is not yet supported for input type %', pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql SECURITY DEFINER;

CREATE OR REPLACE FUNCTION central_moment(
  input ANYELEMENT,
  k integer,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
DECLARE
  mu float8;
  total float8;
  i integer;
  raw_i float8;
  binom float8; -- iterative binomial coefficient C(k, i)
  k_double float8;
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL OR k IS NULL THEN
      RETURN NULL;
    END IF;
    -- See variance() above: rv_moment handles the conditional/unconditional
    -- dispatch internally based on the resolved prov gate type.
RETURN provsql.rv_moment( rv_conditioned_target((input::random_variable)::uuid), k, true, rv_conditioned_prov((input::random_variable)::uuid, prov)); END IF; IF pg_typeof(input) = 'agg_token'::regtype THEN IF input IS NULL OR k IS NULL THEN RETURN NULL; END IF; IF k < 0 THEN RAISE EXCEPTION 'central_moment(): k must be non-negative (got %)', k; END IF; IF k = 0 THEN RETURN 1; END IF; IF k = 1 THEN RETURN 0; END IF; mu := agg_raw_moment(input::agg_token, 1, prov, method, arguments); IF mu IS NULL THEN RETURN NULL; END IF; -- mu may be ±Infinity for empty MIN / MAX with positive empty -- probability; central_moment is undefined in that case. IF mu = 'Infinity'::float8 OR mu = '-Infinity'::float8 THEN RETURN mu; END IF; total := 0; binom := 1; -- C(k, 0) k_double := k; FOR i IN 0..k LOOP raw_i := agg_raw_moment(input::agg_token, i, prov, method, arguments); IF raw_i IS NULL THEN RETURN NULL; END IF; total := total + binom * power(-mu, k - i) * raw_i; -- C(k, i+1) = C(k, i) * (k - i) / (i + 1) IF i < k THEN binom := binom * (k_double - i) / (i + 1); END IF; END LOOP; RETURN total; END IF; RAISE EXCEPTION 'central_moment() is not yet supported for input type %', pg_typeof(input); END $$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql SECURITY DEFINER; -- ---------------------------------------------------------------------- -- Conditioning: the SUM(x) | C operator for the agg_token carrier, and the -- final moment / support dispatchers carrying both the random_variable and -- agg_token conditioned-distribution unpacking. 
-- ----------------------------------------------------------------------

/**
 * @brief Implementation of the @c agg_token | uuid conditioning operator.
 *
 * Returns @p a unchanged when @p cond is NULL or @c gate_one().  Otherwise:
 * if the token of @p a is already a two-child 'conditioned' gate, its first
 * child is kept as the target and its second child is conjoined with
 * @p cond through @c provenance_times; else the target is the token of
 * @p a and the evidence is @p cond.  A 'conditioned' gate with children
 * [target, evidence] is created under a @c uuid_generate_v5 identifier
 * derived from the string 'conditioned' followed by both children, and an
 * agg_token over that gate carrying @c agg_token_value(a) is returned.
 *
 * NOTE(review): the function is not STRICT and has no guard for a NULL
 * @p a; confirm what get_gate_type / create_gate do in that case.
 */
CREATE OR REPLACE FUNCTION agg_token_cond(a agg_token, cond uuid)
  RETURNS agg_token AS
$$
DECLARE
  tok uuid;
  ev uuid;
  result uuid;
  ch uuid[];
BEGIN
  IF cond IS NULL OR cond = gate_one() THEN
    RETURN a;
  END IF;
  tok := (a)::uuid;
  IF get_gate_type(tok) = 'conditioned'
     AND array_length(get_children(tok), 1) = 2 THEN
    ch := get_children(tok);
    tok := ch[1];
    ev := provenance_times(ch[2], cond);
  ELSE
    ev := cond;
  END IF;
  result := public.uuid_generate_v5(uuid_ns_provsql(),
                                    concat('conditioned', tok, ev));
  PERFORM create_gate(result, 'conditioned', ARRAY[tok, ev]);
  RETURN agg_token_make(result, agg_token_value(a));
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

CREATE OPERATOR | (
  LEFTARG = agg_token,
  RIGHTARG = uuid,
  PROCEDURE = agg_token_cond
);

/**
 * @brief Unpack the target of a conditioned @c agg_token.
 *
 * For a @c "SUM(x) | C" whose provenance token is the two-child
 * @c gate_conditioned @c [agg_target, condition] returns the agg_token over
 * @c agg_target (same running value); for any other agg_token returns it
 * unchanged.  The conditioning event itself is recovered separately via
 * @c rv_conditioned_prov on the token's uuid.
 */
CREATE OR REPLACE FUNCTION agg_conditioned_target(a agg_token)
  RETURNS agg_token AS
$$
  SELECT CASE
    WHEN provsql.get_gate_type((a)::uuid) = 'conditioned'
         AND array_length(provsql.get_children((a)::uuid), 1) = 2
    THEN provsql.agg_token_make(
           (provsql.get_children((a)::uuid))[1],
           provsql.agg_token_value(a))
    ELSE a
  END;
$$ LANGUAGE sql STABLE PARALLEL SAFE SET search_path=provsql,pg_temp,public;

/**
 * @brief Variance of a @c random_variable or @c agg_token, with
 * conditioned-aggregate unpacking (final definition in this script).
 *
 * - @c random_variable: delegates to @c rv_moment with order 2 and the flag
 *   @c true, after unpacking through @c rv_conditioned_target /
 *   @c rv_conditioned_prov.
 * - @c agg_token: returns m2 - m1 * m1, each raw moment coming from
 *   @c agg_raw_moment called on @c agg_conditioned_target(input) with the
 *   event @c rv_conditioned_prov(input::uuid, prov).
 *
 * @return the variance; NULL for a NULL input or a NULL raw moment
 *
 * Raises an exception for any other input type.
 *
 * SECURITY DEFINER: @c pg_temp is listed explicitly, last, in
 * @c search_path.  When it is left out PostgreSQL searches the temporary
 * schema first for relation and type names, so a caller-created temporary
 * object could shadow the types resolved here by unqualified name.  The
 * fresh-install definition presumably needs the same clause to keep upgrade
 * parity -- TODO confirm.
 */
CREATE OR REPLACE FUNCTION variance(
  input ANYELEMENT,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
DECLARE
  m1 float8;
  m2 float8;
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL THEN
      RETURN NULL;
    END IF;
    -- Conditioning on prov is handled inside rv_moment: when prov
    -- resolves to gate_one() (the default, or load-time
    -- simplification of any always-true sub-circuit) the
    -- unconditional analytical path runs unchanged; otherwise the
    -- joint-circuit loader unifies shared gate_rv leaves between
    -- input and prov, and the conditional path runs either
    -- truncated-distribution closed form or MC rejection.
    RETURN provsql.rv_moment(
      rv_conditioned_target((input::random_variable)::uuid), 2, true,
      rv_conditioned_prov((input::random_variable)::uuid, prov));
  END IF;
  IF pg_typeof(input) = 'agg_token'::regtype THEN
    IF input IS NULL THEN
      RETURN NULL;
    END IF;
    m1 := agg_raw_moment(agg_conditioned_target(input::agg_token), 1,
                         rv_conditioned_prov(input::uuid, prov),
                         method, arguments);
    m2 := agg_raw_moment(agg_conditioned_target(input::agg_token), 2,
                         rv_conditioned_prov(input::uuid, prov),
                         method, arguments);
    IF m1 IS NULL OR m2 IS NULL THEN
      RETURN NULL;
    END IF;
    RETURN m2 - m1 * m1;
  END IF;
  RAISE EXCEPTION 'variance() is not yet supported for input type %',
    pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql,pg_temp SECURITY DEFINER;

/**
 * @brief Raw moment of order @p k of a @c random_variable or @c agg_token,
 * with conditioned-aggregate unpacking (final definition in this script).
 *
 * - @c random_variable: delegates to @c rv_moment with the flag @c false;
 *   NULL when @p input or @p k is NULL.
 * - @c agg_token: delegates to @c agg_raw_moment on
 *   @c agg_conditioned_target(input) with the event
 *   @c rv_conditioned_prov(input::uuid, prov).
 *
 * Raises an exception for any other input type.
 *
 * NOTE(review): unlike variance() and central_moment(), the agg_token
 * branch has no explicit NULL guard before the unpacking helpers run.
 *
 * SECURITY DEFINER: @c pg_temp is listed explicitly, last, in
 * @c search_path so temporary objects cannot shadow the unqualified type
 * names resolved here.
 */
CREATE OR REPLACE FUNCTION moment(
  input ANYELEMENT,
  k integer,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL OR k IS NULL THEN
      RETURN NULL;
    END IF;
    -- See variance() above: rv_moment handles the conditional/unconditional
    -- dispatch internally based on the resolved prov gate type.
    RETURN provsql.rv_moment(
      rv_conditioned_target((input::random_variable)::uuid), k, false,
      rv_conditioned_prov((input::random_variable)::uuid, prov));
  END IF;
  IF pg_typeof(input) = 'agg_token'::regtype THEN
    RETURN agg_raw_moment(agg_conditioned_target(input::agg_token), k,
                          rv_conditioned_prov(input::uuid, prov),
                          method, arguments);
  END IF;
  RAISE EXCEPTION 'moment() is not yet supported for input type %',
    pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql,pg_temp SECURITY DEFINER;

/**
 * @brief Support interval [lo, hi] of a numeric, @c random_variable / uuid
 * or @c agg_token input, with conditioned-aggregate unpacking (final
 * definition in this script).
 *
 * - NULL input: (NULL, NULL).
 * - smallint / integer / bigint / numeric / real / double precision: the
 *   point interval (input, input).
 * - @c random_variable or @c uuid: the row returned by @c rv_support on the
 *   unpacked target and conditioning event.
 * - @c agg_token: first unpacked through @c agg_conditioned_target; the
 *   resulting gate must be of type 'agg'; the aggregate's name is read from
 *   @c pg_proc.  sum / count give (sum of negative parts, sum of positive
 *   parts), or (0, 0) without children; min / max give (min, max) of the
 *   per-row values, or (NULL, NULL) without children; any other aggregate
 *   raises.
 *
 * @p method and @p arguments are accepted but not read by this body.
 *
 * NOTE(review): inside the body, the comment saying conditioning on prov is
 * not supported is immediately followed by one describing conditional
 * support, and the locals values_arr / total_probability are never used.
 * Both are left untouched here so the stored body stays identical; clean
 * them up together with the fresh-install definition.
 *
 * SECURITY DEFINER: @c pg_temp is listed explicitly, last, in
 * @c search_path.  This body reads the unqualified relation @c pg_proc;
 * with @c pg_temp implicit it would be searched before @c pg_catalog, so a
 * caller-created temporary relation of that name would be used instead.
 */
CREATE OR REPLACE FUNCTION support(
  input ANYELEMENT,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL,
  OUT lo float8,
  OUT hi float8) AS
$$
DECLARE
  aggregation_function VARCHAR;
  child_pairs uuid[];
  values_arr float8[];
  total_probability float8;
BEGIN
  IF input IS NULL THEN
    lo := NULL;
    hi := NULL;
    RETURN;
  END IF;
  -- Plain numeric: degenerate point support.  Lets `support(2.5)` /
  -- `support(42)` / etc. return (2.5, 2.5) without making the user
  -- wrap in `as_random`.
  IF pg_typeof(input) IN (
       'smallint'::regtype, 'integer'::regtype, 'bigint'::regtype,
       'numeric'::regtype, 'real'::regtype,
       'double precision'::regtype) THEN
    lo := input::double precision;
    hi := input::double precision;
    RETURN;
  END IF;
  -- random_variable is binary-coercible to uuid (explicit cast
  -- below), so a single rv_support call covers both shapes.
  -- rv_support handles
  -- gate_value (point), gate_rv (distribution), gate_arith
  -- (propagated), and falls back to the conservative all-real
  -- interval for any other gate kind.  Conditioning on prov is not
  -- supported (would require restricting the underlying joint
  -- distribution by the indicator of prov, which has no closed form
  -- for the basic distributions we ship).
  IF pg_typeof(input) IN ('random_variable'::regtype, 'uuid'::regtype) THEN
    -- Conditional support: rv_support folds the AND-conjunct interval
    -- constraints from prov into the unconditional support.  When
    -- prov is gate_one() the unconditional support is returned
    -- unchanged.
    SELECT r.lo, r.hi INTO lo, hi
    FROM provsql.rv_support(
           rv_conditioned_target(input::uuid),
           rv_conditioned_prov(input::uuid, prov)) r;
    RETURN;
  END IF;
  IF pg_typeof(input) = 'agg_token'::regtype THEN
    -- A conditioned aggregate SUM(x)|C: the value-range support is that of
    -- the target aggregate (conditioning can only tighten it; the
    -- conservative range stays valid), so unpack to the target gate.
    DECLARE
      atok agg_token := agg_conditioned_target(input::agg_token);
    BEGIN
      IF get_gate_type(atok) <> 'agg' THEN
        RAISE EXCEPTION USING MESSAGE='Wrong gate type for support computation';
      END IF;
      SELECT pp.proname::varchar FROM pg_proc pp
      WHERE oid=(get_infos(atok)).info1
      INTO aggregation_function;
      child_pairs := get_children(atok);
      IF aggregation_function = 'sum' OR aggregation_function = 'count' THEN
        -- count(col) is a SUM of per-row 0/1 indicators (empty group = 0), so its
        -- support is computed like SUM; count(*) arrives as 'sum'.
        -- Empty agg_token: SUM is identically 0.
        IF COALESCE(array_length(child_pairs, 1), 0) = 0 THEN
          lo := 0;
          hi := 0;
          RETURN;
        END IF;
        SELECT sum(LEAST(v, 0::float8)), sum(GREATEST(v, 0::float8))
        INTO lo, hi
        FROM (SELECT CAST(get_extra((get_children(c))[2]) AS float8) AS v
              FROM unnest(child_pairs) AS c) sub;
      ELSIF aggregation_function = 'min' OR aggregation_function = 'max' THEN
        -- MIN/MAX over the empty input world are NULL, not ±Infinity (matching the
        -- moment surface): the empty world carries no value, so the support is just
        -- the range of the per-row values [min(v), max(v)].  A structurally empty
        -- aggregate has no defined value at all -> NULL support.
        IF COALESCE(array_length(child_pairs, 1), 0) = 0 THEN
          lo := NULL;
          hi := NULL;
          RETURN;
        END IF;
        SELECT min(v), max(v)
        INTO lo, hi
        FROM (SELECT CAST(get_extra((get_children(c))[2]) AS float8) AS v
              FROM UNNEST(child_pairs) AS c) sub;
      ELSE
        RAISE EXCEPTION USING MESSAGE=
          'Cannot compute support for aggregation function '
          || aggregation_function;
      END IF;
      RETURN;
    END;
  END IF;
  RAISE EXCEPTION 'support() is not yet supported for input type %',
    pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql,pg_temp SECURITY DEFINER;

/**
 * @brief Central moment of order @p k of a @c random_variable or
 * @c agg_token, with conditioned-aggregate unpacking (final definition in
 * this script).
 *
 * - @c random_variable: delegates to @c rv_moment with the flag @c true.
 * - @c agg_token: k = 0 gives 1 and k = 1 gives 0; otherwise the binomial
 *   expansion sum over i in 0..k of C(k, i) * (-mu)^(k - i) * raw_i, where
 *   mu and raw_i come from @c agg_raw_moment on the unpacked target and
 *   event.  An infinite mu is returned as is; a NULL mu or raw moment gives
 *   NULL.
 *
 * Raises an exception for a negative @p k (agg_token branch) and for any
 * other input type.
 *
 * NOTE(review): agg_conditioned_target(...) and rv_conditioned_prov(...)
 * do not depend on the loop index yet are re-evaluated on every iteration;
 * they could be computed once before the loop.
 *
 * SECURITY DEFINER: @c pg_temp is listed explicitly, last, in
 * @c search_path so temporary objects cannot shadow the unqualified type
 * names resolved here.
 */
CREATE OR REPLACE FUNCTION central_moment(
  input ANYELEMENT,
  k integer,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
DECLARE
  mu float8;
  total float8;
  i integer;
  raw_i float8;
  binom float8;  -- iterative binomial coefficient C(k, i)
  k_double float8;
BEGIN
  IF pg_typeof(input) = 'random_variable'::regtype THEN
    IF input IS NULL OR k IS NULL THEN
      RETURN NULL;
    END IF;
    -- See variance() above: rv_moment handles the conditional/unconditional
    -- dispatch internally based on the resolved prov gate type.
    RETURN provsql.rv_moment(
      rv_conditioned_target((input::random_variable)::uuid), k, true,
      rv_conditioned_prov((input::random_variable)::uuid, prov));
  END IF;
  IF pg_typeof(input) = 'agg_token'::regtype THEN
    IF input IS NULL OR k IS NULL THEN
      RETURN NULL;
    END IF;
    IF k < 0 THEN
      RAISE EXCEPTION 'central_moment(): k must be non-negative (got %)', k;
    END IF;
    IF k = 0 THEN
      RETURN 1;
    END IF;
    IF k = 1 THEN
      RETURN 0;
    END IF;
    mu := agg_raw_moment(agg_conditioned_target(input::agg_token), 1,
                         rv_conditioned_prov(input::uuid, prov),
                         method, arguments);
    IF mu IS NULL THEN
      RETURN NULL;
    END IF;
    -- mu may be ±Infinity for empty MIN / MAX with positive empty
    -- probability; central_moment is undefined in that case.
    IF mu = 'Infinity'::float8 OR mu = '-Infinity'::float8 THEN
      RETURN mu;
    END IF;
    total := 0;
    binom := 1;  -- C(k, 0)
    k_double := k;
    FOR i IN 0..k LOOP
      raw_i := agg_raw_moment(agg_conditioned_target(input::agg_token), i,
                              rv_conditioned_prov(input::uuid, prov),
                              method, arguments);
      IF raw_i IS NULL THEN
        RETURN NULL;
      END IF;
      total := total + binom * power(-mu, k - i) * raw_i;
      -- C(k, i+1) = C(k, i) * (k - i) / (i + 1)
      IF i < k THEN
        binom := binom * (k_double - i) / (i + 1);
      END IF;
    END LOOP;
    RETURN total;
  END IF;
  RAISE EXCEPTION 'central_moment() is not yet supported for input type %',
    pg_typeof(input);
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql,pg_temp SECURITY DEFINER;

-- ----------------------------------------------------------------------
-- Natural Boolean-predicate conditioning: "X | (predicate)".  Placeholder
-- operators whose Boolean operand (a combination of random_variable /
-- aggregate comparisons) the planner converts into a condition gate and
-- routes to cond / random_variable_cond / agg_token_cond / given.
-- ----------------------------------------------------------------------

/**
 * @brief Placeholder behind the @c uuid | boolean operator.
 *
 * The body only raises: reaching it means the expression was not rewritten
 * before execution.  Declared STRICT, so a NULL argument yields NULL
 * without entering the body.
 */
CREATE OR REPLACE FUNCTION cond_predicate(target UUID, predicate boolean)
  RETURNS UUID AS
$$
BEGIN
  RAISE EXCEPTION 'uuid | (predicate) must be rewritten by the ProvSQL '
    'planner hook: the right operand must be a Boolean combination of '
    'random_variable / aggregate comparisons (is provsql.active off?)';
END
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;

CREATE OPERATOR | (LEFTARG=UUID, RIGHTARG=boolean, PROCEDURE=cond_predicate);

/**
 * @brief Placeholder behind the prefix @c | boolean operator.
 *
 * The body only raises: reaching it means the expression was not rewritten
 * before execution.  Declared STRICT, so a NULL argument yields NULL
 * without entering the body.
 */
CREATE OR REPLACE FUNCTION given_predicate(predicate boolean)
  RETURNS UUID AS
$$
BEGIN
  RAISE EXCEPTION 'prefix | (predicate) must be rewritten by the ProvSQL '
    'planner hook: the operand must be a Boolean combination of '
    'random_variable / aggregate comparisons (is provsql.active off?)';
END
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;

CREATE OPERATOR | (RIGHTARG=boolean, PROCEDURE=given_predicate);

/**
 * @brief Placeholder behind the @c agg_token | boolean operator.
 *
 * The body only raises: reaching it means the expression was not rewritten
 * before execution.  Declared STRICT, so a NULL argument yields NULL
 * without entering the body.
 */
CREATE OR REPLACE FUNCTION agg_token_cond_predicate(
  a agg_token, predicate boolean)
  RETURNS agg_token AS
$$
BEGIN
  RAISE EXCEPTION 'agg_token | (predicate) must be rewritten by the ProvSQL '
    'planner hook: the right operand must be a Boolean combination of '
    'aggregate / random_variable comparisons (is provsql.active off?)';
END
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;

CREATE OPERATOR | (
  LEFTARG = agg_token,
  RIGHTARG = boolean,
  PROCEDURE = agg_token_cond_predicate
);

/**
 * @brief Placeholder behind the @c random_variable | boolean operator.
 *
 * The body only raises: reaching it means the expression was not rewritten
 * before execution.  Declared STRICT, so a NULL argument yields NULL
 * without entering the body.
 */
CREATE OR REPLACE FUNCTION random_variable_cond_predicate(
  rv random_variable, predicate boolean)
  RETURNS random_variable AS
$$
BEGIN
  RAISE EXCEPTION 'random_variable | (predicate) must be rewritten by the '
    'ProvSQL planner hook: the right operand must be a Boolean combination '
    'of random_variable comparisons (is provsql.active off?)';
END
$$ LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE;

CREATE OPERATOR | (
  LEFTARG = random_variable,
  RIGHTARG = boolean,
  PROCEDURE = random_variable_cond_predicate
);

-- Deterministic indicator gate for an ordinary (regular) comparison inside a
-- MIXED conditioning predicate (HAVING-provenance semantics): gate_one when
-- the comparison holds on the row, gate_zero otherwise.  The planner emits it
-- for the regular leaves of "X | (SUM(x) > 5 AND region = 'north')".
-- A NULL comparison result falls into the ELSE branch and so maps to
-- gate_zero.
CREATE OR REPLACE FUNCTION regular_indicator(cond boolean)
  RETURNS UUID AS
$$
  SELECT CASE WHEN cond THEN provsql.gate_one() ELSE provsql.gate_zero() END;
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE SET search_path=provsql,pg_temp,public;

-- Better error message when a moment is requested over an arithmetic
-- combination of aggregates / a conditioning of one (not yet supported).
/**
 * @brief Raw moment of order @p k of the aggregate behind an 'agg' gate,
 * optionally conditioned on @p prov.
 *
 * - NULL @p token or @p k: NULL.  Negative @p k raises.
 * - A token that is not an 'agg' gate raises; 'arith' and 'conditioned'
 *   gates get a dedicated message and hint.
 * - k = 0: 1.
 * - sum / count: 0 without children; otherwise every k-tuple of children
 *   is enumerated, the product of its values is weighted by the
 *   probability of the conjunction of its distinct tokens (and of @p prov
 *   when it is not @c gate_one()), and the total is finally divided by the
 *   probability of @p prov when @p prov is not @c gate_one() and the total
 *   is finite and non-zero.
 * - min / max: NULL without children; otherwise a rank sum over the
 *   distinct values divided by the probability that at least one child is
 *   present (under @p prov); NULL when that probability is not above
 *   @c epsilon().
 * - Any other aggregate raises.
 *
 * @param token      agg_token to evaluate
 * @param k          moment order
 * @param prov       conditioning event, @c gate_one() by default
 * @param method     forwarded to @c probability_evaluate
 * @param arguments  forwarded to @c probability_evaluate
 *
 * SECURITY DEFINER: @c pg_temp is listed explicitly, last, in
 * @c search_path.  This body reads the unqualified relation @c pg_proc;
 * with @c pg_temp implicit it would be searched before @c pg_catalog, so a
 * caller-created temporary relation of that name would be used instead.
 * The fresh-install definition presumably needs the same clause to keep
 * upgrade parity -- TODO confirm.
 */
CREATE OR REPLACE FUNCTION agg_raw_moment(
  token agg_token,
  k integer,
  prov UUID = gate_one(),
  method text = NULL,
  arguments text = NULL)
  RETURNS DOUBLE PRECISION AS
$$
DECLARE
  aggregation_function VARCHAR;
  child_pairs uuid[];
  pair_children uuid[];
  n integer;
  i integer;
  j integer;
  vals float8[];
  toks uuid[];
  total float8;
  total_probability float8;
  tup integer[];
  d integer;
  prod_v float8;
  distinct_tok uuid[];
  conj_token uuid;
  prob float8;
  sign_max float8;
BEGIN
  IF token IS NULL OR k IS NULL THEN
    RETURN NULL;
  END IF;
  IF k < 0 THEN
    RAISE EXCEPTION 'agg_raw_moment(): k must be non-negative (got %)', k;
  END IF;
  IF get_gate_type(token) <> 'agg' THEN
    IF get_gate_type(token) IN ('arith', 'conditioned') THEN
      RAISE EXCEPTION 'expected / variance / moment over an arithmetic '
        'combination of aggregates (e.g. SUM(x) + SUM(y) or SUM(x) + 5), or a '
        'conditioning of one, is not yet supported: a moment can be taken only '
        'over a single aggregate (SUM / COUNT / MIN / MAX), optionally '
        'conditioned (SUM(x) | C)'
        USING HINT = 'Take the moment of each aggregate separately, or condition '
          'the bare aggregate.';
    ELSE
      RAISE EXCEPTION USING MESSAGE='Wrong gate type for agg_raw_moment computation';
    END IF;
  END IF;
  IF k = 0 THEN
    RETURN 1;
  END IF;
  SELECT pp.proname::varchar FROM pg_proc pp
  WHERE oid=(get_infos(token)).info1
  INTO aggregation_function;
  child_pairs := get_children(token);
  n := COALESCE(array_length(child_pairs, 1), 0);
  IF aggregation_function = 'sum' OR aggregation_function = 'count' THEN
    -- count(col) keeps the COUNT identity at the gate level but its value is a
    -- SUM of per-row 0/1 indicators, so its moments are computed exactly like
    -- SUM (and its empty group is the real value 0, like SUM).  count(*)
    -- arrives here as 'sum' (it normalises to F_SUM_INT4); count(col) as 'count'.
    -- Trivial empty aggregation: SUM = 0, so SUM^k = 0 for k >= 1.
    -- Note: agg_token semantics treat the "no row included" world as
    -- SUM = 0, so this stays consistent with k = 1 (= expected()).
    IF n = 0 THEN
      RETURN 0;
    END IF;
    -- Extract per-child token + value arrays.
    vals := ARRAY[]::float8[];
    toks := ARRAY[]::uuid[];
    FOR i IN 1..n LOOP
      pair_children := get_children(child_pairs[i]);
      toks := toks || pair_children[1];
      vals := vals || CAST(get_extra(pair_children[2]) AS float8);
    END LOOP;
    -- Enumerate all k-tuples (i_1, ..., i_k) in {1..n}^k.  tup is the
    -- current tuple; we step through them in lexicographic order.
    total := 0;
    tup := array_fill(1, ARRAY[k]);
    LOOP
      prod_v := 1;
      FOR j IN 1..k LOOP
        prod_v := prod_v * vals[tup[j]];
      END LOOP;
      SELECT array_agg(DISTINCT toks[idx]) INTO distinct_tok
      FROM unnest(tup) AS idx;
      IF prov <> gate_one() THEN
        distinct_tok := distinct_tok || prov;
      END IF;
      conj_token := provenance_times(VARIADIC distinct_tok);
      prob := probability_evaluate(conj_token, method, arguments);
      total := total + prod_v * prob;
      d := k;
      WHILE d >= 1 AND tup[d] = n LOOP
        tup[d] := 1;
        d := d - 1;
      END LOOP;
      EXIT WHEN d = 0;
      tup[d] := tup[d] + 1;
    END LOOP;
  ELSIF aggregation_function = 'min' OR aggregation_function = 'max' THEN
    -- Rank enumeration: per distinct value v, P(MIN = v) is the
    -- probability that some t_i with v_i=v is true and all t_j with
    -- smaller v are false.  For MAX we negate values so the same
    -- "smaller-than" rank logic computes MIN-of-negated, then flip.
    -- The outer multiplier picks up the right sign for the k-th moment
    -- of MAX: E[MAX^k] = (-1)^k * E[MIN(-v)^k], so sign_max = (-1)^k.
    sign_max := CASE WHEN aggregation_function = 'max'
                     THEN power(-1::float8, k)
                     ELSE 1 END;
    -- MIN/MAX over the empty input world are NULL (no elements), not ±Infinity:
    -- SQL returns one row with a NULL value.  The moment is therefore CONDITIONAL
    -- on the aggregate being defined (non-empty) -- the empty world is excluded
    -- and the result renormalised by P(prov AND non-empty).  (count, whose empty
    -- value 0 is a real value, keeps the empty world; sum keeps it too, as 0.)
    IF n = 0 THEN
      RETURN NULL;  -- structurally empty: MIN/MAX undefined
    END IF;
    -- Numerator E[MIN^k . 1{prov AND non-empty}] (the rank sum naturally omits
    -- the empty world, since every term requires a present token).
    WITH tok_value AS (
      SELECT (get_children(c))[1] AS tok,
             (CASE WHEN aggregation_function='max' THEN -1 ELSE 1 END)
               * CAST(get_extra((get_children(c))[2]) AS DOUBLE PRECISION) AS v
      FROM UNNEST(child_pairs) AS c
    )
    SELECT sign_max * COALESCE(SUM(p * power(v, k)), 0)
    FROM (
      SELECT t1.v AS v,
             probability_evaluate(
               CASE WHEN prov = gate_one() THEN
                 provenance_monus(provenance_plus(ARRAY_AGG(t1.tok)),
                                  provenance_plus(ARRAY_AGG(t2.tok)))
               ELSE
                 provenance_times(prov,
                   provenance_monus(provenance_plus(ARRAY_AGG(t1.tok)),
                                    provenance_plus(ARRAY_AGG(t2.tok))))
               END,
               method, arguments) AS p
      FROM tok_value t1
      LEFT OUTER JOIN tok_value t2 ON t1.v > t2.v
      GROUP BY t1.v) tmp
    INTO total;
    -- Denominator P(prov AND non-empty) = P(prov (x) (+) tokens).
    SELECT probability_evaluate(
             CASE WHEN prov = gate_one() THEN
               provenance_plus(ARRAY_AGG(tok))
             ELSE
               provenance_times(prov, provenance_plus(ARRAY_AGG(tok)))
             END,
             method, arguments)
    FROM (SELECT (get_children(c))[1] AS tok
          FROM UNNEST(child_pairs) AS c) s
    INTO total_probability;
    IF total_probability <= epsilon() THEN
      RETURN NULL;  -- never defined under prov: MIN/MAX undefined
    END IF;
    RETURN total / total_probability;  -- already conditional; skip generic norm
  ELSE
    RAISE EXCEPTION USING MESSAGE=
      'Cannot compute moment for aggregation function '
      || aggregation_function;
  END IF;
  -- Conditional normalisation: E[X^k · 1_A] / P(A) = E[X^k | A].
  IF prov <> gate_one()
     AND total <> 0
     AND total <> 'Infinity'::float8
     AND total <> '-Infinity'::float8 THEN
    total := total / probability_evaluate(prov, method, arguments);
  END IF;
  RETURN total;
END
$$ LANGUAGE plpgsql PARALLEL SAFE SET search_path=provsql,pg_temp SECURITY DEFINER;

-- ----------------------------------------------------------------------
-- 1.10.0 completion: joint-width / Möbius / denial-constraint surface
--
-- The UCQ joint-width and Möbius-inversion exact routes and the prefix
-- ! (event negation) operator joined the 1.10.0 SQL surface after the
-- conditioning block above.
-- Replicate them here so an upgraded database
-- matches a fresh install (mechanically verified by
-- make upgrade-parity-test).  The cond and plant_reach_any_groups
-- bodies are refreshed too: both gained code after they were first
-- copied into this script.
-- ----------------------------------------------------------------------

-- mobius_or_null below is LANGUAGE sql and tests get_gate_type(...) =
-- 'mobius'.  A fresh CREATE TYPE makes an enum label usable at once, but
-- ALTER TYPE ... ADD VALUE does not until the (upgrade) transaction
-- commits, so creating the function in this transaction would raise
-- "unsafe use of new enum value".  Defer body validation for the rest
-- of the transaction; the stored function bodies are unchanged.
-- NOTE(review): rv_conditioned_prov and agg_conditioned_target, created
-- earlier in this script, are also LANGUAGE sql and compare
-- get_gate_type(...) against 'conditioned', a label this script likewise
-- adds with ALTER TYPE ... ADD VALUE; confirm they do not need this
-- deferral to precede them.
SET LOCAL check_function_bodies = off;

-- Signed Möbius combination gate (measure-only); see the gate-type
-- comment in provsql.common.sql.
ALTER TYPE provenance_gate ADD VALUE IF NOT EXISTS 'mobius';

/**
 * @brief Condition @p target on @p evidence, returning a 'conditioned' gate.
 *
 * Returns @p target unchanged when @p evidence is NULL or @c gate_one().
 * A NULL @p target is replaced by @c gate_one().  The created gate has the
 * three children [target, evidence, joint]; when the target is itself a
 * 'conditioned' gate, its first child is reused as the target and the new
 * evidence is conjoined into its second and third children.  The gate's
 * identifier is a @c uuid_generate_v5 of the string 'conditioned' followed
 * by the three children.
 *
 * NOTE(review): the nested case reads ch[3] without checking the child
 * count, whereas agg_token_cond in this script creates 'conditioned' gates
 * with only two children; confirm such a token can never reach this
 * function.
 */
CREATE OR REPLACE FUNCTION cond(target UUID, evidence UUID)
  RETURNS UUID AS
$$
DECLARE
  tgt uuid;
  ev uuid;
  jnt uuid;
  result uuid;
  ch uuid[];
BEGIN
  -- P(X | true) = P(X): conditioning on a certain / absent event is inert.
  IF evidence IS NULL OR evidence = gate_one() THEN
    RETURN target;
  END IF;
  -- A row with no provenance defaults to the certain event 1.
  tgt := coalesce(target, gate_one());
  IF get_gate_type(tgt) = 'conditioned' THEN
    -- Sequential update (X | A) | B = X | (A ∧ B): fold B into both the
    -- evidence and the joint of the inner gate so the result stays a single
    -- gate_conditioned over the ORIGINAL target.
    ch := get_children(tgt);
    tgt := ch[1];                                   -- original target X
    ev := provenance_times(ch[2], evidence);        -- A ∧ B
    jnt := provenance_times(ch[3], evidence);       -- (X ∧ A) ∧ B
  ELSE
    ev := evidence;
    jnt := provenance_times(tgt, evidence);         -- X ∧ C
  END IF;
  result := public.uuid_generate_v5(uuid_ns_provsql(),
                                    concat('conditioned', tgt, ev, jnt));
  PERFORM create_gate(result, 'conditioned', ARRAY[tgt, ev, jnt]);
  RETURN result;
END
$$ LANGUAGE plpgsql SET search_path=provsql,pg_temp,public SECURITY DEFINER PARALLEL SAFE;

/**
 * @brief Negation of @p event, built as @c provenance_monus(gate_one(), event).
 *
 * STRICT: a NULL @p event yields NULL.
 */
CREATE OR REPLACE FUNCTION provenance_not(event UUID)
  RETURNS UUID AS
$$
  SELECT provsql.provenance_monus(provsql.gate_one(), event);
$$ LANGUAGE sql IMMUTABLE STRICT PARALLEL SAFE SET search_path=provsql,pg_temp,public;

/**
 * @brief Columnar entry point bound to the C symbol
 * @c ucq_joint_compile_stats of the @c provsql library.
 *
 * Takes the query as parallel arrays (disjunct_* / atom_*) and the facts as
 * parallel arrays (fact_*), including explicit fact probabilities, and
 * returns one row of OUT columns.  The semantics live in the C
 * implementation, which is not visible in this script.
 */
CREATE OR REPLACE FUNCTION ucq_joint_compile_stats(
  IN disjunct_nvars INT[],
  IN atom_disjunct INT[],
  IN atom_rel INT[],
  IN atom_vars INT[],
  IN atom_arity INT[],
  IN fact_rel INT[],
  IN fact_elems INT[],
  IN fact_arity INT[],
  IN fact_tokens UUID[],
  IN fact_probs DOUBLE PRECISION[],
  OUT probability DOUBLE PRECISION,
  OUT joint_treewidth INT,
  OUT data_treewidth_lb INT,
  OUT circuit_treewidth_lb INT,
  OUT n_bags BIGINT,
  OUT max_states BIGINT,
  OUT dd_size BIGINT,
  OUT n_enumerating INT)
AS 'provsql','ucq_joint_compile_stats' LANGUAGE C IMMUTABLE PARALLEL SAFE;

/**
 * @brief JSONB front end of @c ucq_joint_compile_stats.
 *
 * Flattens @p query into the columnar arrays and forwards to the columnar
 * overload.  The body reads @c query->'disjuncts' as an array whose
 * elements carry an integer @c n_vars and an @c atoms array; each atom
 * carries an integer @c rel and a @c vars array of integers.  Disjuncts are
 * numbered from 0 in array order.
 */
CREATE OR REPLACE FUNCTION ucq_joint_compile_stats(
  IN query JSONB,
  IN fact_rel INT[],
  IN fact_elems INT[],
  IN fact_arity INT[],
  IN fact_tokens UUID[],
  IN fact_probs DOUBLE PRECISION[],
  OUT probability DOUBLE PRECISION,
  OUT joint_treewidth INT,
  OUT data_treewidth_lb INT,
  OUT circuit_treewidth_lb INT,
  OUT n_bags BIGINT,
  OUT max_states BIGINT,
  OUT dd_size BIGINT,
  OUT n_enumerating INT) AS
$$
DECLARE
  dnv INT[] := '{}';
  adisj INT[] := '{}';
  arel INT[] := '{}';
  avars INT[] := '{}';
  aarity INT[] := '{}';
  d JSONB;
  a JSONB;
  v TEXT;
  didx INT := 0;
BEGIN
  FOR d IN SELECT * FROM jsonb_array_elements(query->'disjuncts') LOOP
    dnv := dnv || (d->>'n_vars')::int;
    FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP
      adisj := adisj || didx;
      arel := arel || (a->>'rel')::int;
      aarity := aarity || jsonb_array_length(a->'vars');
      FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP
        avars := avars || v::int;
      END LOOP;
    END LOOP;
    didx := didx + 1;
  END LOOP;
  SELECT s.probability, s.joint_treewidth, s.data_treewidth_lb,
         s.circuit_treewidth_lb, s.n_bags, s.max_states, s.dd_size,
         s.n_enumerating
  INTO probability, joint_treewidth, data_treewidth_lb,
       circuit_treewidth_lb, n_bags, max_states, dd_size, n_enumerating
  FROM ucq_joint_compile_stats(dnv, adisj, arel, avars, aarity,
                               fact_rel, fact_elems, fact_arity,
                               fact_tokens, fact_probs) s;
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

/**
 * @brief Columnar entry point bound to the C symbol
 * @c ucq_joint_compile_stats_tracked of the @c provsql library.
 *
 * Same array layout and OUT columns as @c ucq_joint_compile_stats, but
 * without the @c fact_probs argument, and declared STABLE rather than
 * IMMUTABLE.
 */
CREATE OR REPLACE FUNCTION ucq_joint_compile_stats_tracked(
  IN disjunct_nvars INT[],
  IN atom_disjunct INT[],
  IN atom_rel INT[],
  IN atom_vars INT[],
  IN atom_arity INT[],
  IN fact_rel INT[],
  IN fact_elems INT[],
  IN fact_arity INT[],
  IN fact_tokens UUID[],
  OUT probability DOUBLE PRECISION,
  OUT joint_treewidth INT,
  OUT data_treewidth_lb INT,
  OUT circuit_treewidth_lb INT,
  OUT n_bags BIGINT,
  OUT max_states BIGINT,
  OUT dd_size BIGINT,
  OUT n_enumerating INT)
AS 'provsql','ucq_joint_compile_stats_tracked' LANGUAGE C STABLE PARALLEL SAFE;

/**
 * @brief JSONB front end of @c ucq_joint_compile_stats_tracked.
 *
 * Flattens @p query (same JSON shape as the JSONB overload of
 * @c ucq_joint_compile_stats) into the columnar arrays and forwards to the
 * columnar overload.
 */
CREATE OR REPLACE FUNCTION ucq_joint_compile_stats_tracked(
  IN query JSONB,
  IN fact_rel INT[],
  IN fact_elems INT[],
  IN fact_arity INT[],
  IN fact_tokens UUID[],
  OUT probability DOUBLE PRECISION,
  OUT joint_treewidth INT,
  OUT data_treewidth_lb INT,
  OUT circuit_treewidth_lb INT,
  OUT n_bags BIGINT,
  OUT max_states BIGINT,
  OUT dd_size BIGINT,
  OUT n_enumerating INT) AS
$$
DECLARE
  dnv INT[] := '{}';
  adisj INT[] := '{}';
  arel INT[] := '{}';
  avars INT[] := '{}';
  aarity INT[] := '{}';
  d JSONB;
  a JSONB;
  v TEXT;
  didx INT := 0;
BEGIN
  FOR d IN SELECT * FROM jsonb_array_elements(query->'disjuncts') LOOP
    dnv := dnv || (d->>'n_vars')::int;
    FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP
      adisj := adisj || didx;
      arel := arel || (a->>'rel')::int;
      aarity := aarity || jsonb_array_length(a->'vars');
      FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP
        avars := avars || v::int;
      END LOOP;
    END LOOP;
    didx := didx + 1;
  END LOOP;
  SELECT s.probability, s.joint_treewidth, s.data_treewidth_lb,
         s.circuit_treewidth_lb, s.n_bags, s.max_states, s.dd_size,
         s.n_enumerating
  INTO probability, joint_treewidth, data_treewidth_lb,
       circuit_treewidth_lb, n_bags, max_states, dd_size, n_enumerating
  FROM ucq_joint_compile_stats_tracked(dnv, adisj, arel, avars, aarity,
                                       fact_rel, fact_elems, fact_arity,
                                       fact_tokens) s;
END;
$$ LANGUAGE plpgsql STABLE PARALLEL SAFE;

/**
 * @brief Columnar entry point bound to the C symbol
 * @c ucq_joint_materialize_tracked of the @c provsql library; returns a
 * UUID.  Declared VOLATILE.
 */
CREATE OR REPLACE FUNCTION ucq_joint_materialize_tracked(
  disjunct_nvars INT[],
  atom_disjunct INT[],
  atom_rel INT[],
  atom_vars INT[],
  atom_arity INT[],
  fact_rel INT[],
  fact_elems INT[],
  fact_arity INT[],
  fact_tokens UUID[])
  RETURNS UUID
AS 'provsql','ucq_joint_materialize_tracked' LANGUAGE C VOLATILE;

/**
 * @brief JSONB front end of @c ucq_joint_materialize_tracked.
 *
 * Flattens @p query (same JSON shape as the JSONB overload of
 * @c ucq_joint_compile_stats) into the columnar arrays and returns the
 * result of the columnar overload.
 */
CREATE OR REPLACE FUNCTION ucq_joint_materialize_tracked(
  query JSONB,
  fact_rel INT[],
  fact_elems INT[],
  fact_arity INT[],
  fact_tokens UUID[])
  RETURNS UUID AS
$$
DECLARE
  dnv INT[] := '{}';
  adisj INT[] := '{}';
  arel INT[] := '{}';
  avars INT[] := '{}';
  aarity INT[] := '{}';
  d JSONB;
  a JSONB;
  v TEXT;
  didx INT := 0;
BEGIN
  FOR d IN SELECT * FROM jsonb_array_elements(query->'disjuncts') LOOP
    dnv := dnv || (d->>'n_vars')::int;
    FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP
      adisj := adisj || didx;
      arel := arel || (a->>'rel')::int;
      aarity := aarity || jsonb_array_length(a->'vars');
      FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP
        avars := avars || v::int;
      END LOOP;
    END LOOP;
    didx := didx + 1;
  END LOOP;
  RETURN ucq_joint_materialize_tracked(dnv, adisj, arel, avars, aarity,
                                       fact_rel, fact_elems, fact_arity,
                                       fact_tokens);
END;
$$ LANGUAGE plpgsql VOLATILE;

/**
 * @brief Gather the facts named by @p descriptor and materialise the
 * joint-width provenance, falling back to @p fallback on any error.
 *
 * Steps, in order:
 *  1. flatten @c descriptor->'disjuncts' into the columnar query arrays;
 *  2. build one SELECT leg per entry of @c descriptor->'relations', using
 *     the column names in @c descriptor->'elem_cols' (quoted with %I) and
 *     the optional filter text in @c descriptor->'rel_where';
 *  3. run the assembled query with the @c provsql.active setting switched
 *     to 'off', then restore the saved value;
 *  4. return @c ucq_joint_materialize_tracked on the gathered arrays.
 *
 * The EXCEPTION clause covers the whole body, so a failure in any step
 * returns @p fallback (NULL by default).
 *
 * NOTE(review): relation names and rel_where text are spliced into the
 * dynamic SQL with %s and string concatenation, unquoted; this is only
 * safe if the descriptor is always produced internally -- confirm against
 * the caller.
 */
CREATE OR REPLACE FUNCTION ucq_joint_provenance(
  descriptor JSONB,
  fallback UUID DEFAULT NULL)
  RETURNS UUID AS
$$
DECLARE
  legs text;
  sql text;
  saved text;
  fact_rel int[];
  fact_elems int[];
  fact_arity int[];
  fact_tokens uuid[];
  dnv int[]:='{}';
  adisj int[]:='{}';
  arel int[]:='{}';
  avars int[]:='{}';
  aarity int[]:='{}';
  d jsonb;
  a jsonb;
  v text;
  didx int:=0;
BEGIN
  -- Parse the UCQ structure into the columnar query arrays.
  FOR d IN SELECT * FROM jsonb_array_elements(descriptor->'disjuncts') LOOP
    dnv := dnv || (d->>'n_vars')::int;
    FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP
      adisj := adisj || didx;
      arel := arel || (a->>'rel')::int;
      aarity := aarity || jsonb_array_length(a->'vars');
      FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP
        avars := avars || v::int;
      END LOOP;
    END LOOP;
    didx := didx + 1;
  END LOOP;

  -- One UNION ALL leg per relation: (relation index, text element array,
  -- provenance token).  No temp tables: a single gather query, with the
  -- value-based dense element dictionary built inline.
  SELECT string_agg(
           format('SELECT %s, ARRAY[%s]::text[], provsql FROM %s%s',
                  rn - 1,
                  (SELECT string_agg(format('(%I)::text', c), ',')
                   FROM jsonb_array_elements_text(descriptor->'elem_cols'->(rn-1)::int) c),
                  rel,
                  -- the lifted single-relation selection (a pre-filter), already
                  -- deparsed to SQL by the recogniser; '' / absent = unfiltered.
                  CASE WHEN coalesce(descriptor->'rel_where'->>(rn-1)::int,'') <> ''
                       THEN ' WHERE '||(descriptor->'rel_where'->>(rn-1)::int)
                       ELSE '' END),
           ' UNION ALL ')
  INTO legs
  FROM jsonb_array_elements_text(descriptor->'relations')
       WITH ORDINALITY t(rel, rn);

  sql := format($q$
    WITH facts(rel,elems,tok) AS (%s),
         ord AS (SELECT row_number() OVER () AS ord, rel, elems, tok FROM facts),
         dict AS (SELECT val, (dense_rank() OVER (ORDER BY val))-1 AS id
                  FROM (SELECT DISTINCT unnest(elems) AS val FROM facts) u)
    SELECT (SELECT array_agg(rel ORDER BY ord) FROM ord),
           (SELECT array_agg(cardinality(elems) ORDER BY ord) FROM ord),
           (SELECT array_agg(tok ORDER BY ord) FROM ord),
           (SELECT array_agg(dd.id ORDER BY o.ord, e.k)
            FROM ord o, LATERAL unnest(o.elems) WITH ORDINALITY e(val,k)
            JOIN dict dd ON dd.val = e.val)
  $q$, legs);

  -- Read the raw rows with provenance rewriting disabled (we only read
  -- the existing provsql column; this internal gather is not tracked).
  saved := current_setting('provsql.active', true);
  PERFORM set_config('provsql.active','off', true);
  EXECUTE sql INTO fact_rel, fact_arity, fact_tokens, fact_elems;
  PERFORM set_config('provsql.active', saved, true);

  RETURN ucq_joint_materialize_tracked(dnv,adisj,arel,avars,aarity,
                                       fact_rel,fact_elems,fact_arity,fact_tokens);
EXCEPTION WHEN OTHERS THEN
  -- The joint-width compiler declined (unsupported gate type, joint
  -- width too large, ...): fall back to the normal provenance so the
  -- query never fails.  Both give the same probability.
  RETURN fallback;
END;
$$ LANGUAGE plpgsql VOLATILE;

/**
 * @brief Columnar entry point bound to the C symbol
 * @c ucq_mobius_materialize_tracked of the @c provsql library; returns a
 * UUID.  Takes the same query / fact arrays as
 * @c ucq_joint_materialize_tracked plus an optional @p lineage UUID
 * (NULL by default).
 */
CREATE OR REPLACE FUNCTION ucq_mobius_materialize_tracked(
  disjunct_nvars INT[],
  atom_disjunct INT[],
  atom_rel INT[],
  atom_vars INT[],
  atom_arity INT[],
  fact_rel INT[],
  fact_elems INT[],
  fact_arity INT[],
  fact_tokens UUID[],
  lineage UUID DEFAULT NULL)
  RETURNS UUID
AS 'provsql','ucq_mobius_materialize_tracked' LANGUAGE C VOLATILE;

/**
 * @brief Columnar entry point bound to the C symbol
 * @c ucq_mobius_compile_stats of the @c provsql library; returns one row of
 * OUT columns.  Declared VOLATILE.
 */
CREATE OR REPLACE FUNCTION ucq_mobius_compile_stats(
  IN disjunct_nvars INT[],
  IN atom_disjunct INT[],
  IN atom_rel INT[],
  IN atom_vars INT[],
  IN atom_arity INT[],
  IN fact_rel INT[],
  IN fact_elems INT[],
  IN fact_arity INT[],
  IN fact_tokens UUID[],
  OUT probability DOUBLE PRECISION,
  OUT n_components INT,
  OUT n_cnf_conjuncts INT,
  OUT lattice_size INT,
  OUT n_nonzero INT,
  OUT n_cancelled INT,
  OUT cancelled_hard BOOLEAN,
  OUT dd_size BIGINT,
  OUT memo_hits BIGINT)
AS 'provsql','ucq_mobius_compile_stats' LANGUAGE C VOLATILE;

/**
 * @brief Return @p tok when it is non-NULL and its gate type is 'mobius';
 * NULL otherwise (the CASE has no ELSE branch).
 */
CREATE OR REPLACE FUNCTION mobius_or_null(tok UUID)
  RETURNS UUID AS
$$
  SELECT CASE WHEN tok IS NOT NULL
                   AND provsql.get_gate_type(tok) = 'mobius'
              THEN tok END
$$ LANGUAGE sql STABLE;

CREATE OR REPLACE FUNCTION ucq_mobius_provenance(
  descriptor JSONB,
  fallback UUID DEFAULT NULL)
  RETURNS UUID AS
$$
DECLARE
  legs text;
  sql text;
  saved text;
  fact_rel int[];
  fact_elems int[];
  fact_arity int[];
  fact_tokens uuid[];
  dnv int[]:='{}';
  adisj int[]:='{}';
  arel int[]:='{}';
  avars int[]:='{}';
  aarity int[]:='{}';
  d jsonb;
  a jsonb;
  v text;
  didx int:=0;
BEGIN
  FOR d IN SELECT * FROM jsonb_array_elements(descriptor->'disjuncts') LOOP
    dnv := dnv || (d->>'n_vars')::int;
    FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP
      adisj := adisj || didx;
      arel := arel || (a->>'rel')::int;
      aarity := aarity || jsonb_array_length(a->'vars');
      FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP
        avars := avars || v::int;
      END LOOP;
    END LOOP;
    didx := didx + 1;
  END LOOP;

  SELECT string_agg(
           format('SELECT %s, ARRAY[%s]::text[], provsql FROM %s%s',
                  rn - 1,
                  (SELECT string_agg(format('(%I)::text', c),
',') FROM jsonb_array_elements_text(descriptor->'elem_cols'->(rn-1)::int) c), rel, CASE WHEN coalesce(descriptor->'rel_where'->>(rn-1)::int,'') <> '' THEN ' WHERE '||(descriptor->'rel_where'->>(rn-1)::int) ELSE '' END), ' UNION ALL ') INTO legs FROM jsonb_array_elements_text(descriptor->'relations') WITH ORDINALITY t(rel, rn); sql := format($q$ WITH facts(rel,elems,tok) AS (%s), ord AS (SELECT row_number() OVER () AS ord, rel, elems, tok FROM facts), dict AS (SELECT val, (dense_rank() OVER (ORDER BY val))-1 AS id FROM (SELECT DISTINCT unnest(elems) AS val FROM facts) u) SELECT (SELECT array_agg(rel ORDER BY ord) FROM ord), (SELECT array_agg(cardinality(elems) ORDER BY ord) FROM ord), (SELECT array_agg(tok ORDER BY ord) FROM ord), (SELECT array_agg(dd.id ORDER BY o.ord, e.k) FROM ord o, LATERAL unnest(o.elems) WITH ORDINALITY e(val,k) JOIN dict dd ON dd.val = e.val) $q$, legs); saved := current_setting('provsql.active', true); PERFORM set_config('provsql.active','off', true); EXECUTE sql INTO fact_rel, fact_arity, fact_tokens, fact_elems; PERFORM set_config('provsql.active', saved, true); -- Pass the normal-provenance fallback as the lineage: it is carried on the -- gate_mobius so the token still answers Shapley / semiring / PROV on the -- literal lineage (the Möbius combination is a probability-only shortcut). 
RETURN ucq_mobius_materialize_tracked(dnv,adisj,arel,avars,aarity, fact_rel,fact_elems,fact_arity,fact_tokens, fallback); EXCEPTION WHEN OTHERS THEN RETURN fallback; END; $$ LANGUAGE plpgsql VOLATILE; CREATE OR REPLACE FUNCTION mobius_compile_stats( IN descriptor JSONB, OUT probability DOUBLE PRECISION, OUT n_components INT, OUT n_cnf_conjuncts INT, OUT lattice_size INT, OUT n_nonzero INT, OUT n_cancelled INT, OUT cancelled_hard BOOLEAN, OUT dd_size BIGINT, OUT memo_hits BIGINT) RETURNS RECORD AS $$ DECLARE legs text; sql text; saved text; fact_rel int[]; fact_elems int[]; fact_arity int[]; fact_tokens uuid[]; dnv int[]:='{}'; adisj int[]:='{}'; arel int[]:='{}'; avars int[]:='{}'; aarity int[]:='{}'; d jsonb; a jsonb; v text; didx int:=0; BEGIN FOR d IN SELECT * FROM jsonb_array_elements(descriptor->'disjuncts') LOOP dnv := dnv || (d->>'n_vars')::int; FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP adisj := adisj || didx; arel := arel || (a->>'rel')::int; aarity := aarity || jsonb_array_length(a->'vars'); FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP avars := avars || v::int; END LOOP; END LOOP; didx := didx + 1; END LOOP; SELECT string_agg( format('SELECT %s, ARRAY[%s]::text[], provsql FROM %s%s', rn - 1, (SELECT string_agg(format('(%I)::text', c), ',') FROM jsonb_array_elements_text(descriptor->'elem_cols'->(rn-1)::int) c), rel, CASE WHEN coalesce(descriptor->'rel_where'->>(rn-1)::int,'') <> '' THEN ' WHERE '||(descriptor->'rel_where'->>(rn-1)::int) ELSE '' END), ' UNION ALL ') INTO legs FROM jsonb_array_elements_text(descriptor->'relations') WITH ORDINALITY t(rel, rn); sql := format($q$ WITH facts(rel,elems,tok) AS (%s), ord AS (SELECT row_number() OVER () AS ord, rel, elems, tok FROM facts), dict AS (SELECT val, (dense_rank() OVER (ORDER BY val))-1 AS id FROM (SELECT DISTINCT unnest(elems) AS val FROM facts) u) SELECT (SELECT array_agg(rel ORDER BY ord) FROM ord), (SELECT array_agg(cardinality(elems) ORDER BY ord) FROM ord), 
(SELECT array_agg(tok ORDER BY ord) FROM ord), (SELECT array_agg(dd.id ORDER BY o.ord, e.k) FROM ord o, LATERAL unnest(o.elems) WITH ORDINALITY e(val,k) JOIN dict dd ON dd.val = e.val) $q$, legs); saved := current_setting('provsql.active', true); PERFORM set_config('provsql.active','off', true); EXECUTE sql INTO fact_rel, fact_arity, fact_tokens, fact_elems; PERFORM set_config('provsql.active', saved, true); SELECT s.probability, s.n_components, s.n_cnf_conjuncts, s.lattice_size, s.n_nonzero, s.n_cancelled, s.cancelled_hard, s.dd_size, s.memo_hits INTO probability, n_components, n_cnf_conjuncts, lattice_size, n_nonzero, n_cancelled, cancelled_hard, dd_size, memo_hits FROM ucq_mobius_compile_stats(dnv,adisj,arel,avars,aarity, fact_rel,fact_elems,fact_arity,fact_tokens) s; END; $$ LANGUAGE plpgsql VOLATILE; CREATE OR REPLACE FUNCTION ucq_joint_gather( descriptor JSONB, OUT disjunct_nvars INT[], OUT atom_disjunct INT[], OUT atom_rel INT[], OUT atom_vars INT[], OUT atom_arity INT[], OUT fact_rel INT[], OUT fact_elems INT[], OUT fact_arity INT[], OUT fact_tokens UUID[], OUT val_by_id TEXT[]) AS $$ DECLARE legs text; sql text; saved text; d jsonb; a jsonb; v text; didx int := 0; BEGIN disjunct_nvars:='{}'; atom_disjunct:='{}'; atom_rel:='{}'; atom_vars:='{}'; atom_arity:='{}'; FOR d IN SELECT * FROM jsonb_array_elements(descriptor->'disjuncts') LOOP disjunct_nvars := disjunct_nvars || (d->>'n_vars')::int; FOR a IN SELECT * FROM jsonb_array_elements(d->'atoms') LOOP atom_disjunct := atom_disjunct || didx; atom_rel := atom_rel || (a->>'rel')::int; atom_arity := atom_arity || jsonb_array_length(a->'vars'); FOR v IN SELECT * FROM jsonb_array_elements_text(a->'vars') LOOP atom_vars := atom_vars || v::int; END LOOP; END LOOP; didx := didx + 1; END LOOP; SELECT string_agg( format('SELECT %s, ARRAY[%s]::text[], provsql FROM %s%s', rn - 1, (SELECT string_agg(format('(%I)::text', c), ',') FROM jsonb_array_elements_text(descriptor->'elem_cols'->(rn-1)::int) c), rel, CASE WHEN 
coalesce(descriptor->'rel_where'->>(rn-1)::int,'') <> '' THEN ' WHERE '||(descriptor->'rel_where'->>(rn-1)::int) ELSE '' END), ' UNION ALL ') INTO legs FROM jsonb_array_elements_text(descriptor->'relations') WITH ORDINALITY t(rel, rn); sql := format($q$ WITH facts(rel,elems,tok) AS (%s), ord AS (SELECT row_number() OVER () AS ord, rel, elems, tok FROM facts), dict AS (SELECT val, (dense_rank() OVER (ORDER BY val))-1 AS id FROM (SELECT DISTINCT unnest(elems) AS val FROM facts) u) SELECT (SELECT array_agg(rel ORDER BY ord) FROM ord), (SELECT array_agg(cardinality(elems) ORDER BY ord) FROM ord), (SELECT array_agg(tok ORDER BY ord) FROM ord), (SELECT array_agg(dd.id ORDER BY o.ord, e.k) FROM ord o, LATERAL unnest(o.elems) WITH ORDINALITY e(val,k) JOIN dict dd ON dd.val = e.val), (SELECT array_agg(val ORDER BY id) FROM dict) $q$, legs); saved := current_setting('provsql.active', true); PERFORM set_config('provsql.active','off', true); EXECUTE sql INTO fact_rel, fact_arity, fact_tokens, fact_elems, val_by_id; PERFORM set_config('provsql.active', saved, true); END; $$ LANGUAGE plpgsql VOLATILE; CREATE OR REPLACE FUNCTION ucq_joint_provenance_answer( descriptor JSONB, head_vars INT[], head_vals TEXT[], fallback UUID DEFAULT NULL) RETURNS UUID AS 'provsql','ucq_joint_provenance_answer' LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION ucq_mobius_provenance_answer( descriptor JSONB, head_vars INT[], head_vals TEXT[], fallback UUID DEFAULT NULL) RETURNS UUID AS 'provsql','ucq_mobius_provenance_answer' LANGUAGE C STABLE; CREATE OR REPLACE FUNCTION plant_reach_any_groups( work_name text, node_attribute text, member_rel regclass, member_attribute text, group_attribute text, edge_rel regclass, source_attribute text, destination_attribute text, source_value text, directed boolean, edge_quals text DEFAULT NULL, source_rel regclass DEFAULT NULL, source_rel_attribute text DEFAULT NULL, edge_sql text DEFAULT NULL, member_quals text DEFAULT NULL) RETURNS void AS $$ DECLARE e record; grp 
record; m record; sv text[]; st uuid[]; sp double precision[]; gids int[] := ARRAY[]::int[]; mids int[] := ARRAY[]::int[]; vid int; canonical uuid; verbosity int := coalesce(current_setting('provsql.verbose_level', true)::int, 0); BEGIN BEGIN -- A tracked member relation would make the aggregated tokens -- per-row products, not the bare reach tokens: nothing to plant. IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = member_rel AND attname = 'provsql' AND atttypid = 'uuid'::regtype AND NOT attisdropped) THEN RETURN; END IF; IF source_rel IS NOT NULL THEN SELECT g.source_values, g.source_tokens, g.source_probabilities INTO sv, st, sp FROM provsql.gather_reachability_sources(source_rel, source_rel_attribute) g; IF sv IS NULL THEN sv := ARRAY[]::text[]; st := ARRAY[]::uuid[]; sp := ARRAY[]::float8[]; END IF; ELSE sv := ARRAY[source_value]; st := ARRAY['00000000-0000-0000-0000-000000000000'::uuid]; sp := ARRAY[1.0::float8]; END IF; e := provsql.gather_reachability_edges(edge_rel, source_attribute, destination_attribute, sv, edge_quals, edge_sql); -- The groups, replicating the user's join semantics: per group, the -- member vertices and the multiset of their reach tokens (with the -- multiplicity the join produces). Single-member groups need no -- planting (provenance_plus passes a single token through). -- Two steps: materialise the joined rows with their per-row tokens -- (tracked CTAS, then strip the automatic provsql column), and only -- then aggregate the now-plain table -- aggregating provenance() -- inside a grouped tracked query would be rewritten as a -- provenance-aware aggregation, which is not what the planting -- needs. 
DROP TABLE IF EXISTS provsql_reach_any_flat_tmp; EXECUTE format( 'CREATE TEMP TABLE provsql_reach_any_flat_tmp AS ' || 'SELECT w.%1$I::text AS node_val, provsql.provenance() AS tok, ' || ' t.%5$I AS grp_key ' || 'FROM %2$I w JOIN %3$s t ON w.%1$I = t.%4$I' -- The member-relation filter restricts which members participate -- (deparsed table-qualified as t.column); the working table side -- carries no provenance distinction here. || coalesce(' WHERE ' || member_quals, ''), node_attribute, work_name, member_rel::text, member_attribute, group_attribute); PERFORM provsql.remove_provenance('provsql_reach_any_flat_tmp'); DROP TABLE IF EXISTS provsql_reach_any_groups_tmp; CREATE TEMP TABLE provsql_reach_any_groups_tmp AS SELECT (row_number() OVER ())::int AS gid, members, toks FROM ( SELECT array_agg(node_val) AS members, array_agg(tok) AS toks FROM provsql_reach_any_flat_tmp GROUP BY grp_key HAVING count(*) >= 2) g; DROP TABLE provsql_reach_any_flat_tmp; FOR grp IN SELECT gid, members FROM provsql_reach_any_groups_tmp LOOP FOR m IN SELECT DISTINCT unnest(grp.members) AS val LOOP vid := array_position(e.vertices, m.val); IF vid IS NOT NULL THEN gids := gids || grp.gid; mids := mids || vid; END IF; END LOOP; END LOOP; IF cardinality(gids) = 0 THEN DROP TABLE provsql_reach_any_groups_tmp; RETURN; END IF; FOR grp IN SELECT a.group_id, a.token AS any_token, t.toks FROM provsql.reachability_materialize_any( e.sources, e.destinations, e.tokens, e.probabilities, e.block_keys, e.block_indices, e.extra_ids, st, sp, directed, gids, mids) a JOIN provsql_reach_any_groups_tmp t ON t.gid = a.group_id LOOP canonical := public.uuid_generate_v5( provsql.uuid_ns_provsql(), concat('plus-canonical', (SELECT array_agg(tok ORDER BY tok) FROM unnest(grp.toks) tok))); PERFORM provsql.create_gate(canonical, 'plus', ARRAY[grp.any_token]); PERFORM provsql.set_infos(canonical, 1); END LOOP; DROP TABLE provsql_reach_any_groups_tmp; IF verbosity >= 20 THEN -- Lift the function-level client_min_messages 
= warning for the -- one RAISE; the function-level SET restores the caller's value. PERFORM set_config('client_min_messages', 'notice', true); RAISE NOTICE 'ProvSQL: certified any-member gates planted for the aggregation of "%" by %.%', work_name, member_rel, group_attribute; PERFORM set_config('client_min_messages', 'warning', true); END IF; EXCEPTION WHEN OTHERS THEN IF verbosity >= 10 THEN PERFORM set_config('client_min_messages', 'notice', true); RAISE NOTICE 'ProvSQL: any-member planting for "%" skipped (%)', work_name, SQLERRM; PERFORM set_config('client_min_messages', 'warning', true); END IF; END; END -- No SET search_path: the deparsed edge subquery must resolve against -- the caller's path; ProvSQL internals are schema-qualified. $$ LANGUAGE plpgsql SET client_min_messages = warning; -- Prefix ! : event negation (provenance_not). Guarded so the script -- stays idempotent; the oprcode <> 0 test rejects a shell operator. DO $do$ BEGIN IF NOT EXISTS ( SELECT 1 FROM pg_operator WHERE oprname = '!' AND oprnamespace = 'provsql'::regnamespace AND oprleft = 0 AND oprright = 'uuid'::regtype AND oprcode <> 0 ) THEN CREATE OPERATOR ! (RIGHTARG=UUID, PROCEDURE=provenance_not); END IF; END $do$; -- This upgrade appended values to the provenance_gate enum ('conditioned' -- above, 'mobius' here): force a fresh constant-OID lookup so a backend -- warmed under the previous version can create gates of the new types. SELECT reset_constants_cache();