// Copyright (c) 2023-2025 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use anyhow::Result;
use pgrx::*;
use supabase_wrappers::prelude::{options_to_hashmap, user_mapping_options};
use crate::duckdb::connection;
use crate::duckdb::parquet::ParquetOption;
use crate::duckdb::utils;
use crate::fdw::base::register_duckdb_view;
use crate::fdw::handler::FdwHandler;
/// One result row of a Parquet schema lookup: an 11-element tuple in which
/// every element is nullable (wrapped in `Option`).
///
/// The arity matches the eleven output columns declared by `parquet_schema`
/// (`file_name`, `name`, `type`, `type_length`, `repetition_type`,
/// `num_children`, `converted_type`, `scale`, `precision`, `field_id`,
/// `logical_type`) — presumably in that order; verify against the query that
/// fills it.
type ParquetSchemaRow = (
Option,
Option,
Option,
Option,
Option,
Option,
Option,
Option,
Option,
Option,
Option,
);
/// One result row of a Parquet `DESCRIBE` lookup: a 6-element tuple in which
/// every element is nullable (wrapped in `Option`).
///
/// The arity matches the six output columns declared by `parquet_describe`
/// (`column_name`, `column_type`, `null`, `key`, `default`, `extra`) —
/// presumably in that order; verify against the query that fills it.
type ParquetDescribeRow = (
Option,
Option,
Option,
Option,
Option,
Option,
);
/// SQL-callable entry point (`#[pg_extern]`) that describes the columns of a
/// Parquet foreign table.
///
/// Takes the target `relation` and returns a table with the columns
/// `column_name`, `column_type`, `null`, `key`, `default` and `extra`, all of
/// them nullable. The actual work is delegated to `parquet_describe_impl`.
///
/// # Panics
///
/// Panics with the error's display text when `parquet_describe_impl` returns
/// an error.
// The return type is a wide tuple of named columns, which trips clippy's
// `type_complexity` lint; the lint is silenced rather than aliasing the type.
#[allow(clippy::type_complexity)]
#[pg_extern]
pub fn parquet_describe(
relation: PgRelation,
) -> iter::TableIterator<
'static,
(
name!(column_name, Option),
name!(column_type, Option),
name!(null, Option),
name!(key, Option),
name!(default, Option),
name!(extra, Option),
),
> {
// Any failure from the implementation is re-raised as a panic carrying the
// error's `Display` output.
let rows = parquet_describe_impl(relation).unwrap_or_else(|e| {
panic!("{}", e);
});
iter::TableIterator::new(rows)
}
/// SQL-callable entry point (`#[pg_extern]`) that reports the Parquet schema
/// of the files behind a Parquet foreign table.
///
/// Takes the target `relation` and returns a table with the columns
/// `file_name`, `name`, `type`, `type_length`, `repetition_type`,
/// `num_children`, `converted_type`, `scale`, `precision`, `field_id` and
/// `logical_type`, all of them nullable. The actual work is delegated to
/// `parquet_schema_impl`.
///
/// # Panics
///
/// Panics with the error's display text when `parquet_schema_impl` returns an
/// error.
// The return type is a wide tuple of named columns, which trips clippy's
// `type_complexity` lint; the lint is silenced rather than aliasing the type.
#[allow(clippy::type_complexity)]
#[pg_extern]
pub fn parquet_schema(
relation: PgRelation,
) -> iter::TableIterator<
'static,
(
name!(file_name, Option),
name!(name, Option),
name!(type, Option),
name!(type_length, Option),
name!(repetition_type, Option),
name!(num_children, Option),
name!(converted_type, Option),
name!(scale, Option),
name!(precision, Option),
name!(field_id, Option),
name!(logical_type, Option),
),
> {
// Any failure from the implementation is re-raised as a panic carrying the
// error's `Display` output.
let rows = parquet_schema_impl(relation).unwrap_or_else(|e| {
panic!("{}", e);
});
iter::TableIterator::new(rows)
}
#[inline]
fn parquet_schema_impl(relation: PgRelation) -> Result> {
let foreign_table = unsafe { pg_sys::GetForeignTable(relation.oid()) };
let handler = FdwHandler::from(foreign_table);
if FdwHandler::from(foreign_table) != FdwHandler::Parquet {
panic!("relation is not a parquet table");
}
let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) };
let user_mapping_options = unsafe { user_mapping_options(foreign_server) };
let table_options = unsafe { options_to_hashmap((*foreign_table).options)? };
register_duckdb_view(
relation.name(),
relation.namespace(),
table_options.clone(),
user_mapping_options,
handler,
)?;
let files = utils::format_csv(
table_options
.get(ParquetOption::Files.as_ref())
.expect("table should have files option"),
);
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("SELECT * FROM parquet_schema({files})");
let mut stmt = conn.prepare(&query)?;
Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option>(0)?,
row.get::<_, Option>(1)?,
row.get::<_, Option>(2)?,
row.get::<_, Option>(3)?,
row.get::<_, Option>(4)?,
row.get::<_, Option>(5)?,
row.get::<_, Option>(6)?,
row.get::<_, Option>(7)?,
row.get::<_, Option>(8)?,
row.get::<_, Option>(9)?,
row.get::<_, Option>(10)?,
))
})?
.map(|row| row.unwrap())
.collect::>())
}
#[inline]
fn parquet_describe_impl(relation: PgRelation) -> Result> {
let foreign_table = unsafe { pg_sys::GetForeignTable(relation.oid()) };
let handler = FdwHandler::from(foreign_table);
if FdwHandler::from(foreign_table) != FdwHandler::Parquet {
panic!("relation is not a parquet table");
}
let foreign_server = unsafe { pg_sys::GetForeignServer((*foreign_table).serverid) };
let user_mapping_options = unsafe { user_mapping_options(foreign_server) };
let table_options = unsafe { options_to_hashmap((*foreign_table).options)? };
register_duckdb_view(
relation.name(),
relation.namespace(),
table_options.clone(),
user_mapping_options,
handler,
)?;
let files = utils::format_csv(
table_options
.get(ParquetOption::Files.as_ref())
.expect("table should have files option"),
);
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("DESCRIBE SELECT * FROM {files}");
let mut stmt = conn.prepare(&query)?;
Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option>(0)?,
row.get::<_, Option>(1)?,
row.get::<_, Option>(2)?,
row.get::<_, Option>(3)?,
row.get::<_, Option>(4)?,
row.get::<_, Option>(5)?,
))
})?
.map(|row| row.unwrap())
.collect::>())
}