#!/usr/bin/env bash
set -euo pipefail

DBNAME="${DBNAME:-pggraph_read_latency}"
CREATE_DB="${CREATE_DB:-1}"
NODE_COUNT="${NODE_COUNT:-10000}"
SMALL_BACKLOG="${SMALL_BACKLOG:-100}"
LARGE_BACKLOG="${LARGE_BACKLOG:-5000}"
SAMPLES="${SAMPLES:-25}"
CONCURRENT_SAMPLES="${CONCURRENT_SAMPLES:-25}"
CLIENTS="${CLIENTS:-4}"
JOBS="${JOBS:-2}"
TIME="${TIME:-30}"
RATE="${RATE:-100}"
QUERY_FRESHNESS="${QUERY_FRESHNESS:-default}"
KEEP_WORKDIR="${KEEP_WORKDIR:-0}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
GRAPH_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)"
OUTPUT_DIR="${OUTPUT_DIR:-$GRAPH_DIR/target/read-latency}"
TMPDIR_ROOT="${TMPDIR:-$GRAPH_DIR/target/tmp}"
mkdir -p "$OUTPUT_DIR" "$TMPDIR_ROOT"
WORKDIR="$(mktemp -d "$TMPDIR_ROOT/pggraph-read-latency.XXXXXX")"
SAMPLES_CSV="$WORKDIR/read-latency-samples.csv"
SUMMARY_CSV="$WORKDIR/read-latency-summary.csv"

cleanup() {
  if [[ "$KEEP_WORKDIR" == "1" ]]; then
    echo "Keeping workdir: $WORKDIR"
  else
    rm -rf "$WORKDIR"
  fi
}
trap cleanup EXIT

if ! command -v psql >/dev/null 2>&1; then
  echo "psql is required"
  exit 2
fi

if ! command -v pgbench >/dev/null 2>&1; then
  echo "pgbench is required"
  exit 2
fi

case "$QUERY_FRESHNESS" in
  default | off | apply_pending_sync | error_on_pending) ;;
  *)
    echo "QUERY_FRESHNESS must be default, off, apply_pending_sync, or error_on_pending"
    exit 2
    ;;
esac

if [[ "$CREATE_DB" == "1" && "$DBNAME" != "postgres" ]]; then
  dropdb --if-exists "$DBNAME" >/dev/null 2>&1 || true
  createdb "$DBNAME"
fi

echo "label,query_kind,query_freshness,sample,latency_ms,rows_seen,backlog_rows" >"$SAMPLES_CSV"
echo "label,query_kind,query_freshness,samples,p50_ms,p95_ms,p99_ms,min_ms,max_ms,backlog_rows" >"$SUMMARY_CSV"

psql -X -v ON_ERROR_STOP=1 \
  -v node_count="$NODE_COUNT" \
  "$DBNAME" <<'SQL'
CREATE EXTENSION IF NOT EXISTS graph;
SELECT graph.reset();
DROP TABLE IF EXISTS public.graph_read_latency_edges CASCADE;
DROP TABLE IF EXISTS public.graph_read_latency_nodes CASCADE;
DROP SEQUENCE IF EXISTS public.graph_read_latency_node_seq;

CREATE TABLE public.graph_read_latency_nodes (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    score INT NOT NULL
);

CREATE SEQUENCE public.graph_read_latency_node_seq START WITH 10000001;

CREATE TABLE public.graph_read_latency_edges (
    id BIGSERIAL PRIMARY KEY,
    from_id TEXT NOT NULL REFERENCES public.graph_read_latency_nodes(id) ON UPDATE CASCADE ON DELETE CASCADE,
    to_id TEXT NOT NULL REFERENCES public.graph_read_latency_nodes(id) ON UPDATE CASCADE ON DELETE CASCADE,
    weight INT NOT NULL DEFAULT 1
);

INSERT INTO public.graph_read_latency_nodes (id, name, score)
SELECT i::text, 'seed-' || i::text, i % 1000
FROM generate_series(1, :node_count) AS i;

INSERT INTO public.graph_read_latency_edges (from_id, to_id, weight)
SELECT i::text, (i + 1)::text, 1
FROM generate_series(1, :node_count - 1) AS i;

SELECT graph.add_table('public.graph_read_latency_nodes'::regclass, 'id', ARRAY['name', 'score']);
SELECT graph.add_edge(
    'public.graph_read_latency_edges'::regclass,
    'from_id',
    'public.graph_read_latency_nodes'::regclass,
    'id',
    'read_latency',
    false,
    'weight'
);
SET graph.persist_on_build = on;
SELECT * FROM graph.build();
SELECT graph.enable_sync();

CREATE OR REPLACE FUNCTION public.graph_read_latency_edges_sync_capture()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO graph._sync_log
            (op, table_oid, table_name, pk, old_pk, new_pk, properties, old_row, new_row, xid)
        VALUES
            ('I', TG_RELID, 'public.graph_read_latency_edges', NEW.id::text, NULL, NEW.id::text,
             jsonb_build_object('from_id', NEW.from_id::text, 'to_id', NEW.to_id::text, 'weight', NEW.weight::text),
             NULL, to_jsonb(NEW), txid_current());
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO graph._sync_log
            (op, table_oid, table_name, pk, old_pk, new_pk, properties, old_row, new_row, xid)
        VALUES
            ('U', TG_RELID, 'public.graph_read_latency_edges', NEW.id::text, OLD.id::text, NEW.id::text,
             jsonb_build_object('from_id', NEW.from_id::text, 'to_id', NEW.to_id::text, 'weight', NEW.weight::text),
             to_jsonb(OLD), to_jsonb(NEW), txid_current());
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO graph._sync_log
            (op, table_oid, table_name, pk, old_pk, new_pk, properties, old_row, new_row, xid)
        VALUES
            ('D', TG_RELID, 'public.graph_read_latency_edges', OLD.id::text, OLD.id::text, NULL,
             NULL, to_jsonb(OLD), NULL, txid_current());
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS graph_read_latency_edges_sync_capture ON public.graph_read_latency_edges;
CREATE TRIGGER graph_read_latency_edges_sync_capture
AFTER INSERT OR UPDATE OR DELETE ON public.graph_read_latency_edges
FOR EACH ROW EXECUTE FUNCTION public.graph_read_latency_edges_sync_capture();
SQL

pending_rows() {
  psql -X -q -v ON_ERROR_STOP=1 -tA "$DBNAME" <<'SQL'
SET graph.auto_load = on;
SELECT pending_sync_rows FROM graph.sync_health();
SQL
}

reset_sync_backlog() {
  psql -X -q -v ON_ERROR_STOP=1 "$DBNAME" <<'SQL'
SET graph.persist_on_build = on;
SELECT * FROM graph.build();
TRUNCATE graph._sync_log RESTART IDENTITY;
SELECT graph.enable_sync();
DROP TRIGGER IF EXISTS graph_read_latency_edges_sync_capture ON public.graph_read_latency_edges;
CREATE TRIGGER graph_read_latency_edges_sync_capture
AFTER INSERT OR UPDATE OR DELETE ON public.graph_read_latency_edges
FOR EACH ROW EXECUTE FUNCTION public.graph_read_latency_edges_sync_capture();
SQL
}

wait_for_pending_rows() {
  local minimum="$1"

  for _ in $(seq 1 30); do
    if (( "$(pending_rows)" >= minimum )); then
      return 0
    fi
    sleep 1
  done

  return 1
}

append_backlog() {
  local rows="$1"

  psql -X -v ON_ERROR_STOP=1 -v rows="$rows" "$DBNAME" <<'SQL'
WITH ids AS (
    SELECT nextval('public.graph_read_latency_node_seq') AS id
    FROM generate_series(1, :rows)
),
inserted_nodes AS (
    INSERT INTO public.graph_read_latency_nodes (id, name, score)
    SELECT id::text, 'pending-' || id::text, (id % 1000)::int
    FROM ids
    RETURNING id::bigint
)
INSERT INTO public.graph_read_latency_edges (from_id, to_id, weight)
SELECT '1', id::text, 1
FROM inserted_nodes;
SQL
}

percentile() {
  local file="$1"
  local pct="$2"

  awk -v pct="$pct" '
    { values[++count] = $1 }
    END {
      if (count == 0) {
        print "0";
        exit;
      }
      rank = int((pct * count + 99) / 100);
      if (rank < 1) {
        rank = 1;
      }
      if (rank > count) {
        rank = count;
      }
      print values[rank];
    }
  ' "$file"
}

measure_query() {
  local label="$1"
  local query_kind="$2"
  local samples="$3"
  local sql="$4"
  local backlog_setup="$5"
  local values_file="$WORKDIR/${label}-${query_kind}.latencies"

  if [[ "$backlog_setup" != "live" ]]; then
    reset_sync_backlog
    if (( backlog_setup > 0 )); then
      append_backlog "$backlog_setup"
    fi
  fi

  local backlog
  backlog="$(pending_rows)"

  : >"$values_file"
  for sample in $(seq 1 "$samples"); do
    local result
    local freshness_sql
    if [[ "$QUERY_FRESHNESS" == "default" ]]; then
      freshness_sql="RESET graph.query_freshness;"
    else
      freshness_sql="SET graph.query_freshness = '$QUERY_FRESHNESS';"
    fi
    result="$(
      psql -X -q -v ON_ERROR_STOP=1 -tA "$DBNAME" <<SQL
SET graph.auto_load = on;
$freshness_sql
WITH started AS (SELECT clock_timestamp() AS ts),
     measured AS (
       $sql
     ),
     finished AS (SELECT clock_timestamp() AS ts)
SELECT ((EXTRACT(EPOCH FROM (finished.ts - started.ts)) * 1000)::bigint)::text
       || '|' || measured.rows_seen::text
FROM started, measured, finished;
SQL
    )"

    local latency_ms rows_seen
    IFS='|' read -r latency_ms rows_seen <<<"$result"
    echo "$latency_ms" >>"$values_file"
    echo "$label,$query_kind,$QUERY_FRESHNESS,$sample,$latency_ms,$rows_seen,$backlog" >>"$SAMPLES_CSV"
  done

  sort -n "$values_file" -o "$values_file"

  local p50 p95 p99 min max
  p50="$(percentile "$values_file" 50)"
  p95="$(percentile "$values_file" 95)"
  p99="$(percentile "$values_file" 99)"
  min="$(head -n 1 "$values_file")"
  max="$(tail -n 1 "$values_file")"

  echo "$label,$query_kind,$QUERY_FRESHNESS,$samples,$p50,$p95,$p99,$min,$max,$backlog" >>"$SUMMARY_CSV"
  printf '%-22s %-24s freshness=%s samples=%s backlog=%s p50=%sms p95=%sms p99=%sms min=%sms max=%sms\n' \
    "$label" "$query_kind" "$QUERY_FRESHNESS" "$samples" "$backlog" "$p50" "$p95" "$p99" "$min" "$max"
}

measure_suite() {
  local label="$1"
  local samples="$2"
  local backlog_setup="$3"

  measure_query "$label" "traverse" "$samples" \
    "SELECT count(*) AS rows_seen FROM graph.traverse('public.graph_read_latency_nodes'::regclass, '1', 3, hydrate := false)" \
    "$backlog_setup"
  measure_query "$label" "shortest_path" "$samples" \
    "SELECT count(*) AS rows_seen FROM graph.shortest_path('public.graph_read_latency_nodes'::regclass, '1', 'public.graph_read_latency_nodes'::regclass, '$NODE_COUNT', 20, hydrate := false)" \
    "$backlog_setup"
  measure_query "$label" "weighted_shortest_path" "$samples" \
    "SELECT count(*) AS rows_seen FROM graph.weighted_shortest_path('public.graph_read_latency_nodes'::regclass, '1', 'public.graph_read_latency_nodes'::regclass, '$NODE_COUNT')" \
    "$backlog_setup"
}

measure_suite "no_pending_sync" "$SAMPLES" 0

measure_suite "small_pending_backlog" "$SAMPLES" "$SMALL_BACKLOG"

measure_suite "large_pending_backlog" "$SAMPLES" "$LARGE_BACKLOG"

reset_sync_backlog

writer_sql="$WORKDIR/read-latency-writer.sql"
cat >"$writer_sql" <<'SQL'
BEGIN;
WITH ids AS (
    SELECT nextval('public.graph_read_latency_node_seq') AS id
),
inserted_node AS (
    INSERT INTO public.graph_read_latency_nodes (id, name, score)
    SELECT id::text, 'live-' || id::text, (id % 1000)::int
    FROM ids
    RETURNING id::bigint
)
INSERT INTO public.graph_read_latency_edges (from_id, to_id, weight)
SELECT '1', id::text, 1
FROM inserted_node;
COMMIT;
SQL

pgbench "$DBNAME" \
  --client="$CLIENTS" \
  --jobs="$JOBS" \
  --time="$TIME" \
  --rate="$RATE" \
  --file="$writer_sql" \
  >"$WORKDIR/concurrent-writers.log" 2>&1 &
writer_pid=$!

wait_for_pending_rows 1 || {
  cat "$WORKDIR/concurrent-writers.log"
  echo "concurrent writer did not create pending sync rows"
  exit 1
}

measure_suite "concurrent_writers" "$CONCURRENT_SAMPLES" live

wait "$writer_pid" || {
  cat "$WORKDIR/concurrent-writers.log"
  exit 1
}

final_backlog="$(pending_rows)"

cp "$SAMPLES_CSV" "$OUTPUT_DIR/read-latency-samples.csv"
cp "$SUMMARY_CSV" "$OUTPUT_DIR/read-latency-summary.csv"

echo "Read-latency samples: $OUTPUT_DIR/read-latency-samples.csv"
echo "Read-latency summary: $OUTPUT_DIR/read-latency-summary.csv"
echo "Final sync backlog rows: $final_backlog"
