Skip to content

Commit

Permalink
Merge pull request #24 from CartoDB/23-getdata
Browse files Browse the repository at this point in the history
Data buffer fixes
  • Loading branch information
Rafa de la Torre authored Aug 3, 2016
2 parents 53090b7 + cb4a4c2 commit 3ea8a68
Showing 1 changed file with 202 additions and 12 deletions.
214 changes: 202 additions & 12 deletions odbc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ PG_MODULE_MAGIC;
#define MAXIMUM_TABLE_NAME_LEN 255
#define MAXIMUM_COLUMN_NAME_LEN 255

/*
 * Maximum GetData buffer size.
 * Column sizes reported by the driver are capped to this value before the
 * per-column fetch buffer size is recorded; longer values are expected to be
 * fetched in several SQLGetData calls.
 */
#define MAXIMUM_BUFFER_SIZE 8192

/*
 * SQLSTATE compared against the diagnostic record when SQLGetData returns
 * SQL_SUCCESS_WITH_INFO, to tell fractional truncation of a number apart
 * from plain "more data pending" truncation.
 */
#define ODBC_SQLSTATE_FRACTIONAL_TRUNCATION "01S07"
typedef struct odbcFdwOptions
{
char *schema; /* Foreign schema name */
Expand All @@ -89,6 +93,7 @@ typedef struct odbcFdwExecutionState
bool first_iteration;
List *col_position_mask;
List *col_size_array;
List *col_conversion_array;
char *sql_count;
int encoding;
} odbcFdwExecutionState;
Expand Down Expand Up @@ -126,6 +131,8 @@ static struct odbcFdwOption valid_options[] =
{ NULL, InvalidOid}
};

/*
 * How the character data fetched for a result column is turned into the
 * string handed over as the PostgreSQL column value.
 * Enumerator order matters: the values are stored as plain ints in
 * col_conversion_array.
 */
typedef enum
{
	TEXT_CONVERSION,	/* use the fetched string as it is */
	HEX_CONVERSION,		/* prefix the fetched string with "\x" */
	BIN_CONVERSION,		/* bit strings: rejected with an error */
	BOOL_CONVERSION		/* a raw 0/1 first byte becomes "F"/"T" */
} ColumnConversion;

/*
* SQL functions
*/
Expand Down Expand Up @@ -492,7 +499,14 @@ sql_data_type(
break;
case SQL_VARCHAR :
case SQL_WVARCHAR :
appendStringInfo(sql_type, "varchar(%u)", (unsigned)column_size);
if (column_size <= 255)
{
appendStringInfo(sql_type, "varchar(%u)", (unsigned)column_size);
}
else
{
appendStringInfo(sql_type, "text");
}
break;
case SQL_LONGVARCHAR :
case SQL_WLONGVARCHAR :
Expand All @@ -517,7 +531,13 @@ sql_data_type(
appendStringInfo(sql_type, "float8");
break;
case SQL_BIT :
appendStringInfo(sql_type, "bit(1)");
/* Use boolean instead of bit(1) because:
* * binary types are not yet fully supported
* * boolean is more commonly used in PG
* * With options BoolsAsChar=0 this allows
* preserving boolean columns from pSQL ODBC.
*/
appendStringInfo(sql_type, "boolean");
break;
case SQL_SMALLINT :
case SQL_TINYINT :
Expand All @@ -526,12 +546,14 @@ sql_data_type(
case SQL_BIGINT :
appendStringInfo(sql_type, "bigint");
break;
/*
case SQL_BINARY :
appendStringInfo(sql_type, "bit(%u)", (unsigned)column_size);
break;
case SQL_VARBINARY :
appendStringInfo(sql_type, "varbit(%u)", (unsigned)column_size);
break;
*/
case SQL_LONGVARBINARY :
appendStringInfo(sql_type, "bytea");
break;
Expand All @@ -550,11 +572,40 @@ sql_data_type(
case SQL_GUID :
appendStringInfo(sql_type, "uuid");
break;
};
}

/*
 * minimum_buffer_size
 *
 * Return the minimum number of characters (not counting the terminating
 * zero, which the caller adds) to reserve for fetching a column of the
 * given ODBC data type as text.
 *
 * The caller raises the column size reported by the driver to at least
 * this value.
 *
 * odbc_data_type: ODBC SQL type code of the column (SQL_INTEGER, ...).
 *
 * Returns 0 for any type without a fixed minimum, so the size reported by
 * the driver is used unchanged.  This function is called for every mapped
 * result column, including character and other variable-length types, so
 * an unlisted type must not be treated as an error here.
 */
static SQLULEN
minimum_buffer_size(SQLSMALLINT odbc_data_type)
{
	switch (odbc_data_type)
	{
	case SQL_DECIMAL :
	case SQL_NUMERIC :
		return 32;
	case SQL_INTEGER :
		return 12;
	case SQL_REAL :
	case SQL_FLOAT :
		return 18;
	case SQL_DOUBLE :
		return 26;
	case SQL_SMALLINT :
	case SQL_TINYINT :
		return 6;
	case SQL_BIGINT :
		return 21;
	case SQL_TYPE_DATE :
	case SQL_DATE :
		return 10;
	case SQL_TYPE_TIME :
	case SQL_TIME :
		return 8;
	case SQL_TYPE_TIMESTAMP :
	case SQL_TIMESTAMP :
		return 20;
	default :
		/* No fixed minimum for this type: keep the driver-reported size */
		return 0;
	}
}

Expand Down Expand Up @@ -1240,6 +1291,7 @@ odbcIterateForeignScan(ForeignScanState *node)
StringInfoData *table_columns = festate->table_columns;
List *col_position_mask = NIL;
List *col_size_array = NIL;
List *col_conversion_array = NIL;

#ifdef DEBUG
elog(DEBUG1, "odbcIterateForeignScan");
Expand Down Expand Up @@ -1272,10 +1324,12 @@ odbcIterateForeignScan(ForeignScanState *node)
prev_context = MemoryContextSwitchTo(executor_state->es_query_cxt);
col_position_mask = NIL;
col_size_array = NIL;
col_conversion_array = NIL;
num_of_result_cols = columns;
/* Obtain the column information of the first row. */
for (i = 1; i <= columns; i++)
{
ColumnConversion conversion = TEXT_CONVERSION;
found = FALSE;
ColumnName = (SQLCHAR *) palloc(sizeof(SQLCHAR) * MAXIMUM_COLUMN_NAME_LEN);
SQLDescribeCol(stmt,
Expand All @@ -1289,15 +1343,35 @@ odbcIterateForeignScan(ForeignScanState *node)
&NullablePtr);

sql_data_type(DataTypePtr, ColumnSizePtr, DecimalDigitsPtr, NullablePtr, &sql_type);
if (strcmp("bytea", (char*)sql_type.data) == 0)
{
conversion = HEX_CONVERSION;
}
if (strcmp("boolean", (char*)sql_type.data) == 0)
{
conversion = BOOL_CONVERSION;
}
else if (strncmp("bit(",(char*)sql_type.data,4)==0 || strncmp("varbit(",(char*)sql_type.data,7)==0)
{
conversion = BIN_CONVERSION;
}

/* Get the position of the column in the FDW table */
for (k=0; k<num_of_table_cols; k++)
{
if (strcmp(table_columns[k].data, (char *) ColumnName) == 0)
{
SQLULEN min_size = minimum_buffer_size(DataTypePtr);
SQLULEN max_size = MAXIMUM_BUFFER_SIZE;
found = TRUE;
col_position_mask = lappend_int(col_position_mask, k);
if (ColumnSizePtr < min_size)
ColumnSizePtr = min_size;
if (ColumnSizePtr > max_size)
ColumnSizePtr = max_size;

col_size_array = lappend_int(col_size_array, (int) ColumnSizePtr);
col_conversion_array = lappend_int(col_conversion_array, (int) conversion);
break;
}
}
Expand All @@ -1306,12 +1380,14 @@ odbcIterateForeignScan(ForeignScanState *node)
{
col_position_mask = lappend_int(col_position_mask, -1);
col_size_array = lappend_int(col_size_array, -1);
col_conversion_array = lappend_int(col_conversion_array, 0);
}
pfree(ColumnName);
}
festate->num_of_result_cols = num_of_result_cols;
festate->col_position_mask = col_position_mask;
festate->col_size_array = col_size_array;
festate->col_conversion_array = col_conversion_array;
festate->first_iteration = FALSE;

MemoryContextSwitchTo(prev_context);
Expand All @@ -1321,6 +1397,7 @@ odbcIterateForeignScan(ForeignScanState *node)
num_of_result_cols = festate->num_of_result_cols;
col_position_mask = festate->col_position_mask;
col_size_array = festate->col_size_array;
col_conversion_array = festate->col_conversion_array;
}

ExecClearTuple(slot);
Expand All @@ -1334,10 +1411,12 @@ odbcIterateForeignScan(ForeignScanState *node)
{
SQLLEN indicator;
char * buf;
size_t buf_used;

int mask_index = i - 1;
int col_size = list_nth_int(col_size_array, mask_index);
int mapped_pos = list_nth_int(col_position_mask, mask_index);
ColumnConversion conversion = list_nth_int(col_conversion_array, mask_index);

/* Ignore this column if position is marked as invalid */
if (mapped_pos == -1)
Expand All @@ -1348,7 +1427,7 @@ odbcIterateForeignScan(ForeignScanState *node)
/* retrieve column data as a zero-terminated string */
/* TODO:
binary fields (SQL_C_BIT, SQL_C_BINARY) do not have
a traling zero; they should be copied as now but without
a trailing zero; they should be copied as now but without
adding 1 to col_size, or using SQL_C_BIT or SQL_C_BINARY
and then encoded into a binary PG literal (e.g. X'...'
or B'...')
Expand All @@ -1358,8 +1437,87 @@ odbcIterateForeignScan(ForeignScanState *node)
SQL_C_TYPE_DATE/SQL_C_TYPE_TIME/SQL_C_TYPE_TIMESTAMP.
And finally, SQL_C_NUMERIC and SQL_C_GUID could also be used.
*/
buf[0] = 0;
buf_used = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR,
buf, sizeof(char) * (col_size+1), &indicator);
buf_used = indicator;

if (ret == SQL_SUCCESS_WITH_INFO)
{
SQLCHAR sqlstate[5];
SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, NULL, NULL, 0, NULL);
if (strcmp((char*)sqlstate, ODBC_SQLSTATE_FRACTIONAL_TRUNCATION) == 0)
{
/* Fractional truncation has occurred;
* at this point we cannot obtain the lost digits
*/
if (buf[col_size])
{
/* The driver has omitted the trailing zero */
char *buf2 = (char *) palloc(sizeof(char) * (col_size+2));
strncpy(buf2, buf, col_size+1);
buf2[col_size+1] = 0;
pfree(buf);
buf = buf2;
}
elog(NOTICE,"Truncating number: %s",buf);
}
else
{
/* The output is incomplete, we need to obtain the rest of the data */
char* accum_buffer;
size_t accum_buffer_size;
size_t accum_used;
if (indicator == SQL_NO_TOTAL)
{
/* Unknown total size, must copy part by part */
accum_buffer_size = 0;
accum_buffer = NULL;
accum_used = 0;
while (1)
{
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
// Allocate new accumulation buffer if necessary
if (accum_used + buf_len > accum_buffer_size)
{
char *new_buff;
accum_buffer_size = accum_buffer_size == 0 ? col_size*2 : accum_buffer_size*2;
new_buff = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
if (accum_buffer)
{
memmove(new_buff, accum_buffer, accum_used);
pfree(accum_buffer);
}
accum_buffer = new_buff;
accum_buffer[accum_used] = 0;
}
// Copy part to the accumulation buffer
strncpy(accum_buffer+accum_used, buf, buf_len);
accum_used += buf_len;
accum_buffer[accum_used] = 0;
// Get new part
if (ret != SQL_SUCCESS_WITH_INFO)
break;
ret = SQLGetData(stmt, i, SQL_C_CHAR, buf, sizeof(char) * (col_size+1), &indicator);
};

}
else
{
/* We need to retrieve indicator more characters */
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
accum_buffer_size = buf_len + indicator;
accum_buffer = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
strncpy(accum_buffer, buf, buf_len);
accum_buffer[buf_len] = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR, accum_buffer+buf_len, sizeof(char) * (indicator+1), &indicator);
}
pfree(buf);
buf = accum_buffer;
buf_used = accum_used;
}
}

if (SQL_SUCCEEDED(ret))
{
Expand All @@ -1377,7 +1535,29 @@ odbcIterateForeignScan(ForeignScanState *node)
buf = pg_any_to_server(buf, strlen(buf), festate->encoding);
}
initStringInfo(&col_data);
appendStringInfoString (&col_data, buf);
switch (conversion)
{
case TEXT_CONVERSION :
appendStringInfoString (&col_data, buf);
break;
case HEX_CONVERSION :
appendStringInfoString (&col_data, "\\x");
appendStringInfoString (&col_data, buf);
break;
case BOOL_CONVERSION :
if (buf[0] == 0)
strcpy(buf, "F");
else if (buf[0] == 1)
strcpy(buf, "T");
appendStringInfoString (&col_data, buf);
break;
case BIN_CONVERSION :
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_DATA_TYPE),
errmsg("Bit string columns are not supported")
));
break;
}

values[mapped_pos] = col_data.data;
}
Expand Down Expand Up @@ -1575,6 +1755,11 @@ odbcImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
&Nullable);

sql_data_type(DataType, ColumnSize, DecimalDigits, Nullable, &sql_type);
if (is_blank_string(sql_type.data))
{
elog(NOTICE, "Data type not supported (%d) for column %s", DataType, ColumnName);
continue;
}
if (i > 1)
{
appendStringInfo(&col_str, ", ");
Expand Down Expand Up @@ -1719,10 +1904,6 @@ odbcImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
ret = SQLFetch(columns_stmt);
if (SQL_SUCCESS == ret)
{
if (++i > 1)
{
appendStringInfo(&col_str, ", ");
}
ret = SQLGetData(columns_stmt, 4, SQL_C_CHAR, ColumnName, MAXIMUM_COLUMN_NAME_LEN, &indicator);
// check_return(ret, "Reading column name", columns_stmt, SQL_HANDLE_STMT);
ret = SQLGetData(columns_stmt, 5, SQL_C_SSHORT, &DataType, MAXIMUM_COLUMN_NAME_LEN, &indicator);
Expand All @@ -1734,6 +1915,15 @@ odbcImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
ret = SQLGetData(columns_stmt, 11, SQL_C_SSHORT, &Nullable, 0, &indicator);
// check_return(ret, "Reading column nullable", columns_stmt, SQL_HANDLE_STMT);
sql_data_type(DataType, ColumnSize, DecimalDigits, Nullable, &sql_type);
if (is_blank_string(sql_type.data))
{
elog(NOTICE, "Data type not supported (%d) for column %s", DataType, ColumnName);
continue;
}
if (++i > 1)
{
appendStringInfo(&col_str, ", ");
}
appendStringInfo(&col_str, "\"%s\" %s", ColumnName, (char *) sql_type.data);
}
}
Expand Down

0 comments on commit 3ea8a68

Please sign in to comment.