/*---------------------------------------------------------- * * foreign-data wrapper for CouchDB * * Copyright (c) 2011, PostgreSQL Global Development Group * * This software is released under the PostgreSQL Licence. * * Author: Zheng Yang * * IDENTIFICATION * couchdb_fdw/couchdb_fdw.c * *---------------------------------------------------------- */ /* Debug mode flag */ /* #define DEBUG */ #include "postgres.h" #include #include "funcapi.h" #include "access/reloptions.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_table.h" #include "catalog/pg_user_mapping.h" #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/explain.h" #include "foreign/fdwapi.h" #include "foreign/foreign.h" #include "utils/memutils.h" #include "utils/builtins.h" #include "utils/relcache.h" #include "storage/lock.h" #include "miscadmin.h" #include "curl/curl.h" #include #include #include #include "couchdb_fdw.h" PG_MODULE_MAGIC; #define PROCID_TEXTEQ 67 /* * A string buffer to store id's. */ static int BUFFER_SIZE = 5; /* * Array of valid options, the rest will be assumed as column->json attribute mapping. */ static struct CouchDBFdwOption valid_options[] = { /* Foreign server options */ { "address", ForeignServerRelationId }, { "port", ForeignServerRelationId }, /* Foreign table options */ { "database", ForeignTableRelationId }, /* User Mapping options */ { "username", UserMappingRelationId}, { "password", UserMappingRelationId}, /* Sentinel */ { NULL, InvalidOid} }; /* * SQL functions */ extern Datum couchdb_fdw_handler(PG_FUNCTION_ARGS); extern Datum couchdb_fdw_validator(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(couchdb_fdw_handler); PG_FUNCTION_INFO_V1(couchdb_fdw_validator); static void couchdbGetDatabaseSize(const char *address, const int port, const char *database, const char *username, const char *password, int *dbsize); void* palloc_wrapper (void *ctx, size_t sz) { //MemoryContext * mctx = (MemoryContext *) ctx; //return MemoryContextAlloc( (MemoryContext) ctx, sz); return malloc(sz); } void* repalloc_wrapper (void *ctx, void *ptr, size_t sz) { return realloc(ptr, sz); } void pfree_wrapper(void *ctx, void *ptr) { free(ptr); } Datum couchdb_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine *fdwroutine = makeNode(FdwRoutine); fdwroutine->PlanForeignScan = couchdbPlanForeignScan; fdwroutine->ExplainForeignScan = couchdbExplainForeignScan; fdwroutine->BeginForeignScan = couchdbBeginForeignScan; fdwroutine->IterateForeignScan = couchdbIterateForeignScan; fdwroutine->ReScanForeignScan = couchdbReScanForeignScan; fdwroutine->EndForeignScan = couchdbEndForeignScan; PG_RETURN_POINTER(fdwroutine); } /*START*******************************************************************************************************/ /* * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER, * USER MAPPING or FOREIGN TABLE that uses couchdb_fdw. * * TODO: Should there be a validator function because the options can be any * string unless we have a way to retrieve the column names of the newly created * table. Also we need to know which statement (CREATE SERVER? CREATE FDW? or CREATE TBALE?) * is calling the validator because each will have its own valid options. */ Datum couchdb_fdw_validator(PG_FUNCTION_ARGS) { List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); Oid catalog = PG_GETARG_OID(1); char *svr_address = NULL; int svr_port = 0; char *svr_database = NULL; char *username = NULL; char *password = NULL; ListCell *cell; #ifdef DEBUG elog(NOTICE, "couchdb_fdw_validator"); #endif /* * Check that the necessary options: address, port, database */ foreach(cell, options_list) { DefElem *def = (DefElem *) lfirst(cell); /* Compain invalid options */ if (!couchdbIsValidOption(def->defname, catalog)) { struct CouchDBFdwOption *opt; StringInfoData buf; /* * Unknown option specified, complain about it. Provide a hint * with list of valid options for the object. */ initStringInfo(&buf); for (opt = valid_options; opt->optname; opt++) { if (catalog == opt->optcontext) appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "", opt->optname); } ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), errmsg("invalid option \"%s\"", def->defname), errhint("Valid options in this context are: %s", buf.len ? buf.data : "") )); } /* Complain about redundent options */ if (strcmp(def->defname, "address") == 0) { if (svr_address) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options: address (%s)", defGetString(def)) )); svr_address = defGetString(def); } else if (strcmp(def->defname, "port") == 0) { if (svr_port) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options: port (%s)", defGetString(def)) )); svr_port = atoi(defGetString(def)); } else if (strcmp(def->defname, "database") == 0) { if (svr_database) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options: database (%s)", defGetString(def)) )); svr_database = defGetString(def); } else if (strcmp(def->defname, "username") == 0) { if (username) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options: username (%s)", defGetString(def)) )); username = defGetString(def); } else if (strcmp(def->defname, "password") == 0) { if (password) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options: password (%s)", defGetString(def)) )); password = defGetString(def); } } PG_RETURN_VOID(); } /* * Check if the provided option is one of the valid options. * context is the Oid of the catalog holding the object the option is for. */ static bool couchdbIsValidOption(const char *option, Oid context) { struct CouchDBFdwOption *opt; #ifdef DEBUG elog(NOTICE, "couchdbIsValidOption"); #endif /* Check if the options presents in the valid option list */ for (opt = valid_options; opt->optname; opt++) { if (context == opt->optcontext && strcmp(opt->optname, option) == 0) return true; } /* Foreign table may have anything as a mapping option */ if (context == ForeignTableRelationId) return true; else return false; } /* * Fetch the options for a couchdb_fdw foreign table. */ static void couchdbGetOptions(Oid foreigntableid, char **address, int *port, char **database, char **username, char **password, List **mapping_list) { ForeignTable *table; ForeignServer *server; UserMapping *mapping; List *options; ListCell *lc; #ifdef DEBUG elog(NOTICE, "couchdbGetOptions"); #endif /* * Extract options from FDW objects. */ table = GetForeignTable(foreigntableid); server = GetForeignServer(table->serverid); mapping = GetUserMapping(GetUserId(), table->serverid); options = NIL; options = list_concat(options, table->options); options = list_concat(options, server->options); options = list_concat(options, mapping->options); *mapping_list = NIL; /* Loop through the options, and get the foreign table options */ foreach(lc, options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "address") == 0) { *address = defGetString(def); continue; } if (strcmp(def->defname, "port") == 0) { *port = atoi(defGetString(def)); continue; } if (strcmp(def->defname, "database") == 0) { *database = defGetString(def); continue; } if (strcmp(def->defname, "username") == 0) { *username = defGetString(def); continue; } if (strcmp(def->defname, "password") == 0) { *password = defGetString(def); continue; } /* Column mapping goes here */ *mapping_list = lappend(*mapping_list, def); } #ifdef DEBUG elog(NOTICE, "list length: %i", (*mapping_list)->length); #endif /* Default values, if required */ if (!*address) *address = "127.0.0.1"; if (!*port) *port = 5984; if (!*database) *database = "test"; if (!*username) *username = NULL; if (!*password) *password = NULL; } /* * couchdbPlanForeignScan * Create a FdwPlan for a scan on the foreign table */ static FdwPlan * couchdbPlanForeignScan(Oid foreigntableid, PlannerInfo *root, RelOptInfo *baserel) { FdwPlan *fdwplan; char *svr_address = NULL; int svr_port = 0; char *svr_database = NULL; char *username = NULL; char *password = NULL; int dbsize = 0; List *col_mapping_list; #ifdef DEBUG elog(NOTICE, "couchdbPlanForeignScan"); #endif /* Fetch options */ couchdbGetOptions(foreigntableid, &svr_address, &svr_port, &svr_database, &username, &password, &col_mapping_list); //elog(NOTICE, "list length: %i", col_mapping_list->length); /* * Construct FdwPlan with cost estimates. */ fdwplan = makeNode(FdwPlan); /* Local databases are probably faster */ if (strcmp(svr_address, "127.0.0.1") == 0 || strcmp(svr_address, "localhost") == 0) { fdwplan->startup_cost = 10; } else fdwplan->startup_cost = 25; fdwplan->total_cost = 100 + fdwplan->startup_cost; fdwplan->fdw_private = NIL; /* not used */ couchdbGetDatabaseSize(svr_address, svr_port, svr_database, username, password, &dbsize); fdwplan->total_cost = fdwplan->total_cost + dbsize; #ifdef DEBUG elog(NOTICE, "new total cost: %f", fdwplan->total_cost); #endif return fdwplan; } /* * couchdbBeginForeignScan * */ static void couchdbBeginForeignScan(ForeignScanState *node, int eflags) { CouchDBFdwExecutionState *festate; char *svr_address; int svr_port; char *svr_database; char *username; char *password; StringInfoData *columns; int num_of_columns; StringInfoData *id_buffer; Relation rel; long long int total_rows; long long int offset; List *col_mapping_list; ListCell *col_mapping; List *qual_list; char *qual_key = NULL; char *qual_value = NULL; bool equal = FALSE; bool pushdown = FALSE; int i; #ifdef DEBUG elog(NOTICE, "couchdbBeginForeignScan"); #endif /* fetch options */ couchdbGetOptions(RelationGetRelid(node->ss.ss_currentRelation), &svr_address, &svr_port, &svr_database, &username, &password, &col_mapping_list); /* fill in id buffer */ couchdbGetAllDocs(svr_address, svr_port, svr_database, username, password, BUFFER_SIZE, NULL, false, &id_buffer, &total_rows, &offset); /* fetch the table column info */ rel = heap_open(RelationGetRelid(node->ss.ss_currentRelation), AccessShareLock); num_of_columns = rel->rd_att->natts; columns = (StringInfoData *) palloc(sizeof(StringInfoData) * num_of_columns); for (i = 0; i < num_of_columns; i++) { StringInfoData col; StringInfoData mapping; bool mapped; /* retrieve the column name */ initStringInfo(&col); appendStringInfo(&col, "%s", NameStr(rel->rd_att->attrs[i]->attname)); mapped = FALSE; /* check if the column name is mapping to a different name in couchdb */ foreach(col_mapping, col_mapping_list) { DefElem *def = (DefElem *) lfirst(col_mapping); if (strcmp(def->defname, col.data) == 0) { initStringInfo(&mapping); appendStringInfo(&mapping, "%s", defGetString(def)); mapped = TRUE; break; } } /* decide which name is going to be used */ if (mapped) columns[i] = mapping; else columns[i] = col; } heap_close(rel, NoLock); /* init fdw state and save it to the fdw state node */ festate = (CouchDBFdwExecutionState *) palloc(sizeof(CouchDBFdwExecutionState)); festate->cursor = 0; festate->address = svr_address; festate->port = svr_port; festate->database = svr_database; festate->username = username; festate->password = password; festate->buffer_size = BUFFER_SIZE; festate->id_buffer = id_buffer; festate->num_of_columns = num_of_columns; festate->columns = columns; festate->total_rows = total_rows; festate->offset = offset; festate->qual_list = NIL; festate->qual_scanned = FALSE; /* Store the additional state info */ festate->attinmeta = TupleDescGetAttInMetadata(node->ss.ss_currentRelation->rd_att); node->fdw_state = (void *) festate; #ifdef DEBUG for (i = 0; iss.ps.plan->qual) { StringInfoData rev; StringInfoData id; ListCell *lc; foreach (lc, node->ss.ps.qual) { /* Only _id can be pushed down */ ExprState *state = lfirst(lc); couchdbGetQual((Node *) state->expr, node->ss.ss_currentRelation->rd_att, col_mapping_list, &qual_key, &qual_value, &equal); if (equal) { if (strcmp(qual_key, "_id") == 0) { initStringInfo(&id); appendStringInfo(&id, "%s", qual_value); pushdown = TRUE; } /* always push in rev first */ if (strcmp(qual_key, "_rev") == 0) { initStringInfo(&rev); appendStringInfo(&rev, "%s", qual_value); qual_list = lappend(qual_list, rev.data); } } } if (pushdown) qual_list = lappend(qual_list, id.data); festate->qual_list = list_copy(qual_list); } } /* * couchdbIterateForeignScan * */ static TupleTableSlot * couchdbIterateForeignScan(ForeignScanState *node) { TupleTableSlot *slot; int cursor; CouchDBFdwExecutionState* fstate; StringInfoData *id_buffer; StringInfoData *column_data; StringInfoData *columns; int num_of_columns; char *svr_address; int svr_port; char *svr_database; char *username; char *password; long long int total_rows; long long int offset; List *qual_list; bool qual_scanned; bool endOfScan; #ifdef DEBUG elog(NOTICE, "couchdbIterateForeignScan"); #endif endOfScan = false; /* obtaining the state of current scan */ fstate = (CouchDBFdwExecutionState *) node->fdw_state; cursor = fstate->cursor; svr_address = fstate->address ; svr_port = fstate->port; svr_database = fstate->database; username = fstate->username; password = fstate->password; id_buffer = fstate->id_buffer; num_of_columns = fstate->num_of_columns; columns = fstate->columns; total_rows = fstate->total_rows; offset = fstate->offset; qual_list = fstate->qual_list; qual_scanned = fstate->qual_scanned; /* got quals to pushdown */ if (list_length(qual_list) != 0) { if (qual_scanned) { endOfScan = TRUE; } else { char *_id; char *_rev; _id = (char *) ((list_length(qual_list) == 1) ? list_nth(qual_list, 0) : list_nth(qual_list, 1)); _rev = (char *) ((list_length(qual_list) == 1) ? NULL : list_nth(qual_list, 1)); couchdbGetDoc(svr_address, svr_port, svr_database, username, password, _id, _rev, columns, num_of_columns, &column_data); fstate->qual_scanned = TRUE; } } else { /* if not the last page */ if (offset + BUFFER_SIZE < total_rows) { /* if the cursor is at the end of the BUFFER */ if (cursor == BUFFER_SIZE) { /* retrieve new set of ids */ char *start_key; start_key = id_buffer[BUFFER_SIZE - 1].data; pfree(id_buffer); id_buffer = (StringInfoData *) palloc(sizeof(StringInfoData) * BUFFER_SIZE); couchdbGetAllDocs(svr_address, svr_port, svr_database, username, password, BUFFER_SIZE, start_key, false, &id_buffer, &total_rows, &offset); fstate->id_buffer = id_buffer; /* reset cursor (skip the first id of next batch)*/ cursor = 1; /* retrieve doc */ column_data = (StringInfoData *) palloc(sizeof(StringInfoData) * num_of_columns); couchdbGetDoc(svr_address, svr_port, svr_database, username, password, id_buffer[cursor].data, NULL, columns, num_of_columns, &column_data); cursor ++; } /* else increment the cursor by 1 */ else { column_data = (StringInfoData *) palloc(sizeof(StringInfoData) * num_of_columns); couchdbGetDoc(svr_address, svr_port, svr_database, username, password, id_buffer[cursor].data, NULL, columns, num_of_columns, &column_data); cursor ++; } } /* if on the last page */ else { int remainder; remainder = total_rows - offset; /* if there are still ids to loop */ if (cursor <= remainder - 1) { column_data = (StringInfoData *) palloc(sizeof(StringInfoData) * num_of_columns); couchdbGetDoc(svr_address, svr_port, svr_database, username, password, id_buffer[cursor].data, NULL, columns, num_of_columns, &column_data); cursor ++; } /* else implies no more records */ else { endOfScan = true; } } /* save the cursor into the state */ fstate->cursor = cursor; /* save the total_rows and offset into the state */ fstate->total_rows = total_rows; fstate->offset = offset; } /* prepare for returning the tuple */ slot = node->ss.ss_ScanTupleSlot; /* Cleanup */ ExecClearTuple(slot); /* if not the end of the scan, construct the tuple */ if (!endOfScan) { /* * put doc into a row */ char **values; HeapTuple tuple; int i; values = (char **) palloc(sizeof(char *) * num_of_columns); for (i = 0; i < num_of_columns; i++) { values[i] = column_data[i].data; } tuple = BuildTupleFromCStrings(fstate->attinmeta, values); ExecStoreTuple(tuple, slot, InvalidBuffer, FALSE); } return slot; } /* * couchdbExplainForeignScan * */ static void couchdbExplainForeignScan(ForeignScanState *node, ExplainState *es) { CouchDBFdwExecutionState *festate; int dbsize; #ifdef DEBUG elog(NOTICE, "couchdbExplainForeignScan"); #endif festate = (CouchDBFdwExecutionState *) node->fdw_state; /* Execute a query to get the database size */ couchdbGetDatabaseSize(festate->address, festate->port, festate->database, festate->username, festate->password, &dbsize); /* Suppress file size if we're not showing cost details */ if (es->costs) { ExplainPropertyLong("Foreign CouchDB Database Size", dbsize, es); } } /* * couchdbEndForeignScan * Finish scanning foreign table and dispose objects used for this scan */ static void couchdbEndForeignScan(ForeignScanState *node) { CouchDBFdwExecutionState *festate; #ifdef DEBUG elog(NOTICE, "couchdbEndForeignScan"); #endif festate = (CouchDBFdwExecutionState *) node->fdw_state; /* if festate is NULL, we are in EXPLAIN; nothing to do */ if (festate) { if (festate->id_buffer) pfree(festate->id_buffer); if (festate->columns) pfree(festate->columns); } } /* * couchdbReScanForeignScan * Rescan table, possibly with new parameters */ static void couchdbReScanForeignScan(ForeignScanState *node) { CouchDBFdwExecutionState *festate; #ifdef DEBUG elog(NOTICE, "couchdbReScanForeignScan"); #endif festate = (CouchDBFdwExecutionState *) node->fdw_state; festate->cursor = 0; } static void couchdbGetDatabaseSize(const char *address, const int port, const char *database, const char *username, const char *password, int *dbsize) { CURL *curl; StringInfoData dbsize_url; StringInfoData auth_str; yajl_handle handle; bool hasAuth; dbsize_context *ctx; yajl_alloc_funcs *funcs; yajl_callbacks *callbacks; #ifdef DEBUG elog(NOTICE, "couchdbGetDatabaseInfo"); #endif if (username != NULL && password != NULL) { initStringInfo(&auth_str); appendStringInfo(&auth_str, "%s:%s@", username, password); hasAuth = true; } else { hasAuth = false; } initStringInfo(&dbsize_url); appendStringInfo(&dbsize_url, "http://%s%s:%i/%s", hasAuth ? auth_str.data : "", address, port,database ); #ifdef DEBUG elog(NOTICE, "%s", dbsize_url.data); #endif callbacks = (yajl_callbacks *) palloc(sizeof(yajl_callbacks)); callbacks->yajl_null = NULL; callbacks->yajl_boolean = NULL; callbacks->yajl_integer = NULL; callbacks->yajl_double = NULL; callbacks->yajl_number = dbsize_handle_number; callbacks->yajl_string = NULL; callbacks->yajl_start_map = NULL; callbacks->yajl_map_key = dbsize_handle_map_key; callbacks->yajl_end_map = NULL; callbacks->yajl_start_array = NULL; callbacks->yajl_end_array = NULL; /* * Initialize YAJL Parser */ funcs = (yajl_alloc_funcs *) palloc(sizeof(yajl_alloc_funcs)); funcs->malloc = palloc_wrapper; funcs->realloc = repalloc_wrapper; funcs->free = pfree_wrapper; funcs->ctx = (void *) &TopMemoryContext; ctx = (dbsize_context *) palloc(sizeof(dbsize_context)); ctx->doc_count = 0; handle = yajl_alloc(callbacks, funcs, (void *) ctx); /* * CURL the database url to get the database meta information */ curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, dbsize_url.data); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, couchdbsize_writer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &handle); curl_easy_perform(curl); curl_easy_cleanup(curl); #ifdef DEBUG elog(NOTICE, "num of docs: %ld", ctx->doc_count); #endif *dbsize = ctx->doc_count; } static size_t couchdbsize_writer(void *buffer, size_t size, size_t nmemb, void *userp) { yajl_handle *handle = (yajl_handle *) userp; yajl_status stat = yajl_parse(*handle, (unsigned char *) buffer, size * nmemb); if (stat != yajl_status_ok) elog(NOTICE, "cannot parse"); return size * nmemb; } static int dbsize_handle_number(void * ctx, const char * s, size_t l) { /* If the value's key is doc_count, store it into the id_buffer array */ dbsize_context *context = (dbsize_context *) ctx; if ( strcmp("doc_count", context->map_key) == 0 ) { context->doc_count = atoi(s); } return 1; } static int dbsize_handle_map_key(void * ctx, const unsigned char * stringVal, size_t stringLen) { dbsize_context *context; StringInfoData map_key; initStringInfo(&map_key); appendBinaryStringInfo(&map_key, (char *) stringVal, (int) stringLen); /* Save the map key to context, the following value parsed will belong to this key */ context = (dbsize_context *) ctx; context->map_key = map_key.data; return 1; } /* * function to retrieve a single doc from couchdb */ static void couchdbGetDoc(const char *address, const int port, const char *database, const char *username, const char *password, const char *id, const char *rev, StringInfoData *columns, const int col_size, StringInfoData **column_data) { CURL *curl; StringInfoData doc_url; StringInfoData auth_str; StringInfoData rev_str; yajl_alloc_funcs* funcs; yajl_callbacks* callbacks; yajl_handle handle; doc_context *ctx; yajl_gen gen; yajl_gen doc_gen; bool hasAuth; bool hasRev; #ifdef DEBUG elog(NOTICE, "couchdbGetDoc"); #endif /* Check if there is authen info specified */ if (username != NULL && password != NULL) { initStringInfo(&auth_str); appendStringInfo(&auth_str, "%s:%s@", username, password); hasAuth = true; } else { hasAuth = false; } /* Check if there is revision number specified */ if (rev != NULL) { initStringInfo(&rev_str); appendStringInfo(&rev_str, "?rev=%s", rev); hasRev = true; } else { hasRev = false; } /* Construct the URL we are going to curl */ initStringInfo(&doc_url); appendStringInfo(&doc_url, "http://%s%s:%i/%s/%s%s", hasAuth ? auth_str.data : "", address, port, database, id, hasRev ? rev_str.data : ""); #ifdef DEBUG elog(NOTICE, "%s", doc_url.data); #endif /* example: http://127.0.0.1:5984/cooldb/123456?rev=1-abc */ callbacks = (yajl_callbacks *) palloc(sizeof(yajl_callbacks)); callbacks->yajl_null = doc_handle_null; callbacks->yajl_boolean = doc_handle_boolean; callbacks->yajl_integer = doc_handle_integer; callbacks->yajl_double = doc_handle_double; callbacks->yajl_number = doc_handle_number; callbacks->yajl_string = doc_handle_string; callbacks->yajl_start_map = doc_handle_start_map; callbacks->yajl_map_key = doc_handle_map_key; callbacks->yajl_end_map = doc_handle_end_map; callbacks->yajl_start_array = doc_handle_start_array; callbacks->yajl_end_array = doc_handle_end_array; /* init column_data array */ *column_data = (StringInfoData *) palloc(sizeof(StringInfoData) * col_size); /* * Initialize YAJL Parser */ funcs = (yajl_alloc_funcs *) palloc(sizeof(yajl_alloc_funcs)); funcs->malloc = palloc_wrapper; funcs->realloc = repalloc_wrapper; funcs->free = pfree_wrapper; funcs->ctx = (void *) &TopMemoryContext; /* * json generator */ gen = yajl_gen_alloc(funcs); doc_gen = yajl_gen_alloc(funcs); /* * parsing context */ ctx = (doc_context *) palloc(sizeof(doc_context)); ctx->depth = 0; ctx->column_list = columns; ctx->column_data = *column_data; ctx->column_index = -1; ctx->num_of_columns = col_size; ctx->gen = gen; ctx->doc_gen = doc_gen; ctx->funcs = funcs; handle = yajl_alloc(callbacks, funcs, (void *) ctx); /* * CURL the database url to get the database meta information */ curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, doc_url.data); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, couchdbdoc_writer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &handle); curl_easy_perform(curl); curl_easy_cleanup(curl); /* if parsing is completed, free the parser */ /* yajl_complete_parse(handle); yajl_gen_free(gen); yajl_free(handle); */ #ifdef DEBUG elog(NOTICE, "columndata: '%s'", (*column_data)[0].data); elog(NOTICE, "columndata: '%s'", (*column_data)[1].data); #endif } static size_t couchdbdoc_writer(void *buffer, size_t size, size_t nmemb, void *userp) { yajl_handle *handle = (yajl_handle *) userp; yajl_status stat = yajl_parse(*handle, (unsigned char *) buffer, size * nmemb); if (stat != yajl_status_ok) { unsigned char * str = yajl_get_error(*handle, 1, (unsigned char *) buffer, size * nmemb); elog(NOTICE, "%s", (char *) str); yajl_free_error(*handle, str); } return size * nmemb; } static int doc_handle_null(void * ctx) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_null"); #endif context = (doc_context *) ctx; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_null(doc_gen); if (context->column_index == -1) { /* ignore such value */ return 1; } else { if (context->depth == 1) { /* store this value directly in the column array */ StringInfoData str; initStringInfo(&str); appendStringInfo(&str, "%s", "null"); context->column_data[context->column_index] = str; return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_null(gen); } } } static int doc_handle_boolean(void * ctx, int boolean) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_boolean"); #endif context = (doc_context *) ctx; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_bool(doc_gen, boolean); if (context->column_index == -1) { /* ignore such value */ return 1; } else { if (context->depth == 1) { /* store this value directly in the column array */ StringInfoData str; initStringInfo(&str); appendStringInfo(&str, "%s", boolean ? "true" : "false"); context->column_data[context->column_index] = str; return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_bool(gen, boolean); } } } static int doc_handle_integer(void *ctx, long long integerVal) { #ifdef DEBUG elog(NOTICE, "doc_handle_integer"); #endif return 1; } static int doc_handle_double(void *ctx, double doubleVal) { #ifdef DEBUG elog(NOTICE, "doc_handle_double"); #endif return 1; } static int doc_handle_number(void * ctx, const char * s, size_t l) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_number"); #endif context = (doc_context *) ctx; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_number(doc_gen, s, l); if (context->column_index == -1) { /* ignore such value */ return 1; } else { if (context->depth == 1) { /* store this value directly in the column array */ StringInfoData str; initStringInfo(&str); appendBinaryStringInfo(&str, s, (int) l); context->column_data[context->column_index] = str; return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_number(gen, s, l); } } } static int doc_handle_string(void * ctx, const unsigned char * stringVal, size_t stringLen) { doc_context *context; yajl_gen doc_gen; context = (doc_context *) ctx; #ifdef DEBUG elog(NOTICE, "doc_handle_string"); #endif /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_string(doc_gen, stringVal, stringLen); if (context->column_index == -1) { /* ignore such value */ return 1; } else { if (context->depth == 1) { /* store this value directly in the column array */ StringInfoData str; initStringInfo(&str); appendBinaryStringInfo(&str, (char *) stringVal, (int) stringLen); context->column_data[context->column_index] = str; return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_string(gen, stringVal, stringLen); } } } static int doc_handle_start_map (void *ctx) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_start_map"); #endif context = (doc_context *) ctx; (context->depth) ++; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_map_open(doc_gen); if (context->column_index == -1) { /* ignore such value */ return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_map_open(gen); } } static int doc_handle_map_key(void *ctx, const unsigned char * stringVal, size_t stringLen) { doc_context *context; StringInfoData *column_list; StringInfoData map_key; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_map_key"); #endif context = (doc_context *) ctx; initStringInfo(&map_key); appendBinaryStringInfo(&map_key, (char *) stringVal, stringLen); column_list = context->column_list; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_string(doc_gen, stringVal, stringLen); if (context->depth == 1) { int i; /* re-init column_index as not-found */ context->column_index = -1; /* if the current map key maps to a column, set the index */ for (i = 0; i < context->num_of_columns; i++) { if (strcmp(column_list[i].data, map_key.data) == 0) { context->column_index = i; } } } else { if (context->column_index == -1) { return 1; } else { yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_string(gen, stringVal, stringLen); } } /* Save the map key to context, the following value parsed will belong to this key */ context->map_key = map_key.data; return 1; } static int doc_handle_end_map(void * ctx) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_end_map"); #endif context = (doc_context *) ctx; (context->depth) --; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_map_close(doc_gen); if (context->column_index == -1 || context->depth == 0) { if (context->depth == 0) { /* check if there a _doc mapping column which means the entire doc will be used as a column */ int doc_index; bool has_doc = FALSE; const unsigned char *buf; size_t len; StringInfoData str; for (doc_index = 0; doc_index < context->num_of_columns; doc_index++) { if (strcmp(context->column_list[doc_index].data, "_doc") == 0) { has_doc = TRUE; break; } } /* if there is _doc column, we need to flush the doc_gen buffer */ yajl_gen_get_buf(doc_gen, &buf, &len); initStringInfo(&str); appendBinaryStringInfo(&str, (char *) buf, (int) len); context->column_data[doc_index] = str; yajl_gen_clear(doc_gen); yajl_gen_free(doc_gen); } /* ignore such value */ return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; yajl_gen_status status; gen = (yajl_gen) context->gen; status = yajl_gen_map_close(gen); /* flush buffer */ if (context->depth == 1) { const unsigned char *buf; size_t len; StringInfoData str; yajl_gen_get_buf(gen, &buf, &len); initStringInfo(&str); appendBinaryStringInfo(&str, (char *) buf, (int) len); context->column_data[context->column_index] = str; yajl_gen_clear(gen); yajl_gen_free(gen); /* re-init gen */ gen = yajl_gen_alloc(context->funcs); } return yajl_gen_status_ok == status; } } static int doc_handle_start_array(void * ctx) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_start_array"); #endif context = (doc_context *) ctx; context->depth ++; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_array_open(doc_gen); if (context->column_index == -1) { /* ignore such value */ return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; gen = (yajl_gen) context->gen; return yajl_gen_status_ok == yajl_gen_array_open(gen); } } static int doc_handle_end_array(void * ctx) { doc_context *context; yajl_gen doc_gen; #ifdef DEBUG elog(NOTICE, "doc_handle_end_array"); #endif context = (doc_context *) ctx; context->depth --; /* for _doc attr if exits */ doc_gen = context->doc_gen; yajl_gen_array_close(doc_gen); if (context->column_index == -1 || context->depth == 0) { /* ignore such value */ return 1; } else { /* the value need to be pushed into a json structrue */ yajl_gen gen; yajl_gen_status status; gen = (yajl_gen) context->gen; status = yajl_gen_array_close(gen); /* flush buffer */ if (context->depth == 1) { const unsigned char *buf; size_t len; StringInfoData str; yajl_gen_get_buf(gen, &buf, &len); initStringInfo(&str); appendBinaryStringInfo(&str, (char *) buf, (int) len); context->column_data[context->column_index] = str; yajl_gen_clear(gen); yajl_gen_free(gen); /* re-init gen */ gen = yajl_gen_alloc(context->funcs); } return yajl_gen_status_ok == status; } } /* * function to retrieve a list of docs in couchdb */ static void couchdbGetAllDocs(const char *address, const int port, const char *database, const char *username, const char *password, const int limit, const char *startkey, const bool descending, StringInfoData *ids[], long long int *total_rows, long long int *offset) { CURL *curl; StringInfoData alldocs_url; StringInfoData auth_str; StringInfoData startkey_str; StringInfoData limit_str; alldocs_context *ctx; yajl_handle handle; yajl_callbacks *callbacks; yajl_alloc_funcs *funcs; bool hasAuth; bool hasStartkey; bool hasLimit; #ifdef DEBUG elog(NOTICE, "couchdbGetAllDocs"); #endif /* Check if there is authen info specified */ if (username != NULL && password != NULL) { initStringInfo(&auth_str); appendStringInfo(&auth_str, "%s:%s@", username, password); hasAuth = true; } else { hasAuth = false; } /* Check if there is startkey specified */ if (startkey != NULL) { initStringInfo(&startkey_str); appendStringInfo(&startkey_str, "&startkey=\"%s\"", startkey); hasStartkey = true; } else { hasStartkey = false; } /* Check if there is limit specified */ if (limit != -1) { initStringInfo(&limit_str); appendStringInfo(&limit_str, "&limit=%i", limit); hasLimit = true; } else { hasLimit = false; } /* Construct the URL we are going to curl */ initStringInfo(&alldocs_url); appendStringInfo(&alldocs_url, "http://%s%s:%i/%s/_all_docs?descending=%s%s%s", hasAuth ? auth_str.data : "", address, port, database, descending ? "true" : "false", hasStartkey ? startkey_str.data : "", hasLimit ? limit_str.data : ""); /* example: http://127.0.0.1:5984/cooldb/_all_docs?descending=false?startkey=123&limit=50 */ #ifdef DEBUG elog(NOTICE, "%s", alldocs_url.data); #endif /* init array of strings to hold fetched ids */ *ids = (StringInfoData *) palloc(sizeof(StringInfoData) * (hasLimit ? limit : 0)); callbacks = (yajl_callbacks *) palloc(sizeof(yajl_callbacks)); callbacks->yajl_null = NULL; callbacks->yajl_boolean = NULL; callbacks->yajl_integer = NULL; callbacks->yajl_double = NULL; callbacks->yajl_number = alldocs_handle_number; callbacks->yajl_string = alldocs_handle_string; callbacks->yajl_start_map = NULL; callbacks->yajl_map_key = alldocs_handle_map_key; callbacks->yajl_end_map = NULL; callbacks->yajl_start_array = NULL; callbacks->yajl_end_array = NULL; /* * Initialize YAJL Parser */ funcs = (yajl_alloc_funcs *) palloc(sizeof(yajl_alloc_funcs)); funcs->malloc = palloc_wrapper; funcs->realloc = repalloc_wrapper; funcs->free = pfree_wrapper; funcs->ctx = (void *) &TopMemoryContext; ctx = (alldocs_context *) palloc(sizeof(alldocs_context)); ctx->total_rows = 0; ctx->offset = 0; ctx->counter = 0; ctx->ids = *ids; handle = yajl_alloc(callbacks, funcs, (void *) ctx); /* * CURL the database url to get the database meta information */ curl = curl_easy_init(); curl_easy_setopt(curl, CURLOPT_URL, alldocs_url.data); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, alldocs_writer); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &handle); curl_easy_perform(curl); curl_easy_cleanup(curl); *total_rows = ctx->total_rows; *offset = ctx->offset; } static size_t alldocs_writer(void *buffer, size_t size, size_t nmemb, void *userp) { yajl_handle *handle = (yajl_handle *) userp; yajl_status stat = yajl_parse(*handle, (unsigned char *) buffer, size * nmemb); if (stat != yajl_status_ok) elog(NOTICE, "cannot parse"); return size * nmemb; } static int alldocs_handle_number(void * ctx, const char * s, size_t l) { /* If the value's key is doc_count, store it into the id_buffer array */ alldocs_context *context; #ifdef DEBUG elog(NOTICE, "alldocs_handle_number"); #endif context = (alldocs_context *) ctx; if ( strcmp("total_rows", context->map_key) == 0 ) { context->total_rows = (long long int) atoi(s); } else if ( strcmp("offset", context->map_key) == 0 ) { context->offset = (long long int) atoi(s); } return 1; } static int alldocs_handle_string(void * ctx, const unsigned char * stringVal, size_t stringLen) { /* If the value's key is doc_count, store it into the id_buffer array */ alldocs_context *context; StringInfoData str; #ifdef DEBUG elog(NOTICE, "alldocs_handle_string"); #endif context = (alldocs_context *) ctx; if ( strcmp("id", context->map_key) == 0 ) { initStringInfo(&str); appendBinaryStringInfo(&str, (char *) stringVal, (int) stringLen); context->ids[context->counter] = str; (context->counter) ++; } return 1; } static int alldocs_handle_map_key(void * ctx, const unsigned char * stringVal, size_t stringLen) { StringInfoData map_key; alldocs_context * context; #ifdef DEBUG elog(NOTICE, "alldocs_handle_map_key"); #endif initStringInfo(&map_key); appendBinaryStringInfo(&map_key, (char *) stringVal, (int) stringLen); /* Save the map key to context, the following value parsed will belong to this key */ context = (alldocs_context *) ctx; context->map_key = map_key.data; return 1; } /* * get quals in the select if there is one */ static void couchdbGetQual(Node *node, TupleDesc tupdesc, List *col_mapping_list, char **key, char **value, bool *equal) { ListCell *col_mapping; *key = NULL; *value = NULL; *equal = false; #ifdef DEBUG elog(NOTICE, "couchdbGetQual"); #endif if (!node) return; if (IsA(node, OpExpr)) { OpExpr *op = (OpExpr *) node; Node *left, *right; Index varattno; if (list_length(op->args) != 2) return; left = list_nth(op->args, 0); if (!IsA(left, Var)) return; varattno = ((Var *) left)->varattno; right = list_nth(op->args, 1); if (IsA(right, Const)) { StringInfoData buf; initStringInfo(&buf); /* And get the column and value... */ *key = NameStr(tupdesc->attrs[varattno - 1]->attname); *value = TextDatumGetCString(((Const *) right)->constvalue); /* convert qual keys to mapped couchdb attribute name */ foreach(col_mapping, col_mapping_list) { DefElem *def = (DefElem *) lfirst(col_mapping); if (strcmp(def->defname, *key) == 0) { *key = defGetString(def); break; } } /* * We can push down this qual if: * - The operatory is TEXTEQ * - The qual is on the _id column (in addition, _rev column can be also valid) */ if (op->opfuncid == PROCID_TEXTEQ) *equal = true; #ifdef DEBUG elog(NOTICE, "Got qual %s = %s", *key, *value); #endif return; } } return; }