/*
* pg_bulkload: bin/pg_bulkload.c
*
* Copyright (c) 2007-2023, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*/
/**
* @file
* @brief Initiator and loader routine for the PostgreSQL high-speed loader.
*
* Calls pg_bulkload() as a user-defined function and performs loading.
*
* If -r option is specified, performs recovery to cancel inconveniences caused
* by errors in the previous loading.
*/
#include "common.h"

#include <errno.h>
#include <string.h>

#include "pgut/pgut-fe.h"
#include "pgut/pgut-list.h"
/*
 * Program identification strings.
 *
 * They have external linkage and are not read anywhere in this file;
 * presumably the pgut library picks them up for its version/help output
 * (TODO confirm against pgut).
 */
const char *PROGRAM_VERSION = PG_BULKLOAD_VERSION;
const char *PROGRAM_URL = "http://github.com/ossc-db/pg_bulkload";
const char *PROGRAM_ISSUES = "http://github.com/ossc-db/pg_bulkload/issues";
/*
 * Global variables
 *
 * The string variables below are filled in through the option table
 * (command line) and, for the path options, also from the control file.
 */
/** @brief Database cluster directory (-D; main() falls back to $PGDATA in recovery mode). */
char *DataDir = NULL;
/** @brief Flag: true to run recovery (-r), false to run a bulk load. */
static bool recovery = false;
static char *infile = NULL; /* INFILE */
static char *input = NULL; /* INPUT */
static char *output = NULL; /* OUTPUT */
static char *logfile = NULL; /* LOGFILE */
static char *parse_badfile = NULL; /* PARSE_BADFILE */
static char *duplicate_badfile = NULL; /* DUPLICATE_BADFILE */
/** @brief "key=value" items collected from -o, the control file and the path options. */
static List *bulkload_options = NIL;
/** @brief Set when a TYPE=FUNCTION item is seen; input is then not treated as a file path. */
static bool type_function = false;
/** @brief Set when TYPE=BINARY or TYPE=FIXED is seen; selects binary streaming in RemoteLoad(). */
static bool type_binary = false;
/** @brief Set when WRITER=BINARY is seen; OUTPUT is then resolved as a file path. */
static bool writer_binary = false;
/*
 * The length of the database cluster directory name should be short enough
 * so that the length of LSF (load status file) full path name is not longer
 * than MAXPGPATH, including the trailing '\0'. Names of load status files
 * are "/pg_bulkload/(oid).(oid).loadstatus" and the max value of an oid is
 * 4294967295 (10 chars), so the reserved length is:
 *
 *   "/pg_bulkload/" (13) + oid (10) + "." (1) + oid (10) + ".loadstatus" (11)
 *   = 45 characters.
 *
 * The room for the trailing '\0' comes from the ">=" comparison against
 * MAXPGPATH in main().
 */
#define MAX_LOADSTATUS_NAME 45
/*
 * Prototypes
 */
/* Runs a bulk load with the given list of "key=value" option strings. */
static int LoaderLoadMain(List *options);
/* Reads a control file; returns the non-path items as "key=value" strings. */
static List *ParseControlFile(const char *path);
/* Recovery entry point; not defined in this file as visible here. */
extern int LoaderRecoveryMain(void);
/* Streams copystream to the server while the connection is in COPY IN state. */
static PGresult *RemoteLoad(PGconn *conn, FILE *copystream, bool isbinary);
/* Splits one control-file line into keyword and value (modifies buf in place). */
static bool ParseControlFileLine(char buf[], char **outKeyword, char **outValue);
/* Strips leading and trailing white space in place. */
static char *TrimSpaces(char *str);
/* Removes quote marks and resolves escapes in place; NULL on unclosed quote. */
static char *UnquoteString(char *str, char quote, char escape);
/* Finds the first occurrence of target that is not inside a quoted section. */
static char *FindUnquotedChar(char *str, char target, char quote, char escape);
/**
 * @brief Callback for the -o / --option command line switch.
 *
 * Appends the raw "key=value" argument to bulkload_options and records
 * whether it selects TYPE=FUNCTION, TYPE=BINARY / TYPE=FIXED or
 * WRITER=BINARY (compared case-insensitively against the whole argument).
 *
 * A NULL or empty argument is ignored: nothing is appended and no flag is
 * changed. The argument pointer itself is stored in the list, not a copy.
 *
 * @param opt [in/out] Option table entry being processed.
 * @param arg [in] Argument text given to the option; may be NULL.
 */
static void
parse_option(pgut_option *opt, char *arg)
{
	opt->source = SOURCE_DEFAULT;	/* -o can be specified many times */

	/*
	 * Bail out before the comparisons below: they dereference arg, and an
	 * empty string cannot match any of the keywords anyway.
	 */
	if (arg == NULL || arg[0] == '\0')
		return;

	bulkload_options = lappend(bulkload_options, arg);

	if (pg_strcasecmp(arg, "TYPE=FUNCTION") == 0)
		type_function = true;
	if (pg_strcasecmp(arg, "TYPE=BINARY") == 0 ||
		pg_strcasecmp(arg, "TYPE=FIXED") == 0)
		type_binary = true;
	if (pg_strcasecmp(arg, "WRITER=BINARY") == 0)
		writer_binary = true;
}
/*
 * Command line option table passed to pgut_getopt().
 *
 * Each entry lists, in order: a type code, the short option letter, the long
 * option name and the variable or callback that receives the value. The
 * type codes are interpreted by pgut; judging from the targets, 's' stores a
 * string, 'b' a boolean and 'f' calls a function (TODO confirm against pgut).
 *
 * The first NUM_PATH_OPTIONS entries must stay the path options, in this
 * order: main() and ParseControlFile() index the table by position, main()
 * reads their targets as char pointers, and it treats entries 0 and 1 as the
 * input and entry 2 as the output.
 *
 * NOTE(review): "infile" and "input" both use the short letter 'i'; how
 * pgut_getopt() resolves the duplicate is not visible here.
 */
static pgut_option options[] =
{
/* Dataload options */
{ 's', 'i', "infile" , &infile },
{ 's', 'i', "input" , &input },
{ 's', 'O', "output" , &output },
{ 's', 'l', "logfile" , &logfile },
{ 's', 'P', "parse-badfile" , &parse_badfile },
{ 's', 'u', "duplicate-badfile" , &duplicate_badfile },
{ 'f', 'o', "option" , parse_option },
/* Recovery options */
{ 's', 'D', "pgdata" , &DataDir },
{ 'b', 'r', "recovery" , &recovery },
{ 0 }
};
/* Number of leading entries in options[] that hold file-path style values. */
#define NUM_PATH_OPTIONS 6
/**
 * @brief Entry point for pg_bulkload command.
 *
 * Flow:
 *
 * - Parses command arguments; at most one non-option argument (the control
 *   file) is accepted and is converted to a canonical absolute path.
 * - With -r option: validates the data directory and starts the recovery.
 * - Without -r option: merges control file items, -o items and the path
 *   options into one option list and starts the loading.
 *
 * The code relies on ERROR-level ereport()/elog() calls not returning.
 *
 * @param argc [in] Number of arguments.
 * @param argv [in] Argument list.
 * @return E_PG_OTHER when no argument is given; otherwise the value returned
 *         by LoaderRecoveryMain() or LoaderLoadMain().
 */
int
main(int argc, char *argv[])
{
	char		cwd[MAXPGPATH];
	char		control_file[MAXPGPATH] = "";
	int			i;

	pgut_init(argc, argv);

	if (argc < 2)
	{
		help(false);
		return E_PG_OTHER;
	}

	if (getcwd(cwd, MAXPGPATH) == NULL)
	{
		/* save errno before the reporting machinery can overwrite it */
		int			save_errno = errno;

		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("cannot read current directory: %s",
						strerror(save_errno))));
	}

	/* pgut_getopt() returns the index of the first non-option argument */
	i = pgut_getopt(argc, argv, options);

	for (; i < argc; i++)
	{
		/* a second positional argument is an error */
		if (control_file[0])
			ereport(ERROR,
					(errcode(EXIT_FAILURE),
					 errmsg("too many arguments")));

		/* make absolute control file path */
		if (is_absolute_path(argv[i]))
			strlcpy(control_file, argv[i], MAXPGPATH);
		else
			join_path_components(control_file, cwd, argv[i]);
		canonicalize_path(control_file);
	}

	/*
	 * Determines data loading or recovery.
	 */
	if (recovery)
	{
		/* verify arguments */
		if (!DataDir && (DataDir = getenv("PGDATA")) == NULL)
			elog(ERROR, "no $PGDATA specified");
		/* leave room for the load status file name below the directory */
		if (strlen(DataDir) + MAX_LOADSTATUS_NAME >= MAXPGPATH)
			elog(ERROR, "too long $PGDATA path length");
		if (control_file[0] != '\0')
			elog(ERROR, "invalid argument 'control file' for recovery");

		return LoaderRecoveryMain();
	}
	else
	{
		/* verify arguments */
		if (DataDir)
			elog(ERROR, "invalid option '-D' for data load");

		/* control file items go first, -o items after them */
		if (control_file[0])
			bulkload_options = list_concat(
				ParseControlFile(control_file), bulkload_options);

		/* from here on control_file holds the control file's directory */
		get_parent_directory(control_file);

		/* add path options */
		for (i = 0; i < NUM_PATH_OPTIONS; i++)
		{
			const pgut_option *opt = &options[i];
			const char *path = *(const char **) opt->var;
			char		abspath[MAXPGPATH];
			char		item[MAXPGPATH + 32];

			if (path == NULL)
				continue;

			if ((i == 0 || i == 1) &&
				(pg_strcasecmp(path, "stdin") == 0 || type_function))
			{
				/* special case for stdin and input from function */
				strlcpy(abspath, path, lengthof(abspath));
			}
			else if (is_absolute_path(path) || (i == 2 && !writer_binary))
			{
				/*
				 * absolute path; OUTPUT (entry 2) is also taken verbatim
				 * unless WRITER=BINARY was requested, since per the help
				 * text it may name a table rather than a file
				 */
				strlcpy(abspath, path, lengthof(abspath));
			}
			else if (opt->source == SOURCE_FILE)
			{
				/* control file relative path */
				join_path_components(abspath, control_file, path);
			}
			else
			{
				/* current working directory relative path */
				join_path_components(abspath, cwd, path);
			}

			canonicalize_path(abspath);
			snprintf(item, lengthof(item), "%s=%s", opt->lname, abspath);
			bulkload_options = lappend(bulkload_options, pgut_strdup(item));
		}

		return LoaderLoadMain(bulkload_options);
	}
}
/**
 * @brief Prints the usage text to standard output.
 *
 * The summary and the two usage lines are always printed; the per-option
 * descriptions follow only when details is true.
 *
 * @param details [in] true to include the option descriptions.
 */
void
pgut_help(bool details)
{
	/* option descriptions, emitted verbatim and in order */
	static const char *const detail_lines[] = {
		"\nDataload options:\n",
		" -i, --input=INPUT INPUT path or function\n",
		" -O, --output=OUTPUT OUTPUT path or table\n",
		" -l, --logfile=LOGFILE LOGFILE path\n",
		" -P, --parse-badfile=* PARSE_BADFILE path\n",
		" -u, --duplicate-badfile=* DUPLICATE_BADFILE path\n",
		" -o, --option=\"key=val\" additional option\n",
		"\nRecovery options:\n",
		" -r, --recovery execute recovery\n",
		" -D, --pgdata=DATADIR database directory\n",
	};
	size_t		idx;

	printf("%s is a bulk data loading tool for PostgreSQL\n", PROGRAM_NAME);
	printf("\nUsage:\n");
	printf(" Dataload: %s [dataload options] control_file_path\n", PROGRAM_NAME);
	printf(" Recovery: %s -r [-D DATADIR]\n", PROGRAM_NAME);

	if (details)
	{
		for (idx = 0; idx < sizeof(detail_lines) / sizeof(detail_lines[0]); idx++)
			printf("%s", detail_lines[idx]);
	}
}
/**
 * @brief Performs data loading.
 *
 * Connects to the server, encodes the option list as a text[] array literal
 * and invokes the pgbulkload.pg_bulkload() user-defined function with it
 * inside a single BEGIN ... COMMIT transaction. When the server answers with
 * COPY IN state, standard input is streamed to it through RemoteLoad().
 *
 * The first row of the result is read as: column 0 = rows skipped,
 * 1 = rows loaded, 2 = parse errors, 3 = duplicate errors, 4 = rows replaced.
 *
 * @param options [in] List of "key=value" strings; must not be NIL.
 * @return 0 when no row was rejected, E_PG_USER when parse or duplicate
 *         errors were reported.
 */
static int
LoaderLoadMain(List *options)
{
	PGresult	   *res;
	const char	   *params[1];
	StringInfoData	buf;
	int				encoding;
	int				errors;
	ListCell	   *cell;

	if (options == NIL)
		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("requires control file or command line options")));

	initStringInfo(&buf);
	reconnect(ERROR);
	encoding = PQclientEncoding(connection);

	elog(NOTICE, "BULK LOAD START");

	/* form options as text[] */
	appendStringInfoString(&buf, "{\"");
	foreach (cell, options)
	{
		const char *item = lfirst(cell);

		/* anything beyond the opening {" means an element came before */
		if (buf.len > 2)
			appendStringInfoString(&buf, "\",\"");

		/* escape " and \ */
		while (*item)
		{
			if (*item == '"' || *item == '\\')
			{
				appendStringInfoChar(&buf, '\\');
				appendStringInfoChar(&buf, *item);
				item++;
			}
			else if (!IS_HIGHBIT_SET(*item))
			{
				appendStringInfoChar(&buf, *item);
				item++;
			}
			else
			{
				int		n = PQmblen(item, encoding);
				int		avail;

				/*
				 * PQmblen() derives the length from the lead byte only, so a
				 * truncated multibyte sequence at the end of the string
				 * would make us copy and step past the terminating NUL.
				 * Clamp to the bytes actually present, always consuming at
				 * least one.
				 */
				for (avail = 1; avail < n && item[avail] != '\0'; avail++)
					;

				appendBinaryStringInfo(&buf, item, avail);
				item += avail;
			}
		}
	}
	appendStringInfoString(&buf, "\"}");

	command("BEGIN", 0, NULL);
	params[0] = buf.data;
	res = execute("SELECT * FROM pgbulkload.pg_bulkload($1)", 1, params);
	if (PQresultStatus(res) == PGRES_COPY_IN)
	{
		/* the server wants the data over the wire: feed it from stdin */
		PQclear(res);
		res = RemoteLoad(connection, stdin, type_binary);
		/* RemoteLoad() may return NULL; that also fails this check */
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
			elog(ERROR, "copy failed: %s", PQerrorMessage(connection));
	}
	command("COMMIT", 0, NULL);

	errors = atoi(PQgetvalue(res, 0, 2)) +	/* parse errors */
			 atoi(PQgetvalue(res, 0, 3));	/* duplicate errors */

	elog(NOTICE, "BULK LOAD END\n"
				 "\t%s Rows skipped.\n"
				 "\t%s Rows successfully loaded.\n"
				 "\t%s Rows not loaded due to parse errors.\n"
				 "\t%s Rows not loaded due to duplicate errors.\n"
				 "\t%s Rows replaced with new rows.",
				 PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1),
				 PQgetvalue(res, 0, 2), PQgetvalue(res, 0, 3),
				 PQgetvalue(res, 0, 4));
	PQclear(res);

	disconnect();
	termStringInfo(&buf);

	if (errors > 0)
	{
		elog(WARNING, "some rows were not loaded due to errors.");
		return E_PG_USER;
	}
	else
		return 0;	/* succeeded without errors */
}
/*
 * RemoteLoad : modified handleCopyIn() in bin/psql/copy.c
 * sends data to complete a COPY ... FROM STDIN command
 *
 * conn should be a database connection that is in COPY IN state
 * (the caller got back a PGRES_COPY_IN result).
 * copystream is the file stream to read the data from.
 * isbinary selects raw chunked transfer; otherwise the stream is read as
 * text and a line consisting only of "\." ends the transfer early.
 *
 * Returns the result of PQgetResult(), or NULL when sending data, reading
 * the stream or ending the copy failed. The external "interrupted" flag is
 * polled between binary chunks and at the start of each text line; when it
 * is set the copy is ended with "canceled by user".
 */

/* read chunk size for COPY IN - size is not critical */
#define COPYBUFSIZ 8192

static PGresult *
RemoteLoad(PGconn *conn, FILE *copystream, bool isbinary)
{
	char		chunk[COPYBUFSIZ];
	bool		ok = true;

	if (isbinary)
	{
		/* forward the stream as it comes, one buffer at a time */
		for (;;)
		{
			int			nread;

			if (interrupted)
				break;

			nread = fread(chunk, 1, COPYBUFSIZ, copystream);
			if (nread <= 0)
				break;

			if (PQputCopyData(conn, chunk, nread) <= 0)
			{
				ok = false;
				break;
			}
		}
	}
	else
	{
		/* true while the next buffer load begins a new input line */
		bool		at_line_start = true;

		for (;;)
		{
			int			len;

			/* interruption is only honoured between whole lines */
			if (at_line_start && interrupted)
				break;

			if (fgets(chunk, sizeof(chunk), copystream) == NULL)
				break;

			len = strlen(chunk);

			/* check for EOF marker, but not on a partial line */
			if (at_line_start &&
				(strcmp(chunk, "\\.\n") == 0 ||
				 strcmp(chunk, "\\.\r\n") == 0))
				break;

			if (PQputCopyData(conn, chunk, len) <= 0)
			{
				ok = false;
				break;
			}

			/* a buffer ending in newline completes the current line */
			at_line_start = (len > 0 && chunk[len - 1] == '\n');
		}
	}

	if (interrupted)
	{
		PQputCopyEnd(conn, "canceled by user");
		return PQgetResult(conn);
	}

	/* Check for read error */
	if (ferror(copystream))
		ok = false;

	/* Terminate data transfer */
	if (PQputCopyEnd(conn, ok ? NULL : "aborted because of read failure") <= 0)
		ok = false;

	/* Fetch the command status only when everything went through */
	return ok ? PQgetResult(conn) : NULL;
}
/**
 * @brief Read a control file and collect its settings.
 *
 * Every line is parsed with ParseControlFileLine(). A keyword matching one
 * of the first NUM_PATH_OPTIONS entries of the option table is stored
 * through pgut_setopt() with SOURCE_FILE as its source; any other setting is
 * returned as a newly allocated "keyword=value" string and, when it is
 * TYPE=FUNCTION, TYPE=BINARY, TYPE=FIXED or WRITER=BINARY, also sets the
 * corresponding global flag.
 *
 * NOTE(review): the stream returned by pgut_fopen() is used unchecked, so
 * this assumes pgut_fopen() reports a failure itself - confirm.
 *
 * @param path [in] Path of the control file.
 * @return List of "keyword=value" strings for the non-path settings.
 */
static List *
ParseControlFile(const char *path)
{
#define LINEBUF 1024
	char		line[LINEBUF];
	List	   *result = NIL;
	FILE	   *fp = pgut_fopen(path, "rt");

	while (fgets(line, LINEBUF, fp) != NULL)
	{
		char	   *keyword;
		char	   *value;
		pgut_option *match = NULL;
		size_t		size;
		char	   *item;
		int			idx;

		/* blank and comment-only lines yield nothing */
		if (!ParseControlFileLine(line, &keyword, &value))
			continue;

		/* PATH_OPTIONS: stop at the first table entry with this name */
		for (idx = 0; idx < NUM_PATH_OPTIONS && match == NULL; idx++)
		{
			if (pgut_keyeq(keyword, options[idx].lname))
				match = &options[idx];
		}

		if (match != NULL)
		{
			pgut_setopt(match, value, SOURCE_FILE);
			continue;
		}

		/* Other options: keyword + '=' + value + terminating NUL */
		size = strlen(keyword) + strlen(value) + 2;
		item = pgut_malloc(size);
		snprintf(item, size, "%s=%s", keyword, value);
		result = lappend(result, item);

		if (pg_strcasecmp(item, "TYPE=FUNCTION") == 0)
			type_function = true;
		else if (pg_strcasecmp(item, "TYPE=BINARY") == 0 ||
				 pg_strcasecmp(item, "TYPE=FIXED") == 0)
			type_binary = true;
		else if (pg_strcasecmp(item, "WRITER=BINARY") == 0)
			writer_binary = true;
	}

	fclose(fp);

	return result;
}
/**
 * @brief Parse a line in control file.
 *
 * The line must end with '\n'; a buffer without a terminating newline
 * (including an empty buffer) is reported as a too long line. Text after an
 * unquoted '#' is dropped as a comment. What remains is split at the first
 * unquoted '=' into a keyword and a value; both are trimmed and the value is
 * unquoted.
 *
 * The code relies on ERROR-level ereport() calls not returning.
 *
 * @param buf [in/out] Line text; modified in place.
 * @param outKeyword [out] Keyword, pointing into buf; NULL when the function
 *        returns false.
 * @param outValue [out] Value, pointing into buf; NULL when the function
 *        returns false.
 * @return true when a keyword/value pair was extracted, false when the line
 *         is blank or holds only a comment.
 */
static bool
ParseControlFileLine(char buf[], char **outKeyword, char **outValue)
{
	char	   *keyword = NULL;
	char	   *value = NULL;
	char	   *p;
	char	   *q;
	size_t		len;

	*outKeyword = NULL;
	*outValue = NULL;

	/*
	 * Test the length first: indexing with len - 1 on an empty buffer would
	 * read the byte in front of buf.
	 */
	len = strlen(buf);
	if (len == 0 || buf[len - 1] != '\n')
		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("too long line \"%s\"", buf)));

	p = buf;	/* pointer to keyword */

	/*
	 * replace '\n' to '\0'
	 */
	q = strchr(buf, '\n');
	if (q != NULL)
		*q = '\0';

	/*
	 * delete strings after a comment letter outside quotations
	 */
	q = FindUnquotedChar(buf, '#', '"', '\\');
	if (q != NULL)
		*q = '\0';

	/*
	 * if result of trimming is a null string, it is treated as an empty line
	 */
	p = TrimSpaces(buf);
	if (*p == '\0')
		return false;

	/*
	 * divide at the first unquoted '='
	 */
	q = FindUnquotedChar(buf, '=', '"', '\\');
	if (q != NULL)
		*q = '\0';
	else
		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("invalid input \"%s\"", buf)));

	q++;	/* pointer to input value */

	/*
	 * return a value trimmed space
	 */
	keyword = TrimSpaces(p);
	value = TrimSpaces(q);

	if (!keyword[0] || !value[0])
		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("invalid input \"%s\"", buf)));

	value = UnquoteString(value, '"', '\\');
	if (!value)
		ereport(ERROR,
				(errcode(EXIT_FAILURE),
				 errmsg("unterminated quoted field")));

	*outKeyword = keyword;
	*outValue = value;
	return true;
}
/**
 * @brief Strip white space from both ends of a string, in place.
 *
 * Flow
 *
 * - Skip forward over leading characters for which IsSpace() is true.
 * - Walk back from the end over trailing such characters and terminate the
 *   string right after the last one kept.
 *
 * @param input [in/out] Input character string
 * @return Pointer to the first kept character
 * @note Input string is overwritten (a '\0' is stored after the last kept
 *       character).
 * @note The returned value may point into the middle of the input string.
 */
static char *
TrimSpaces(char *input)
{
	char	   *head = input;
	char	   *tail;

	/* leading white space */
	while (IsSpace(*head))
		head++;

	/* trailing white space */
	tail = head + strlen(head);
	while (tail > head && IsSpace(tail[-1]))
		tail--;
	*tail = '\0';

	return head;
}
/**
 * @brief Remove quote marks from a string and resolve escapes, in place.
 *
 * Every quote mark toggles the "inside quotes" state and is dropped. Inside
 * quotes, an escape character followed by a quote mark or by another escape
 * character yields that following character alone; anywhere else the escape
 * character is copied unchanged.
 *
 * - abc -> abc
 * - "abc" -> abc
 * - "abc\"123" -> abc"123
 * - "abc\\123" -> abc\123
 * - "abc\123" -> abc\123
 * - "abc"123 -> abc123
 * - "abc""123" -> abc123
 * - "abc -> NG (error)
 *
 * @param str [in/out] String to process; rewritten even when NULL is returned
 * @param quote [in] Quote mark character
 * @param escape [in] Escape character
 * @retval !NULL str, now without the quote marks
 * @retval NULL Error (a quote mark was left unclosed)
 */
static char *
UnquoteString(char *str, char quote, char escape)
{
	const char *src;			/* read cursor */
	char	   *dst = str;		/* write cursor, never ahead of src */
	int			quoted = 0;

	for (src = str; *src != '\0'; src++)
	{
		char		ch = *src;

		/* opening or closing quote mark: flip the state, emit nothing */
		if (ch == quote)
		{
			quoted = !quoted;
			continue;
		}

		/* escaped meta character: emit it and swallow the escape */
		if (quoted && ch == escape &&
			(src[1] == quote || src[1] == escape))
		{
			*dst++ = *++src;
			continue;
		}

		/* ordinary character */
		*dst++ = ch;
	}
	*dst = '\0';

	/* still inside quotes at the end means the quote was never closed */
	return quoted ? NULL : str;
}
/**
* @brief Find the first specified character outside of quote mark
* @param str [in] Searched string
* @param target [in] Searched character
* @param quote [in] Quote mark
* @param escape [in] Escape character
* @return If the specified character is found outside quoted string, return the
* pointer. If it is not found, return NULL.
*/
static char *
FindUnquotedChar(char *str, char target, char quote, char escape)
{
int i;
bool in_quote = false;
for (i = 0; str[i]; i++)
{
if (str[i] == escape)
{
/*
* Treat it as escape character if it is before meta character
*/
if (str[i + 1] == escape || str[i + 1] == quote)
i++;
}
else if (str[i] == quote)
in_quote = !in_quote;
else if (!in_quote && str[i] == target)
return str + i;
}
return NULL;
}