/*
 * Note: the system headers below are reconstructed; the original #include
 * targets were lost. These are the headers this file actually needs for
 * mkfifo(), fork(), read()/write()/unlink(), mkdir() and dirent.
 */
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <dirent.h>

#include "curl/curl.h"

#include "postgres.h"

#include "fmgr.h"
#include "access/reloptions.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_user_mapping.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "optimizer/cost.h"
#include "postmaster/fork_process.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/rel.h"
//#include "utils/resowner.h"

#include "connutil.h"

PG_MODULE_MAGIC;

/*
 * Describes the valid options for objects that use this wrapper.
 */
struct S3FdwOption
{
    const char *optname;
    Oid         optcontext;     /* Oid of catalog in which option may appear */
};

/*
 * Valid options for s3_fdw.
 * These options are based on the options for the COPY FROM command.
 *
 * Note: If you are adding a new option, make sure s3GetOptions() either
 * consumes it or passes it through cleanly to the COPY options.
 */
static struct S3FdwOption valid_options[] = {
    /* File options */
    {"filename", ForeignTableRelationId},
    {"bucketname", ForeignTableRelationId},
    {"hostname", ForeignTableRelationId},

    /* Format options */
    /* oids option is not supported */
    {"format", ForeignTableRelationId},
    {"header", ForeignTableRelationId},
    {"delimiter", ForeignTableRelationId},
    {"quote", ForeignTableRelationId},
    {"escape", ForeignTableRelationId},
    {"null", ForeignTableRelationId},
    {"encoding", ForeignTableRelationId},

    /* Credential options */
    {"accesskey", UserMappingRelationId},
    {"secretkey", UserMappingRelationId},

    /* Sentinel */
    {NULL, InvalidOid}
};
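/*
 * For illustration, a foreign table and user mapping exercising these
 * options might look like the following (a sketch; object names and
 * option values are made up):
 *
 *   CREATE FOREIGN TABLE s3_log (ts timestamp, msg text)
 *       SERVER s3_srv
 *       OPTIONS (hostname 's3.amazonaws.com',
 *                bucketname 'mybucket',
 *                filename 'logs/2011-01-01.csv',
 *                format 'csv',
 *                header 'true');
 *
 *   CREATE USER MAPPING FOR CURRENT_USER SERVER s3_srv
 *       OPTIONS (accesskey '<accesskey>', secretkey '<secretkey>');
 */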
/*
 * FDW-specific information for ForeignScanState.fdw_state.
 */
typedef struct S3FdwExecutionState
{
    char       *hostname;
    char       *bucketname;
    char       *filename;       /* file to read */
    char       *accesskey;
    char       *secretkey;
    List       *copy_options;   /* merged COPY options, excluding filename */
    CopyState   cstate;         /* state of reading file */
    char       *datafn;
} S3FdwExecutionState;

/*
 * The forked processes communicate via FIFOs, described in this struct.
 * Experiments suggest it is a bad idea to re-open these FIFOs, so we
 * prepare two files: one for the synchronization flag, the other for
 * data transfer.
 */
typedef struct s3_ipc_context
{
    char        datafn[MAXPGPATH];
    FILE       *datafp;
    char        flagfn[MAXPGPATH];
    FILE       *flagfp;
} s3_ipc_context;

/*
 * Function declarations
 */
PG_FUNCTION_INFO_V1(s3_test);
PG_FUNCTION_INFO_V1(s3_fdw_handler);
PG_FUNCTION_INFO_V1(s3_fdw_validator);

Datum s3_test(PG_FUNCTION_ARGS);
Datum s3_fdw_handler(PG_FUNCTION_ARGS);
Datum s3_fdw_validator(PG_FUNCTION_ARGS);

/*
 * FDW callback routines
 */
static FdwPlan *s3PlanForeignScan(Oid foreigntableid,
                                  PlannerInfo *root,
                                  RelOptInfo *baserel);
static void s3ExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void s3BeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *s3IterateForeignScan(ForeignScanState *node);
static void s3ReScanForeignScan(ForeignScanState *node);
static void s3EndForeignScan(ForeignScanState *node);

/*
 * Helper functions
 */
static bool is_valid_option(const char *option, Oid context);
static void s3GetOptions(Oid foreigntableid, S3FdwExecutionState *state);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
                           const char *filename,
                           Cost *startup_cost, Cost *total_cost);
static size_t header_handler(void *buffer, size_t size, size_t nmemb, void *userp);
static size_t body_handler(void *buffer, size_t size, size_t nmemb, void *userp);
static size_t write_data_to_buf(void *buffer, size_t size, size_t nmemb, void *userp);
static char *create_tempprefix(char *seed);
static void s3_on_exit(int code, Datum arg);
void _PG_init(void);

Datum
s3_test(PG_FUNCTION_ARGS)
{
    CURL       *curl;
    StringInfoData buf,
                url;
    int         sc;
    char        tmp[1024];
    struct curl_slist *slist;
    char       *datestring;
    char       *signature;
    char       *hostname;
    char       *bucketname;
    char       *filename;
    char       *accesskey;
    char       *secretkey;

    hostname = text_to_cstring(PG_GETARG_TEXT_P(0));
    bucketname = text_to_cstring(PG_GETARG_TEXT_P(1));
    filename = text_to_cstring(PG_GETARG_TEXT_P(2));
    accesskey = text_to_cstring(PG_GETARG_TEXT_P(3));
    secretkey = text_to_cstring(PG_GETARG_TEXT_P(4));

    initStringInfo(&url);
    appendStringInfo(&url, "http://%s/%s/%s", hostname, bucketname, filename);

    datestring = httpdate(NULL);
    signature = s3_signature("GET", datestring, bucketname, filename, secretkey);

    slist = NULL;
    snprintf(tmp, sizeof(tmp), "Date: %s", datestring);
    slist = curl_slist_append(slist, tmp);
    snprintf(tmp, sizeof(tmp), "Authorization: AWS %s:%s", accesskey, signature);
    slist = curl_slist_append(slist, tmp);

    initStringInfo(&buf);
    curl = curl_easy_init();
    curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
    curl_easy_setopt(curl, CURLOPT_URL, url.data);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data_to_buf);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf);

    sc = curl_easy_perform(curl);
    curl_slist_free_all(slist);
    curl_easy_cleanup(curl);
    if (sc != 0)
        elog(ERROR, "curl_easy_perform failed: %d", sc);

    PG_RETURN_TEXT_P(cstring_to_text(buf.data));
}

/*
 * Foreign-data wrapper handler function: return a struct with pointers
 * to my callback routines.
 */
Datum
s3_fdw_handler(PG_FUNCTION_ARGS)
{
    FdwRoutine *fdwroutine = makeNode(FdwRoutine);

    fdwroutine->PlanForeignScan = s3PlanForeignScan;
    fdwroutine->ExplainForeignScan = s3ExplainForeignScan;
    fdwroutine->BeginForeignScan = s3BeginForeignScan;
    fdwroutine->IterateForeignScan = s3IterateForeignScan;
    fdwroutine->ReScanForeignScan = s3ReScanForeignScan;
    fdwroutine->EndForeignScan = s3EndForeignScan;

    PG_RETURN_POINTER(fdwroutine);
}
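/*
 * At the SQL level, the handler and validator are wired up roughly as
 * follows (a sketch; the wrapper name is illustrative):
 *
 *   CREATE FUNCTION s3_fdw_handler()
 *       RETURNS fdw_handler AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
 *   CREATE FUNCTION s3_fdw_validator(text[], oid)
 *       RETURNS void AS 'MODULE_PATHNAME' LANGUAGE C STRICT;
 *   CREATE FOREIGN DATA WRAPPER s3_fdw
 *       HANDLER s3_fdw_handler VALIDATOR s3_fdw_validator;
 */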
/*
 * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
 * USER MAPPING or FOREIGN TABLE that uses s3_fdw.
 *
 * Raise an ERROR if the option or its value is considered invalid.
 */
Datum
s3_fdw_validator(PG_FUNCTION_ARGS)
{
    List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    Oid         catalog = PG_GETARG_OID(1);
    char       *filename = NULL,
               *bucketname = NULL,
               *hostname = NULL,
               *accesskey = NULL,
               *secretkey = NULL;
    List       *copy_options = NIL;
    ListCell   *cell;

    /*
     * Check that only options supported by s3_fdw, and allowed for the
     * current object type, are given.
     */
    foreach(cell, options_list)
    {
        DefElem    *def = (DefElem *) lfirst(cell);

        if (!is_valid_option(def->defname, catalog))
        {
            struct S3FdwOption *opt;
            StringInfoData buf;

            /*
             * Unknown option specified, complain about it. Provide a hint
             * with a list of valid options for the object.
             */
            initStringInfo(&buf);
            for (opt = valid_options; opt->optname; opt++)
            {
                if (catalog == opt->optcontext)
                    appendStringInfo(&buf, "%s%s",
                                     (buf.len > 0) ? ", " : "",
                                     opt->optname);
            }

            ereport(ERROR,
                    (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
                     errmsg("invalid option \"%s\"", def->defname),
                     errhint("Valid options in this context are: %s",
                             buf.data)));
        }

        /*
         * Separate out the s3_fdw-specific options, since
         * ProcessCopyOptions won't allow them.
         */
        if (strcmp(def->defname, "filename") == 0)
        {
            if (filename)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            filename = defGetString(def);
        }
        else if (strcmp(def->defname, "bucketname") == 0)
        {
            if (bucketname)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            bucketname = defGetString(def);
        }
        else if (strcmp(def->defname, "hostname") == 0)
        {
            if (hostname)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            hostname = defGetString(def);
        }
        else if (strcmp(def->defname, "accesskey") == 0)
        {
            if (accesskey)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            accesskey = defGetString(def);
        }
        else if (strcmp(def->defname, "secretkey") == 0)
        {
            if (secretkey)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            secretkey = defGetString(def);
        }
        else
            copy_options = lappend(copy_options, def);
    }

    /*
     * Now apply the core COPY code's validation logic for more checks.
     */
    ProcessCopyOptions(NULL, true, copy_options);

    /*
     * The hostname, bucketname and filename options are all required for
     * s3_fdw foreign tables.
     */
    if (catalog == ForeignTableRelationId && hostname == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
                 errmsg("hostname is required for s3_fdw foreign tables")));

    if (catalog == ForeignTableRelationId && bucketname == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
                 errmsg("bucketname is required for s3_fdw foreign tables")));

    if (catalog == ForeignTableRelationId && filename == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
                 errmsg("filename is required for s3_fdw foreign tables")));

    PG_RETURN_VOID();
}

/*
 * Check if the provided option is one of the valid options.
 * context is the Oid of the catalog holding the object the option is for.
 */
static bool
is_valid_option(const char *option, Oid context)
{
    struct S3FdwOption *opt;

    for (opt = valid_options; opt->optname; opt++)
    {
        if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
            return true;
    }
    return false;
}
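/*
 * Note that ProcessCopyOptions() above gives the remaining options the
 * same scrutiny COPY FROM itself would, so a table defined with, say,
 * OPTIONS (format 'csv', delimiter 'xy') is rejected at DDL time rather
 * than failing later at scan time.
 */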
/*
 * Fetch the options for an s3_fdw foreign table.
 *
 * We have to separate out "filename", "bucketname" and "hostname" (and the
 * credential options) from the other options because they must not appear
 * in the options list passed to the core COPY code.
 */
static void
s3GetOptions(Oid foreigntableid, S3FdwExecutionState *state)
{
    ForeignTable *table;
    ForeignServer *server;
    ForeignDataWrapper *wrapper;
    UserMapping *mapping;
    List       *options,
               *new_options;
    ListCell   *lc;

    /*
     * Extract options from FDW objects.
     */
    table = GetForeignTable(foreigntableid);
    server = GetForeignServer(table->serverid);
    wrapper = GetForeignDataWrapper(server->fdwid);
    mapping = GetUserMapping(GetUserId(), table->serverid);

    options = NIL;
    options = list_concat(options, wrapper->options);
    options = list_concat(options, server->options);
    options = list_concat(options, mapping->options);
    options = list_concat(options, table->options);

    /*
     * Separate out the host, bucket, filename and credentials.
     */
    state->hostname = NULL;
    state->bucketname = NULL;
    state->filename = NULL;
    state->accesskey = NULL;
    state->secretkey = NULL;
    new_options = NIL;
    foreach(lc, options)
    {
        DefElem    *def = (DefElem *) lfirst(lc);

        if (strcmp(def->defname, "hostname") == 0)
            state->hostname = defGetString(def);
        else if (strcmp(def->defname, "bucketname") == 0)
            state->bucketname = defGetString(def);
        else if (strcmp(def->defname, "filename") == 0)
            state->filename = defGetString(def);
        else if (strcmp(def->defname, "accesskey") == 0)
            state->accesskey = defGetString(def);
        else if (strcmp(def->defname, "secretkey") == 0)
            state->secretkey = defGetString(def);
        else
            new_options = lappend(new_options, def);
    }

    /*
     * The validator should have checked that the mandatory options were
     * present, but check again, just in case.
     */
    if (state->hostname == NULL)
        elog(ERROR, "hostname is required for s3_fdw foreign tables");
    if (state->bucketname == NULL)
        elog(ERROR, "bucketname is required for s3_fdw foreign tables");
    if (state->filename == NULL)
        elog(ERROR, "filename is required for s3_fdw foreign tables");

    state->copy_options = new_options;
}

/*
 * s3PlanForeignScan
 *      Create a FdwPlan for a scan on the foreign table
 */
static FdwPlan *
s3PlanForeignScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel)
{
    FdwPlan    *fdwplan;
    S3FdwExecutionState state;

    /* Fetch options; only the filename is consulted by estimate_costs(). */
    s3GetOptions(foreigntableid, &state);

    /* Construct FdwPlan with cost estimates */
    fdwplan = makeNode(FdwPlan);
    estimate_costs(root, baserel, state.filename,
                   &fdwplan->startup_cost, &fdwplan->total_cost);
    fdwplan->fdw_private = NIL; /* not used */

    return fdwplan;
}

/*
 * s3ExplainForeignScan
 *      Produce extra output for EXPLAIN
 */
static void
s3ExplainForeignScan(ForeignScanState *node, ExplainState *es)
{
    S3FdwExecutionState state;
    StringInfoData url;

    initStringInfo(&url);

    /* Fetch options */
    s3GetOptions(RelationGetRelid(node->ss.ss_currentRelation), &state);
    appendStringInfo(&url, "http://%s/%s/%s",
                     state.hostname, state.bucketname, state.filename);

    ExplainPropertyText("Foreign URL", url.data, es);
}
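/*
 * Scan startup hands the download off to a child process and lets COPY
 * read from a FIFO; in outline:
 *
 *   parent (backend)                 child (curl)
 *   ----------------                 ------------
 *   mkfifo flag + data FIFOs
 *   fork_process() --------------->  GET http://host/bucket/file
 *   block reading flag FIFO          header_handler: write HTTP status
 *                                    into flag FIFO; on 200, open the
 *   status != 200 -> ERROR           data FIFO for writing
 *   BeginCopyFrom(data FIFO)  <----  body_handler: stream body into
 *   parse rows while the download    the data FIFO
 *   is still in progress
 */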
/*
 * s3BeginForeignScan
 *      Initiate access to the file by creating CopyState
 */
static void
s3BeginForeignScan(ForeignScanState *node, int eflags)
{
    CopyState   cstate;
    S3FdwExecutionState *festate;
    StringInfoData buf;
    char       *url,
               *datestring,
               *signature;
    char       *prefix;
    pid_t       pid;
    s3_ipc_context ctx;

    /*
     * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
     */
    if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
        return;

    festate = (S3FdwExecutionState *) palloc(sizeof(S3FdwExecutionState));

    /* Fetch options of foreign table */
    s3GetOptions(RelationGetRelid(node->ss.ss_currentRelation), festate);

    initStringInfo(&buf);
    appendStringInfo(&buf, "http://%s/%s/%s",
                     festate->hostname, festate->bucketname, festate->filename);
    url = pstrdup(buf.data);
    datestring = httpdate(NULL);
    signature = s3_signature("GET", datestring,
                             festate->bucketname, festate->filename,
                             festate->secretkey);

    prefix = create_tempprefix(signature);
    snprintf(ctx.flagfn, sizeof(ctx.flagfn), "%s.flag", prefix);
    snprintf(ctx.datafn, sizeof(ctx.datafn), "%s.data", prefix);
    // unlink(ctx.flagfn);
    // unlink(ctx.datafn);
    if (mkfifo(ctx.flagfn, S_IRUSR | S_IWUSR) != 0)
        elog(ERROR, "mkfifo failed (%d): %s", errno, ctx.flagfn);
    if (mkfifo(ctx.datafn, S_IRUSR | S_IWUSR) != 0)
        elog(ERROR, "mkfifo failed (%d): %s", errno, ctx.datafn);

    /*
     * Fork to overlap input from HTTP with output to SQL. The spawned
     * child process calls on_exit_reset() so that it can die immediately
     * without running the parent's exit callbacks.
     */
    pid = fork_process();
    if (pid == 0)               /* child */
    {
        struct curl_slist *slist;
        CURL       *curl;
        int         sc;

        MyProcPid = getpid();   /* reset MyProcPid */

        /*
         * The exit callback routines clean up resources held by the parent
         * process, so drop them here; the child dies silently when it
         * finishes its job.
         */
        on_exit_reset();

        /*
         * Set up request header list.
         */
        slist = NULL;
        initStringInfo(&buf);
        appendStringInfo(&buf, "Date: %s", datestring);
        slist = curl_slist_append(slist, buf.data);
        resetStringInfo(&buf);
        appendStringInfo(&buf, "Authorization: AWS %s:%s",
                         festate->accesskey, signature);
        slist = curl_slist_append(slist, buf.data);

        /*
         * Set up CURL instance.
         */
        curl = curl_easy_init();
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
        curl_easy_setopt(curl, CURLOPT_URL, url);
        curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_handler);
        curl_easy_setopt(curl, CURLOPT_HEADERDATA, &ctx);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, body_handler);
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ctx);

        sc = curl_easy_perform(curl);
        if (ctx.datafp)
            FreeFile(ctx.datafp);
        if (sc != 0)
        {
            elog(NOTICE, "%s:curl_easy_perform = %d", url, sc);
            unlink(ctx.datafn);
        }
        curl_slist_free_all(slist);
        curl_easy_cleanup(curl);
        proc_exit(0);
    }
    elog(DEBUG1, "child pid = %d", (int) pid);

    {
        int         status;
        FILE       *fp;

        /* Block until the child reports the HTTP status via the flag FIFO */
        fp = AllocateFile(ctx.flagfn, PG_BINARY_R);
        if (fp == NULL ||
            read(fileno(fp), &status, sizeof(int)) != sizeof(int))
            elog(ERROR, "could not read status from flag FIFO");
        FreeFile(fp);
        unlink(ctx.flagfn);
        if (status != 200)
            elog(ERROR, "bad input from API. Status code: %d", status);
    }

    /*
     * Create CopyState from FDW options. We always acquire all columns, so
     * as to match the expected ScanTupleSlot signature.
     */
    cstate = BeginCopyFrom(node->ss.ss_currentRelation,
                           ctx.datafn,
                           NIL,
                           festate->copy_options);

    /*
     * Save state in node->fdw_state. We must save enough information to
     * call BeginCopyFrom() again.
     */
    festate->cstate = cstate;
    festate->datafn = pstrdup(ctx.datafn);

    node->fdw_state = (void *) festate;
}
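/*
 * For reference: the Authorization header built above follows the AWS
 * signature version 2 scheme, so s3_signature() (in connutil.c, not shown
 * here) is presumably base64(HMAC-SHA1(secretkey, StringToSign)), where
 * StringToSign for a plain GET is
 *
 *   GET\n\n\n<date>\n/<bucketname>/<filename>
 *
 * This description is an assumption about connutil.c, not something this
 * file enforces.
 */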
/*
 * s3IterateForeignScan
 *      Read next record from the data file and store it into the
 *      ScanTupleSlot as a virtual tuple
 */
static TupleTableSlot *
s3IterateForeignScan(ForeignScanState *node)
{
    S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;
    TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
    bool        found;
    ErrorContextCallback errcontext;

    /* Set up callback to identify error line number. */
    errcontext.callback = CopyFromErrorCallback;
    errcontext.arg = (void *) festate->cstate;
    errcontext.previous = error_context_stack;
    error_context_stack = &errcontext;

    /*
     * The protocol for loading a virtual tuple into a slot is first
     * ExecClearTuple, then fill the values/isnull arrays, then
     * ExecStoreVirtualTuple. If we don't find another row in the file, we
     * just skip the last step, leaving the slot empty as required.
     *
     * We can pass ExprContext = NULL because we read all columns from the
     * file, so no need to evaluate default expressions.
     *
     * We can also pass tupleOid = NULL because we don't allow oids for
     * foreign tables.
     */
    ExecClearTuple(slot);
    found = NextCopyFrom(festate->cstate, NULL,
                         slot->tts_values, slot->tts_isnull,
                         NULL);
    if (found)
        ExecStoreVirtualTuple(slot);

    /* Remove error callback. */
    error_context_stack = errcontext.previous;

    return slot;
}

/*
 * s3EndForeignScan
 *      Finish scanning foreign table and dispose objects used for this scan
 */
static void
s3EndForeignScan(ForeignScanState *node)
{
    S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;

    /* if festate is NULL, we are in EXPLAIN; nothing to do */
    if (festate)
    {
        EndCopyFrom(festate->cstate);
        unlink(festate->datafn);
    }
}

/*
 * s3ReScanForeignScan
 *      Rescan table, possibly with new parameters
 */
static void
s3ReScanForeignScan(ForeignScanState *node)
{
    S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;

    EndCopyFrom(festate->cstate);

    /*
     * Restart COPY from the saved local data file, not from the remote
     * object name, which is not openable here.
     */
    festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
                                    festate->datafn,
                                    NIL,
                                    festate->copy_options);
}

/*
 * Estimate costs of scanning a foreign table.
 */
static void
estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
               const char *filename,
               Cost *startup_cost, Cost *total_cost)
{
    BlockNumber pages;
    double      ntuples;
    double      nrows;
    Cost        run_cost = 0;
    Cost        cpu_per_tuple;

    /*
     * Unlike file_fdw, we cannot stat() the remote object at plan time, so
     * use fixed default estimates for its size in pages and for the number
     * of tuples it holds.  FIXME later.
     */
    pages = 10;
    ntuples = 1000;

    /*
     * Now estimate the number of rows returned by the scan after applying
     * the baserestrictinfo quals. This is pretty bogus too, since the
     * planner will have no stats about the relation, but it's better than
     * nothing.
     */
    nrows = ntuples *
        clauselist_selectivity(root,
                               baserel->baserestrictinfo,
                               0,
                               JOIN_INNER,
                               NULL);
    nrows = clamp_row_est(nrows);

    /* Save the output-rows estimate for the planner */
    baserel->rows = nrows;

    /*
     * Now estimate costs. We estimate costs almost the same way as
     * cost_seqscan(), thus assuming that I/O costs are equivalent to a
     * regular table file of the same size. However, we take per-tuple CPU
     * costs as 10x of a seqscan, to account for the cost of parsing records.
     */
    run_cost += seq_page_cost * pages;

    *startup_cost = baserel->baserestrictcost.startup;
    cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
    run_cost += cpu_per_tuple * ntuples;
    *total_cost = *startup_cost + run_cost;
}
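/*
 * libcurl delivers response headers to header_handler one line at a time;
 * the status line of interest looks like
 *
 *   HTTP/1.1 200 OK\r\n
 *
 * so the status code starts right after the "HTTP/1.1 " prefix. (An
 * HTTP/1.0 status line would not match this prefix test.)
 */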
static size_t
header_handler(void *buffer, size_t size, size_t nmemb, void *userp)
{
    const char *HTTP_1_1 = "HTTP/1.1";
    size_t      segsize = size * nmemb;
    s3_ipc_context *ctx = (s3_ipc_context *) userp;

    if (strncmp(buffer, HTTP_1_1, strlen(HTTP_1_1)) == 0)
    {
        int         status;

        status = atoi((char *) buffer + strlen(HTTP_1_1) + 1);
        ctx->flagfp = AllocateFile(ctx->flagfn, PG_BINARY_W);
        if (write(fileno(ctx->flagfp), &status, sizeof(int)) != sizeof(int))
            elog(WARNING, "could not write status to flag FIFO");
        FreeFile(ctx->flagfp);

        if (status != 200)
        {
            ctx->datafp = NULL;
            /* interrupt the transfer */
            return 0;
        }
        /* on success, open the data FIFO for the body handler */
        ctx->datafp = AllocateFile(ctx->datafn, PG_BINARY_W);
    }

    return segsize;
}

static size_t
body_handler(void *buffer, size_t size, size_t nmemb, void *userp)
{
    size_t      segsize = size * nmemb;
    s3_ipc_context *ctx = (s3_ipc_context *) userp;

    fwrite(buffer, size, nmemb, ctx->datafp);

    return segsize;
}

static size_t
write_data_to_buf(void *buffer, size_t size, size_t nmemb, void *userp)
{
    size_t      segsize = size * nmemb;
    StringInfo  info = (StringInfo) userp;

    appendBinaryStringInfo(info, (const char *) buffer, segsize);

    return segsize;
}

/*
 * Build a per-process temporary filename prefix under the pgsql_tmp
 * directory, replacing '/' in the seed so it is usable as a filename.
 */
static char *
create_tempprefix(char *seed)
{
    char        filename[MAXPGPATH],
                path[MAXPGPATH],
               *s;

    snprintf(filename, sizeof(filename), "%u.%s", MyProcPid, seed);
    s = &filename[0];
    while (*s)
    {
        if (*s == '/')
            *s = '%';
        s++;
    }
    mkdir("base/" PG_TEMP_FILES_DIR, S_IRWXU);  /* ignore EEXIST */
    snprintf(path, sizeof(path), "base/%s/%s", PG_TEMP_FILES_DIR, filename);

    return pstrdup(path);
}

/*
 * Test whether base starts with prefix and ends with suffix.
 */
static bool
presuffix_test(char *base, char *prefix, char *suffix)
{
    int         len,
                plen,
                slen;

    len = strlen(base);
    plen = strlen(prefix);
    slen = strlen(suffix);
    if (len < plen + slen)
        return false;
    return memcmp(base, prefix, plen) == 0 &&
           memcmp(base + len - slen, suffix, slen) == 0;
}

/*
 * Clean up this process's FIFOs on process exit.
 * We don't touch other processes' FIFOs, since they may be in use right
 * now. It might be better to delete FIFOs as soon as possible, but they
 * don't consume disk space, so we postpone it until exit, when deletion
 * is certain to be safe.
 */
static void
s3_on_exit(int code, Datum arg)
{
    char        prefix[32];
    char       *dirname = "base/" PG_TEMP_FILES_DIR;
    DIR        *dir;
    struct dirent *ent;

    snprintf(prefix, sizeof(prefix), "%d.", MyProcPid);
    dir = AllocateDir(dirname);
    while ((ent = ReadDir(dir, dirname)) != NULL)
    {
        if (presuffix_test(ent->d_name, prefix, ".data") ||
            presuffix_test(ent->d_name, prefix, ".flag"))
        {
            char        path[MAXPGPATH];

            /* d_name is relative to dirname, so build the full path */
            snprintf(path, sizeof(path), "%s/%s", dirname, ent->d_name);
            unlink(path);
        }
    }
    FreeDir(dir);
}

void
_PG_init(void)
{
    on_proc_exit(s3_on_exit, (Datum) 0);
    // RegisterResourceReleaseCallback(s3_resource_release, NULL);
}
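/*
 * Quick smoke test for the HTTP/signing path, bypassing the FDW machinery
 * (a sketch; the argument values are placeholders):
 *
 *   SELECT s3_test('s3.amazonaws.com', 'mybucket', 'path/to/key.csv',
 *                  '<accesskey>', '<secretkey>');
 *
 * This returns the raw object body as text.
 */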