-- Narugn, a lightweight distributed computer
-- Copyright (C) 2012, 2013 Gianni Ciolli
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or
-- (at your option) any later version.
--
-- This program is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License
-- along with this program. If not, see <http://www.gnu.org/licenses/>.

------------------------------------------------------------------------

-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION narugn" to load this file. \quit

--
-- Settings
--

-- Key/value store: at most one value per tag.
CREATE TABLE settings
( tag text PRIMARY KEY
, value text NOT NULL
);

-- Stores i_value under i_tag, overwriting the previous value if the
-- tag is already present (insert first, update on unique_violation).
CREATE FUNCTION set_setting
( i_tag text
, i_value text
) RETURNS void
LANGUAGE plpgsql
AS $BODY$
BEGIN
    INSERT INTO settings (tag,value)
    VALUES (i_tag, i_value);
EXCEPTION
    WHEN unique_violation THEN
        UPDATE settings
        SET value = i_value
        WHERE tag = i_tag;
END;
$BODY$;

-- Returns the value stored under i_tag, or NULL when the tag is absent.
CREATE FUNCTION maybe_get_setting
( i_tag text
) RETURNS text
LANGUAGE plpgsql
AS $BODY$
DECLARE
    o_value text;
BEGIN
    SELECT value INTO o_value
    FROM settings
    WHERE tag = i_tag;
    RETURN o_value;
END;
$BODY$;

--
-- The local polygon
--

CREATE TABLE local_server
( id text
, full_name text
, shape polygon
, connstr text
, local_connstr text
);

--
-- Locally available locations
--

-- cds: a pair of integer coordinates.
CREATE TYPE cds AS (x int, y int);
-- cdt: a pair of integer coordinates plus a timestamp.
CREATE TYPE cdt AS (x int, y int, t timestamp with time zone);

-- One row per database on this server whose name follows the cell
-- naming scheme, together with the coordinates encoded in the name.
-- A leading "m" in a coordinate stands for the minus sign, matching
-- global2dbname / dbname2global below.
CREATE VIEW local_cells AS
WITH d(datname) AS (
    SELECT datname
    FROM pg_database
    WHERE datname ~ '^narugn_cell_m?[0-9]+_m?[0-9]+$'
), c(datname,matches) AS (
    SELECT datname
    , regexp_matches(datname, '^narugn_cell_(m?[0-9]+)_(m?[0-9]+)$') AS matches
    FROM d
)
SELECT datname
, ROW(replace(matches[1], 'm', '-'), replace(matches[2], 'm', '-'))::cds
FROM c;

--
-- dbname <--> coordinates
--

-- Encodes coordinates as a database name "narugn_cell_<x>_<y>".
-- Negative values are written with an "m" prefix instead of "-", which
-- is the form accepted by dbname2global; non-negative values are
-- rendered exactly as before.
CREATE FUNCTION global2dbname
(i_c IN cds
) RETURNS text
LANGUAGE SQL
STRICT
AS $BODY$
SELECT 'narugn_cell_'
    || replace(($1).x::text, '-', 'm')
    || '_'
    || replace(($1).y::text, '-', 'm') ;
$BODY$;

-- Decodes a database name produced by global2dbname. The output
-- fields are left unassigned when the name does not match the scheme.
CREATE FUNCTION dbname2global
(i_dbname IN text
,o OUT cds
) LANGUAGE plpgsql
STRICT
AS $BODY$
DECLARE
    a text[];
BEGIN
    a := regexp_matches(i_dbname, $_$^narugn_cell_(m?[0-9]+)_(m?[0-9]+)$$_$);
    IF a IS NOT NULL THEN
        -- translate the "m" prefix back to a minus sign before the
        -- text is converted to int; assigning 'm5' directly would fail
        o.x := replace(a[1], 'm', '-');
        o.y := replace(a[2], 'm', '-');
    END IF;
END;
$BODY$;

-- Squared Euclidean distance between two coordinate pairs.
CREATE FUNCTION dist2
( c1 cds
, c2 cds
) RETURNS int
LANGUAGE SQL
AS $BODY$
SELECT ($1.x - $2.x) * ($1.x - $2.x)
     + ($1.y - $2.y) * ($1.y - $2.y)
$BODY$;

-- Returns every integer-coordinate point contained in the given
-- polygon. Candidates are enumerated over the polygon's bounding box
-- (b[1] is used as the lower corner, b[0] as the upper corner) and then
-- filtered with the point-in-polygon operator.
CREATE FUNCTION locations(polygon)
RETURNS SETOF cds
LANGUAGE plpgsql
AS $BODY$
DECLARE
    b box;
BEGIN
    b := box($1);
    RETURN QUERY
    SELECT x.x, y.y
    FROM generate_series(ceil((b[1])[0])::int, floor((b[0])[0])::int) x(x)
    CROSS JOIN generate_series(ceil((b[1])[1])::int, floor((b[0])[1])::int) y(y)
    WHERE point(x.x,y.y) <@ $1;
END;
$BODY$;

-- Locations of the polygon stored in local_server. INTO STRICT makes
-- this fail unless local_server holds exactly one row.
CREATE FUNCTION local_locations()
RETURNS SETOF cds
LANGUAGE plpgsql
AS $BODY$
DECLARE
    lp polygon;
BEGIN
    SELECT shape INTO STRICT lp
    FROM local_server;
    RETURN QUERY
    SELECT *
    FROM locations(lp);
END;
$BODY$;

--
-- Narugn version
--

CREATE FUNCTION code_version()
RETURNS text
LANGUAGE SQL
AS $$
SELECT '0.2.0' :: text
$$;

-- Builds a cdt from coordinates and a timestamp (defaulting to the
-- current clock time). Backs the "@" operator defined below.
CREATE FUNCTION cds_timestamptz_to_cdt
( i_c IN cds
, i_t IN timestamp with time zone DEFAULT clock_timestamp()
, o OUT cdt
) LANGUAGE plpgsql
AS $BODY$
BEGIN
    o.x := i_c.x;
    o.y := i_c.y;
    o.t := i_t;
END;
$BODY$;

-- coordinates @ timestamp  -->  cdt
CREATE OPERATOR @ (
    PROCEDURE = cds_timestamptz_to_cdt,
    LEFTARG = cds,
    RIGHTARG = timestamptz
);

-- cds -> cdt, stamping the value with the current clock time.
CREATE FUNCTION cds_to_cdt
( i IN cds
, o OUT cdt
) LANGUAGE plpgsql
AS $BODY$
BEGIN
    o.x := i.x;
    o.y := i.y;
    o.t := clock_timestamp();
END;
$BODY$;

-- cdt -> cds, dropping the timestamp.
CREATE FUNCTION cdt_to_cds
( i IN cdt
, o OUT cds
) LANGUAGE plpgsql
AS $BODY$
BEGIN
    o.x := i.x;
    o.y := i.y;
END;
$BODY$;

CREATE CAST (cds AS cdt) WITH FUNCTION cds_to_cdt(cds);
CREATE CAST (cdt AS cds) WITH FUNCTION cdt_to_cds(cdt);
------------------------------------------------------------
-- Network topology
------------------------------------------------------------

-- Known remote servers. The exclusion constraint rejects any two rows
-- whose shapes overlap (&&).
CREATE TABLE remote_servers
( id text PRIMARY KEY
, full_name text NOT NULL
, shape polygon NOT NULL
, connstr text NOT NULL
, local_connstr text NOT NULL
, EXCLUDE USING gist (shape WITH &&)
);

--
-- Initially this view is set to the default PostgreSQL
-- connection. The view will have to be replaced if a different
-- connection is needed, for instance if PGBouncer is used.
--
CREATE VIEW all_servers AS
SELECT true AS is_local, s.*
FROM local_server s
UNION ALL
SELECT false AS is_local, s.*
FROM remote_servers s;

--
-- This function computes the pair (X,Y) of global coordinates for the
-- current Narugn cell, by decoding the name of the current database.
--
CREATE FUNCTION global_coordinates
(o OUT cds)
LANGUAGE SQL
AS $BODY$
SELECT dbname2global(current_database())
$BODY$
SET search_path TO narugn, public;

--
-- This function returns the list of neighbours for a given Narugn
-- cell: the four cells at distance one along either axis, each paired
-- with the server (if any) whose shape contains it. Rows come back in
-- the order east, north, west, south.
--
CREATE FUNCTION neighbours
( i_c IN cds DEFAULT global_coordinates()
, c OUT cds
, id OUT text
, shape OUT polygon
, connstr OUT text
, local_connstr OUT text
) RETURNS SETOF RECORD
LANGUAGE SQL
AS $BODY$
WITH candidate(ord,x,y) AS (
    VALUES
      (1, $1.x + 1 , $1.y)
    , (2, $1.x , $1.y + 1)
    , (3, $1.x - 1 , $1.y)
    , (4, $1.x , $1.y - 1)
)
SELECT c, id, shape_text :: polygon, connstr, local_connstr
FROM (
    -- the shape travels as text through DISTINCT and is cast back
    -- to polygon by the outer query
    SELECT DISTINCT
      ROW(candidate.x, candidate.y)::cds AS c
    , s.id
    , s.shape :: text AS shape_text
    , s.connstr
    , s.local_connstr
    , candidate.ord
    FROM candidate
    LEFT JOIN all_servers s
        ON point(candidate.x,candidate.y) <@ s.shape
) hit
ORDER BY ord
$BODY$
SET search_path TO narugn, public;

-- Asks partition $1 of the 'narugn' cluster for its code version.
CREATE FUNCTION probe_by_attempting_a_connection(int)
RETURNS text
LANGUAGE plproxy
AS $BODY$
CLUSTER 'narugn';
SELECT narugn.code_version();
RUN ON $1;
$BODY$
SET search_path TO narugn, public;

-- Connects with the given connection string and counts the databases
-- named dbname.
CREATE FUNCTION probe_by_checking_dbname
( connstr text
, dbname text
) RETURNS bigint
LANGUAGE plproxy
AS $BODY$
CONNECT $1;
SELECT count(1) FROM pg_database where datname = $2;
$BODY$
SET search_path TO narugn, public;

-- Probes every neighbour listed by neighbours_conninfo(), rebuilds
-- partition_func() so that it returns the indexes of the neighbours
-- that answered, and returns one status word per neighbour:
-- '.ok.', 'miss', 'skip' or 'fail'.
CREATE FUNCTION probe_neighbours()
RETURNS text[]
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_reachable int[] := '{}';
    v_report text[] := '{}';
    v_count bigint;
    v_buf text;
    nb RECORD;
BEGIN
    --
    -- (1) Probe each neighbour
    --
    FOR nb IN
        SELECT *
        FROM neighbours_conninfo()
        ORDER BY i
    LOOP
        BEGIN
            -- fetch the server option describing this partition
            WITH opts AS (
                SELECT unnest(srvoptions) AS opt
                FROM pg_foreign_server
                WHERE srvname = 'narugn'
            )
            SELECT opt INTO STRICT v_buf
            FROM opts
            WHERE opt ~ ('^p' || nb.i::text || '=');

            IF v_buf ~ ('^p' || nb.i::text || '=hostaddr=none') THEN
                -- placeholder partition: nothing to connect to
                v_count := -1;
            ELSE
                v_count := probe_by_checking_dbname(nb.connstr || ' dbname=postgres user=narugn', nb.dbname);
                IF v_count = 1 THEN
                    PERFORM probe_by_attempting_a_connection(nb.i);
                END IF;
            END IF;

            -- no ELSE branch: any other value raises and is reported
            -- as 'fail' by the handler below
            CASE v_count
                WHEN 1 THEN
                    v_reachable := v_reachable || nb.i;
                    v_report := v_report || text '.ok.';
                WHEN 0 THEN
                    v_report := v_report || text 'miss';
                WHEN -1 THEN
                    v_report := v_report || text 'skip';
            END CASE;
        EXCEPTION
            WHEN OTHERS THEN
                RAISE WARNING 'FAIL CONNECT % --> #% : SQLSTATE = %, SQLERRM = %',
                    dbname2global(current_database()), nb.i, SQLSTATE, SQLERRM;
                v_report := v_report || text 'fail';
        END;
    END LOOP;

    --
    -- (2) Update partition function
    --
    IF v_reachable = '{}' THEN
        v_buf := $_$ CREATE OR REPLACE FUNCTION partition_func() RETURNS SETOF int4 LANGUAGE plpgsql AS $__$ BEGIN END; $__$; $_$;
    ELSE
        v_buf := $_$ CREATE OR REPLACE FUNCTION partition_func() RETURNS SETOF int4 LANGUAGE SQL AS $__$ VALUES ($_$ || array_to_string(v_reachable, '), (') || $_$) $__$; $_$;
    END IF;
    EXECUTE v_buf;

    RETURN v_report;
END;
$BODY$
SET search_path TO narugn, public;

-- Initial partition function: returns no partitions until
-- probe_neighbours() replaces it.
CREATE FUNCTION partition_func()
RETURNS SETOF int4
LANGUAGE plpgsql
AS $__$
BEGIN
END;
$__$;

------------------------------------------------------------
-- Distributed execution
------------------------------------------------------------

CREATE SEQUENCE horizon;

-- Packs the low 6 bits of each of the four coordinates and the low 8
-- bits of t into a single 32-bit integer.
CREATE FUNCTION hash
( origin cds
, cell cds
, t bigint
) RETURNS int
LANGUAGE plpgsql
AS $BODY$
BEGIN
    RETURN (
          ((($1).x :: bit(6) :: int :: bit(32))      )
        | ((($1).y :: bit(6) :: int :: bit(32)) << 6)
        | ((($2).x :: bit(6) :: int :: bit(32)) << 12)
        | ((($2).y :: bit(6) :: int :: bit(32)) << 18)
        | (($3 :: bit(8) :: int :: bit(32)) << 24)
    ) :: int;
END;
$BODY$;

-- Human-readable rendering: '@<t>: <origin_cell> -> <this_cell>'.
CREATE FUNCTION display_cct
( this_cell cds
, origin_cell cds
, t bigint
) RETURNS text
LANGUAGE SQL
AS $$
SELECT '@' || $3 :: text || ': ' || $2 :: text || ' -> ' || $1 :: text
$$;

--
-- Locks
--

CREATE TABLE locks
( this_cell cds
, origin_cell cds
, t bigint
, UNIQUE (this_cell, origin_cell, t)
);

-- Forgets every durable lock.
CREATE FUNCTION reset_durable_locks()
RETURNS void
LANGUAGE plpgsql
AS $BODY$
BEGIN
    TRUNCATE locks;
END;
$BODY$;

-- Records a durable lock for the given triple.
CREATE FUNCTION take_durable_lock
( this_cell cds
, origin_cell cds
, t bigint
) RETURNS void
LANGUAGE SQL
AS $BODY$
INSERT INTO locks(this_cell, origin_cell, t)
VALUES ($1, $2, $3)
$BODY$;

-- True when a durable lock exists for the given triple.
CREATE FUNCTION is_durably_locked
( this_cell cds
, origin_cell cds
, t bigint
) RETURNS boolean
LANGUAGE plpgsql
AS $BODY$
BEGIN
    RETURN EXISTS (
        SELECT 1
        FROM locks
        WHERE locks.this_cell = $1
          AND locks.origin_cell = $2
          AND locks.t = $3
    );
END;
$BODY$;

-- Tries to take the durable lock for the given triple without waiting.
-- The check-then-insert is guarded by a transaction-level advisory lock
-- whose key is derived from the first 15 hex digits of the md5 of the
-- triple. Returns true when the lock was taken, false when it was
-- already held or the advisory lock could not be obtained.
CREATE FUNCTION lock_nowait
( this_cell cds
, origin_cell cds
, t bigint
) RETURNS boolean
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_digest text;
    v_key bigint;
BEGIN
    v_digest := substring(md5(ROW(this_cell,origin_cell,t) :: text), 1, 15);
    v_key := (('x' || v_digest) :: bit(60)) :: int8;

    IF NOT pg_try_advisory_xact_lock(v_key) THEN
        RETURN false;
    END IF;

    IF is_durably_locked(this_cell, origin_cell, t) THEN
        -- resource already locked
        RETURN false;
    END IF;

    -- resource available
    PERFORM take_durable_lock(this_cell, origin_cell, t);
    RETURN true;
END;
$BODY$
SET search_path TO narugn, public;

--
-- This function performs an isolated execution of a given function on
-- the current cell. For each line in the output, "z" represents an
-- integer which specifies the order of the rows within a single cell,
-- and "output" is the actual output (each such function is supposed
-- to return 'SETOF text').
--
-- Parameters:
--   i_ts          not used by the body
--   cell_function suffix of the function to call; the function actually
--                 invoked is cell_<cell_function>, resolved through the
--                 search path narugn_logic, narugn, public
--   payload, walked, origin_tick
--                 passed through to the cell function by name
--
CREATE FUNCTION run_locally
( i_ts IN timestamp with time zone
, cell_function IN text
, payload IN text[]
, origin_tick IN bigint
, walked IN cdt[]
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_schema_missing boolean;
BEGIN
    -- (1) ensure that schema narugn_logic exists
    --
    -- The catalog is checked explicitly: since PostgreSQL 9.2 a
    -- search_path entry naming a missing schema is silently ignored,
    -- so waiting for SET to fail would never create the schema.
    PERFORM 1
    FROM pg_catalog.pg_namespace
    WHERE nspname = 'narugn_logic';
    v_schema_missing := NOT FOUND;

    IF v_schema_missing THEN
        CREATE SCHEMA narugn_logic;
    END IF;

    SET LOCAL search_path TO narugn_logic, narugn, public;

    IF v_schema_missing THEN
        -- a freshly created logic schema is recorded as empty logic;
        -- set_setting is resolved through the path set just above
        PERFORM set_setting('md5(Logic)', md5(''));
    END IF;

    -- (2) execute function
    --
    -- The name is folded to lower case (as an unquoted identifier
    -- would be) and quoted, so that it can only ever be parsed as a
    -- single function name.
    RETURN QUERY EXECUTE
        'SELECT * FROM '
        || quote_ident('cell_' || lower(cell_function))
        || '(payload := $1, walked := $2, origin_tick := $3)'
    USING payload, walked, origin_tick;
END;
$BODY$;

--
-- "ts" is the timestamp when the originating call was started, and is
-- null if and only if this is the originating call;
--
-- "cell_function" is the name of the cell function being executed;
--
-- "payload" is a text field (possibly NULL) carrying a payload which
-- will be transferred during the function calls;
--
-- "origin_cell" and "origin_tick" are null if and only if the
-- function call is the originating one, otherwise they represent the
-- space-tick coordinates of the originating call;
--
-- "walked" is null if and only if the function does not record the
-- path (by default the path is recorded, in an "automatic traceroute"
-- fashion).
--
-- For each line in the output, "c" represents the global coordinates,
-- "z" represents an integer which specifies the order of the rows
-- within a single cell, and "output" is the actual output (each such
-- function is supposed to return 'SETOF text').
--
CREATE FUNCTION execute_sync_raw
( cell_function IN text
, payload IN text[] DEFAULT '{}'
, ts IN timestamp with time zone DEFAULT clock_timestamp()
, origin_cell IN cds DEFAULT NULL
, origin_tick IN bigint DEFAULT NULL
, walked IN cdt[] DEFAULT NULL
, max_delay IN float DEFAULT 0.2
, c OUT cds
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_this_cell cds;
    v_walked cdt[];
    v_rpfp boolean;
BEGIN
    --
    -- (1) allocating origin_tick and/or ts if required
    --
    v_this_cell := global_coordinates();
    IF origin_tick IS NULL THEN
        origin_tick := nextval('horizon'::regclass);
        origin_cell := v_this_cell;
    END IF;

    --
    -- (1a) random delay
    --
    PERFORM pg_sleep(random() * max_delay);

    --
    -- (2) lock (origin_cell,origin_tick) @ v_this_cell
    --
    IF NOT lock_nowait(v_this_cell, origin_cell, origin_tick) THEN
        -- this cell has already handled (or is handling) the call
        RETURN;
    END IF;

    --
    -- (3) run locally
    --
    v_walked := walked || (v_this_cell @ clock_timestamp());
    RETURN QUERY
    SELECT v_this_cell, a.*
    FROM run_locally
    ( ts
    , cell_function
    , payload
    , origin_tick
    , v_walked
    ) a;

    --
    -- (4) run remotely
    --
    -- The cell function is asked, with rpfp := true, whether the call
    -- must be propagated to the neighbours. The function name is
    -- folded to lower case (as an unquoted identifier would be) and
    -- quoted, so that it can only be parsed as a single function name.
    EXECUTE
        'SELECT output::boolean FROM '
        || quote_ident('cell_' || lower(cell_function))
        || '(payload := $1, rpfp := true, walked := $2, origin_tick := $3)'
    INTO v_rpfp
    USING payload, v_walked, origin_tick;

    IF v_rpfp THEN
        RETURN QUERY
        SELECT *
        FROM run_remotely
        ( origin_cell
        , ts
        , cell_function
        , payload
        , v_this_cell
        , origin_tick
        , v_walked
        );
    END IF;

    --
    -- Note. the lock taken in (2) must not be unlocked now; we
    -- need to wait until we are sure it is no longer reachable by
    -- other distributed commands.
    --
END;
$BODY$
SET search_path TO narugn, public;

--
-- Variadic version of execute_sync_raw with absolute timing
--
CREATE FUNCTION execute_sync_abs
( cell_function IN text
, payload VARIADIC text[] DEFAULT '{}'
, c OUT cds
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE SQL
AS $BODY$
SELECT *
FROM execute_sync_raw
( cell_function := $1
, payload := $2
)
$BODY$
SET search_path TO narugn, public;

--
-- Variadic version of execute_sync_raw with relative timing
--
-- NOTE(review): "dt" is computed against a clock_timestamp() call that
-- sits in the same FROM list as execute_sync_raw; which of the two is
-- evaluated first is presumably up to the planner -- confirm that t0
-- is really taken before the run starts.
--
CREATE FUNCTION execute_sync
( cell_function IN text
, payload VARIADIC text[] DEFAULT '{}'
, c OUT cds
, z OUT bigint
, dt OUT interval
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE SQL
AS $BODY$
SELECT x.c
, x.z
, x.t - c.t0 AS dt
, x.output
FROM execute_sync_raw
( cell_function := $1
, payload := $2
) x, clock_timestamp() c(t0)
$BODY$
SET search_path TO narugn, public;

--
-- This function will be invoked by run_remotely. It forwards its
-- arguments to execute_sync_raw, leaving max_delay at its default.
--
CREATE FUNCTION remote_runner
( origin_cell IN cds
, ts IN timestamp with time zone
, cell_function IN text
, payload IN text[]
, origin_tick IN bigint
, walked IN cdt[]
, c OUT cds
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
BEGIN
    RETURN QUERY
    SELECT *
    FROM execute_sync_raw
    ( cell_function
    , payload
    , ts
    , origin_cell
    , origin_tick
    , walked
    );
END;
$BODY$
SET search_path TO narugn, public;

--
-- This function executes the given (f,x,c,t) on all the neighbours of
-- the current cell, invoking "narugn.remote_runner". i_c_orig and
-- i_c are respectively the oldest and the youngest ancestors on the
-- call stack (NOTE(review): these look like earlier names of the
-- origin_cell and v_this_cell parameters -- confirm).
--
CREATE FUNCTION run_remotely
( origin_cell IN cds
, ts IN timestamp with time zone
, cell_function IN text
, payload IN text[]
, v_this_cell IN cds
, origin_tick IN bigint
, walked IN cdt[]
, c OUT cds
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plproxy
AS $BODY$
CLUSTER 'narugn';
SELECT *
FROM remote_runner
( origin_cell
, ts
, cell_function
, payload
, origin_tick
, walked
);
RUN ON partition_func();
$BODY$
SET search_path TO narugn, public;

------------------------------------------------------------
-- Setup and checks
------------------------------------------------------------

-- For each neighbour of the current cell, numbered from 0 in the order
-- returned by neighbours(), yields the connection string to use, the
-- name of the neighbour's database and its coordinates.
CREATE FUNCTION neighbours_conninfo
( i OUT int
, connstr OUT text
, dbname OUT text
, c OUT cds
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_my_cds cds;
    v_my_point point;
    x RECORD;
BEGIN
    v_my_cds := global_coordinates();
    v_my_point := point(v_my_cds.x, v_my_cds.y);
    FOR x IN
        SELECT row_number() OVER () - 1 AS i, n.*
        FROM neighbours() n
    LOOP
        i := x.i;
        c := x.c;
        connstr := CASE
            -- no polygon
            WHEN x.shape IS NULL
            THEN 'hostaddr=none'
            -- local polygon
            WHEN v_my_point <@ x.shape
            THEN x.local_connstr
            -- remote polygon
            ELSE x.connstr
            END;
        dbname := global2dbname(x.c);
        RETURN NEXT;
    END LOOP;
END;
$BODY$;

-- Returns one "p<i> '<connection string>'" entry per neighbour, in the
-- form expected inside the OPTIONS clause of CREATE/ALTER SERVER.
CREATE FUNCTION neighbours_connstr()
RETURNS text[]
LANGUAGE plpgsql
AS $BODY$
DECLARE
    a text[] := '{}';
    x RECORD;
BEGIN
    FOR x IN
        SELECT *
        FROM neighbours_conninfo()
    LOOP
        -- quote_literal escapes any quote embedded in the connection
        -- string (e.g. password='a b'), which hand-written quotes
        -- would turn into invalid DDL
        a := a || ('p' || x.i :: text || ' '
            || quote_literal(x.connstr || ' '
                || 'dbname=' || x.dbname
                || ' application_name=Narugn'));
    END LOOP;
    RETURN a;
END;
$BODY$;

-- Re-creates the all_servers view and creates (or updates) the
-- 'narugn' foreign server with the current neighbour connection
-- strings.
CREATE FUNCTION reconfigure_plproxy()
RETURNS VOID
LANGUAGE plpgsql
AS $BODY$
DECLARE
    a text[] := '{}';
    q text;
BEGIN
    -- (1) refresh view all_servers
    q := 'CREATE OR REPLACE VIEW all_servers AS SELECT true AS is_local, s.* FROM local_server s UNION ALL SELECT false AS is_local, s.* FROM remote_servers s';
    EXECUTE q;

    -- (2) refresh connstrs to neighbouring cells
    a := neighbours_connstr();

    -- (3) refresh foreign server
    PERFORM 1
    FROM pg_foreign_server
    WHERE srvname = 'narugn';
    IF NOT FOUND THEN
        q := ' CREATE SERVER narugn FOREIGN DATA WRAPPER plproxy OPTIONS ( connection_lifetime ''1800'' , ' || array_to_string(a, E'\n\t , ') || ' )';
        EXECUTE q;
        DROP USER MAPPING IF EXISTS FOR narugn SERVER narugn;
        CREATE USER MAPPING FOR narugn SERVER narugn
            OPTIONS ( user 'narugn' );
        GRANT USAGE ON FOREIGN SERVER narugn TO narugn;
    ELSE
        q := ' ALTER SERVER narugn OPTIONS ( SET ' || array_to_string(a, E'\n\t\t , SET ') || ' )';
        EXECUTE q;
    END IF;
END;
$BODY$;

--
-- State management
--

-- Registry of state objects: kind 't' is used for tables and 's' for
-- sequences (see state_table / state_sequence).
CREATE TABLE state_objects
( name text NOT NULL
, kind char(1) NOT NULL
, CHECK (kind = 't' OR kind = 's')
);

-- Registers a state object of the given kind.
CREATE FUNCTION state_object
( i_name text
, i_kind char(1)
) RETURNS void
LANGUAGE SQL
AS $$
INSERT INTO state_objects(name,kind)
VALUES ($1,$2)
$$
SET search_path = narugn, public;

-- Registers a state table.
CREATE FUNCTION state_table
( i_name text
) RETURNS void
LANGUAGE SQL
AS $$
SELECT state_object($1,'t')
$$
SET search_path = narugn, public;

-- Registers a state sequence.
CREATE FUNCTION state_sequence
( i_name text
) RETURNS void
LANGUAGE SQL
AS $$
SELECT state_object($1,'s')
$$
SET search_path = narugn, public;

------------------------------------------------------------
-- Core cell functions
------------------------------------------------------------

--
-- The "ping" cell function can be run in two modes:
--
-- without payload: just a "ping"
--
-- with payload: updating the list of known servers (which is provided
-- as payload[1]).
--
-- When called with rpfp := true the function only answers 'true',
-- i.e. the call is to be propagated to the neighbours.
--
CREATE FUNCTION cell_ping
( payload IN text[]
, walked IN cdt[]
, origin_tick IN bigint
, rpfp IN boolean DEFAULT false
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
BEGIN
    IF rpfp THEN
        z := 0;
        output := 'true';
        RETURN NEXT;
        RETURN;
    END IF;

    z := 1;
    t := clock_timestamp();

    IF payload = '{}' THEN
        -- normal ping
        output := 'OK';
        RETURN NEXT;
    ELSE
        -- registering servers
        RETURN QUERY
        WITH incoming AS (
            -- servers announced in the payload
            SELECT *
            FROM unnest(payload[1] :: remote_servers[])
        ), fresh AS (
            -- announced servers overlapping no server known here
            SELECT incoming.*
            FROM incoming
            LEFT JOIN all_servers rs
                ON incoming.shape && rs.shape
            WHERE rs.id IS NULL
        ), known AS (
            -- announced servers overlapping a server known here
            SELECT DISTINCT ON (incoming.shape :: text) incoming.*
            FROM incoming
            JOIN all_servers rs
                ON incoming.shape && rs.shape
        ), stored AS (
            INSERT INTO remote_servers
            SELECT *
            FROM fresh
        ), report(txt) AS (
            SELECT 'NEW ' || ROW(fresh.*)
            FROM fresh
            UNION ALL
            SELECT 'OLD ' || ROW(known.*)
            FROM known
        )
        SELECT row_number() OVER ()
        , clock_timestamp()
        , txt
        FROM report;

        PERFORM reconfigure_plproxy();
    END IF;
END;
$BODY$
SET search_path TO narugn, public;

--
-- This function prints some version numbers: first the PostgreSQL
-- version string, then the Narugn code version.
--
CREATE FUNCTION cell_version
( payload IN text[]
, walked IN cdt[]
, origin_tick IN bigint
, rpfp IN boolean DEFAULT false
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
BEGIN
    IF rpfp THEN
        -- propagate to the neighbours
        z := 0;
        output := 'true';
        RETURN NEXT;
        RETURN;
    END IF;

    -- row 1: server version
    z := 1;
    t := clock_timestamp();
    output := version();
    RETURN NEXT;

    -- row 2: Narugn version
    z := 2;
    t := clock_timestamp();
    output := 'Narugn ' || code_version();
    RETURN NEXT;
END;
$BODY$
SET search_path TO narugn, public;

--
-- This function rescans neighbours.
--
-- The single output row carries the array returned by
-- probe_neighbours(), rendered as text.
--
CREATE FUNCTION cell_rescan
( payload IN text[]
, walked IN cdt[]
, origin_tick IN bigint
, rpfp IN boolean DEFAULT false
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
BEGIN
    IF rpfp THEN
        z := 0;
        output := 'true';
        RETURN NEXT;
        RETURN;
    END IF;
    z := 1;
    t := clock_timestamp();
    output := probe_neighbours() :: text;
    RETURN NEXT;
END;
$BODY$
SET search_path TO narugn, public;

--
-- This function updates the cell logic.
--
-- payload[1] is the base64-encoded (UTF-8) source of the new logic.
-- When its md5 equals the stored 'md5(Logic)' setting nothing is done
-- and a single 'SAME <md5>' row is returned. Otherwise the current
-- narugn_logic schema is renamed to narugn_logic_old, the source is
-- executed in a fresh narugn_logic schema, the contents of the state
-- objects it registers are carried over from the old schema, the old
-- schema is dropped, and 'NEW <md5>' / 'OLD <md5>' rows are returned.
--
CREATE FUNCTION cell_logic
( payload IN text[]
, walked IN cdt[]
, origin_tick IN bigint
, rpfp IN boolean DEFAULT false
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
DECLARE
    i_md5_new text;
    i_md5_old text;
    v_source text;
    x RECORD;
BEGIN
    IF rpfp THEN
        z := 0;
        output := 'true';
        RETURN NEXT;
        RETURN;
    END IF;
    z := 0;
    v_source := convert_from(decode(payload[1], 'base64'), 'utf8');
    i_md5_new := md5(v_source) :: text;
    i_md5_old := maybe_get_setting('md5(Logic)');
    IF i_md5_new = i_md5_old THEN
        z := z + 1;
        output := 'SAME ' || i_md5_old;
        t := clock_timestamp();
        RETURN NEXT;
    ELSE
        DROP SCHEMA IF EXISTS narugn_logic_old CASCADE;
        BEGIN
            ALTER SCHEMA narugn_logic RENAME TO narugn_logic_old;
        EXCEPTION
            WHEN invalid_schema_name THEN
                -- no previous logic: start from an empty old schema
                CREATE SCHEMA narugn_logic_old;
        END;
        CREATE SCHEMA narugn_logic;
        SET LOCAL search_path TO narugn_logic, narugn_logic_old, narugn, public;
        -- the new source is expected to re-register its state objects
        TRUNCATE narugn.state_objects;
        EXECUTE v_source;
        FOR x IN
            SELECT *
            FROM narugn.state_objects
        LOOP
            -- Object names are compared verbatim against the catalog
            -- below, so they are quoted in the generated statements to
            -- address exactly the object that was found.
            CASE x.kind
                WHEN 't' THEN
                    PERFORM 1
                    FROM information_schema.tables
                    WHERE table_schema = 'narugn_logic_old'
                      AND table_name = x.name;
                    IF FOUND THEN
                        EXECUTE 'INSERT INTO narugn_logic.' || quote_ident(x.name)
                            || ' SELECT * FROM narugn_logic_old.' || quote_ident(x.name);
                    ELSE
                        RAISE NOTICE 'state table narugn_logic.% not found', x.name;
                    END IF;
                WHEN 's' THEN
                    PERFORM 1
                    FROM information_schema.sequences
                    WHERE sequence_schema = 'narugn_logic_old'
                      AND sequence_name = x.name;
                    IF FOUND THEN
                        PERFORM setval(
                            ('narugn_logic.' || quote_ident(x.name)) :: regclass,
                            nextval(('narugn_logic_old.' || quote_ident(x.name)) :: regclass));
                    ELSE
                        RAISE NOTICE 'state sequence narugn_logic.% not found', x.name;
                    END IF;
                ELSE
                    RAISE EXCEPTION 'unsupported value % for state_object.kind', x.kind;
            END CASE;
        END LOOP;
        DROP SCHEMA narugn_logic_old CASCADE;
        PERFORM set_setting('md5(Logic)', i_md5_new);
        z := z + 1;
        output := 'NEW ' || i_md5_new;
        t := clock_timestamp();
        RETURN NEXT;
        z := z + 1;
        output := 'OLD ' || i_md5_old;
        t := clock_timestamp();
        RETURN NEXT;
    END IF;
END;
$BODY$
SET search_path TO narugn, public;

--
-- Make a Narugn cell from a database
--

-- Stores the description of the local server, reconfigures PL/Proxy
-- and clears the durable locks. Returns 'OK'.
CREATE FUNCTION configure_cell
( short_name IN text
, full_name IN text
, polygon IN polygon
, connstr IN text
, local_connstr IN text
) RETURNS text
LANGUAGE plpgsql
AS $BODY$
BEGIN
    INSERT INTO local_server
    SELECT short_name, full_name, polygon, connstr, local_connstr;
    PERFORM reconfigure_plproxy();
    PERFORM reset_durable_locks();
    RETURN 'OK';
END;
$BODY$;

-- Fetches one local_server row through the given connection string.
CREATE FUNCTION local_server
( connstr IN text
) RETURNS local_server
LANGUAGE plproxy
AS $BODY$
CONNECT $1;
SELECT * FROM local_server LIMIT 1;
$BODY$;

-- Configures this database using the local_server description read
-- from the database of the given cell, reached via local_connstr.
CREATE FUNCTION configure_cell
( cell IN cds
, local_connstr IN text
) RETURNS text
LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_x local_server;
    v_connstr text;
BEGIN
    v_connstr := $2 || ' dbname=' || global2dbname($1);
    v_x := local_server(v_connstr);
    RETURN configure_cell
    ( v_x.id
    , v_x.full_name
    , v_x.shape
    , v_x.connstr
    , v_x.local_connstr
    );
END;
$BODY$;

--------------------------------------------------------------------------------
-- Merging two adjacent clusters
--------------------------------------------------------------------------------

-- Inserts the given server into remote_servers, or updates the row with
-- the same id when the insert violates a unique or exclusion
-- constraint; then reconfigures PL/Proxy.
CREATE FUNCTION register
( s local_server
) RETURNS VOID
LANGUAGE plpgsql
AS $BODY$
BEGIN
    BEGIN
        INSERT INTO remote_servers
        ( id
        , full_name
        , shape
        , connstr
        , local_connstr)
        VALUES
        ( (s).id
        , (s).full_name
        , (s).shape
        , (s).connstr
        , (s).local_connstr);
    EXCEPTION
        WHEN unique_violation OR exclusion_violation THEN
            UPDATE remote_servers
            SET full_name = (s).full_name
            , shape = (s).shape
            , connstr = (s).connstr
            , local_connstr = (s).local_connstr
            WHERE id = (s).id;
    END;
    PERFORM reconfigure_plproxy();
END;
$BODY$
SET search_path TO narugn, public;

-- True when some location of s1 and some location of s2 are at squared
-- distance 1 from each other.
CREATE FUNCTION are_adjacent
( s1 local_server
, s2 local_server
) RETURNS boolean
LANGUAGE plpgsql
AS $BODY$
BEGIN
    PERFORM 1
    FROM (
        WITH l(c) AS (
            SELECT ROW(x,y) :: cds
            FROM locations(s1.shape) f(x,y)
        ), r(c) AS (
            SELECT ROW(x,y) :: cds
            FROM locations(s2.shape) f(x,y)
        )
        SELECT 1
        FROM l
        JOIN r ON dist2(l.c,r.c) = 1
    ) foo;
    RETURN FOUND;
END;
$BODY$
SET search_path TO narugn, public;

-- Cell function announcing a new server.
--   rpfp := true      answers whether the new server is adjacent to
--                     the local one
--   one-item payload  registers the server, producing no output row
--   anything else     returns an 'ERROR payload = ...' row
CREATE FUNCTION cell_new_server
( payload IN text[]
, walked IN cdt[]
, origin_tick IN bigint
, rpfp IN boolean DEFAULT false
, z OUT bigint
, t OUT timestamp with time zone
, output OUT text
) RETURNS SETOF RECORD
LANGUAGE plpgsql
AS $BODY$
-- payload[1]: the new server
DECLARE
    v_local_server local_server;
    v_remote_server local_server;
BEGIN
    IF rpfp THEN
        z := 0;
        SELECT * INTO STRICT v_local_server
        FROM local_server
        LIMIT 1;
        v_remote_server := payload[1] :: local_server;
        output := are_adjacent(v_remote_server,v_local_server) :: text;
        RETURN NEXT;
    ELSIF array_length(payload,1) = 1 THEN
        PERFORM register(payload[1] :: local_server);
    ELSE
        z := 1;
        t := clock_timestamp();
        output := 'ERROR payload = ' || COALESCE(payload :: text, 'NULL');
        RETURN NEXT;
    END IF;
END;
$BODY$
SET search_path TO narugn, public;

-- Runs 'new_server' for the given remote server starting from this
-- cell, then returns the description of the local server.
CREATE FUNCTION xch
( remote_server local_server
) RETURNS SETOF local_server
LANGUAGE plpgsql
AS $BODY$
BEGIN
    PERFORM execute_sync('new_server', remote_server :: text);
    RETURN QUERY
    SELECT *
    FROM local_server
    LIMIT 1;
END;
$BODY$
SET search_path TO narugn, public;

-- Runs xch(remote_server) through the given connection string.
CREATE FUNCTION xch
( connstr text
, remote_server local_server
) RETURNS SETOF local_server
LANGUAGE plproxy
AS $BODY$
CONNECT $1;
SELECT * FROM xch($2);
$BODY$;

-- Exchanges server descriptions with the server reached through
-- connstr: the local description is sent there via xch, and the
-- description received in return is announced here with 'new_server'.
CREATE FUNCTION api_connect
( connstr IN text
, remote_server OUT local_server
) LANGUAGE plpgsql
AS $BODY$
DECLARE
    v_local_server local_server;
BEGIN
    SELECT * INTO STRICT v_local_server
    FROM local_server
    LIMIT 1;
    remote_server := xch(connstr, v_local_server);
    PERFORM execute_sync('new_server', remote_server :: text);
END;
$BODY$
SET search_path TO narugn, public;