Data buffer fixes #24

Merged

merged 6 commits, Aug 3, 2016

Changes from 4 commits
206 changes: 194 additions & 12 deletions odbc_fdw.c
@@ -89,6 +89,7 @@ typedef struct odbcFdwExecutionState
bool first_iteration;
List *col_position_mask;
List *col_size_array;
List *col_conversion_array;
char *sql_count;
int encoding;
} odbcFdwExecutionState;
@@ -126,6 +127,8 @@ static struct odbcFdwOption valid_options[] =
{ NULL, InvalidOid}
};

typedef enum { TEXT_CONVERSION, HEX_CONVERSION, BIN_CONVERSION, BOOL_CONVERSION } ColumnConversion;

/*
* SQL functions
*/
@@ -492,7 +495,14 @@ sql_data_type(
break;
case SQL_VARCHAR :
case SQL_WVARCHAR :
appendStringInfo(sql_type, "varchar(%u)", (unsigned)column_size);
if (column_size <= 255)
{
appendStringInfo(sql_type, "varchar(%u)", (unsigned)column_size);
}
else
{
appendStringInfo(sql_type, "text");
}
break;
case SQL_LONGVARCHAR :
case SQL_WLONGVARCHAR :
@@ -517,7 +527,13 @@
appendStringInfo(sql_type, "float8");
break;
case SQL_BIT :
appendStringInfo(sql_type, "bit(1)");
/* Use boolean instead of bit(1) because:
* * binary types are not yet fully supported
* * boolean is more commonly used in PG
* * With options BoolsAsChar=0 this allows
* preserving boolean columns from pSQL ODBC.
*/
appendStringInfo(sql_type, "boolean");

indentation

please start using a linter or formatter of your choice.

Author: I think that misalignment is due to how GitHub handles tabs, but I sure do need a linter.

break;
case SQL_SMALLINT :
case SQL_TINYINT :
@@ -526,12 +542,14 @@
case SQL_BIGINT :
appendStringInfo(sql_type, "bigint");
break;
/*

are you sure you want to comment this block out?

Author: Yes, the translation is correct (SQL_BINARY to bit(...) and SQL_VARBINARY to varbit(...)), but reading those values is complicated, so we don't support it yet. We leave sql_type blank, which now makes the column be ignored.

then I'd remove the block and rely on git to remember the history
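A condensed view of the mechanism the author describes, taken from the import hunks later in this same diff (no new behavior, just the relevant lines pulled together):

	sql_data_type(DataType, ColumnSize, DecimalDigits, Nullable, &sql_type);
	if (is_blank_string(sql_type.data))
	{
		/* Unsupported ODBC type: the column is left out of the foreign table */
		elog(NOTICE, "Data type not supported (%d) for column %s", DataType, ColumnName);
		continue;
	}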

case SQL_BINARY :
appendStringInfo(sql_type, "bit(%u)", (unsigned)column_size);
break;
case SQL_VARBINARY :
appendStringInfo(sql_type, "varbit(%u)", (unsigned)column_size);
break;
*/
case SQL_LONGVARBINARY :
appendStringInfo(sql_type, "bytea");
break;
@@ -550,11 +568,40 @@
case SQL_GUID :
appendStringInfo(sql_type, "uuid");
break;
};
}

static SQLULEN
minimum_buffer_size(SQLSMALLINT odbc_data_type)
{
switch(odbc_data_type)
{
case SQL_DECIMAL :
case SQL_NUMERIC :
return 32;
case SQL_INTEGER :
return 12;
case SQL_REAL :
case SQL_FLOAT :
return 18;
case SQL_DOUBLE :
return 26;
case SQL_SMALLINT :
case SQL_TINYINT :
return 6;
case SQL_BIGINT :
return 21;
case SQL_TYPE_DATE :
case SQL_DATE :
return 10;
case SQL_TYPE_TIME :
case SQL_TIME :
return 8;
case SQL_TYPE_TIMESTAMP :
case SQL_TIMESTAMP :
return 20;
default :
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_DATA_TYPE),
errmsg("Data type not supported, code %d", odbc_data_type)
));
return 0;
};
}

@@ -1240,6 +1287,7 @@ odbcIterateForeignScan(ForeignScanState *node)
StringInfoData *table_columns = festate->table_columns;
List *col_position_mask = NIL;
List *col_size_array = NIL;
List *col_conversion_array = NIL;

#ifdef DEBUG
elog(DEBUG1, "odbcIterateForeignScan");
@@ -1272,10 +1320,12 @@
prev_context = MemoryContextSwitchTo(executor_state->es_query_cxt);
col_position_mask = NIL;
col_size_array = NIL;
col_conversion_array = NIL;
num_of_result_cols = columns;
/* Obtain the column information of the first row. */
for (i = 1; i <= columns; i++)
{
ColumnConversion conversion = TEXT_CONVERSION;
found = FALSE;
ColumnName = (SQLCHAR *) palloc(sizeof(SQLCHAR) * MAXIMUM_COLUMN_NAME_LEN);
SQLDescribeCol(stmt,
@@ -1289,15 +1339,35 @@
&NullablePtr);

sql_data_type(DataTypePtr, ColumnSizePtr, DecimalDigitsPtr, NullablePtr, &sql_type);
if (strcmp("bytea", (char*)sql_type.data) == 0)
{
conversion = HEX_CONVERSION;
}
if (strcmp("boolean", (char*)sql_type.data) == 0)
{
conversion = BOOL_CONVERSION;
}
else if (strncmp("bit(",(char*)sql_type.data,4)==0 || strncmp("varbit(",(char*)sql_type.data,7)==0)
{
conversion = BIN_CONVERSION;
}

/* Get the position of the column in the FDW table */
for (k=0; k<num_of_table_cols; k++)
{
if (strcmp(table_columns[k].data, (char *) ColumnName) == 0)
{
SQLULEN min_size = minimum_buffer_size(DataTypePtr);
SQLULEN max_size = 8192;

what types are affected by this hardcoded limitation?

Author: Text (longvarchar in ODBC parlance). In a text column coming from a PostGIS geometry we were getting a size larger than 1M. This doesn't limit the data that we can read, since we can read it in chunks, but it avoids using 1M buffers for every piece of data.

found = TRUE;
col_position_mask = lappend_int(col_position_mask, k);
if (ColumnSizePtr < min_size)
ColumnSizePtr = min_size;
if (ColumnSizePtr > max_size)
ColumnSizePtr = max_size;

col_size_array = lappend_int(col_size_array, (int) ColumnSizePtr);
col_conversion_array = lappend_int(col_conversion_array, (int) conversion);
break;
}
}
@@ -1306,12 +1376,14 @@
{
col_position_mask = lappend_int(col_position_mask, -1);
col_size_array = lappend_int(col_size_array, -1);
col_conversion_array = lappend_int(col_conversion_array, 0);
}
pfree(ColumnName);
}
festate->num_of_result_cols = num_of_result_cols;
festate->col_position_mask = col_position_mask;
festate->col_size_array = col_size_array;
festate->col_conversion_array = col_conversion_array;
festate->first_iteration = FALSE;

MemoryContextSwitchTo(prev_context);
@@ -1321,6 +1393,7 @@
num_of_result_cols = festate->num_of_result_cols;
col_position_mask = festate->col_position_mask;
col_size_array = festate->col_size_array;
col_conversion_array = festate->col_conversion_array;
}

ExecClearTuple(slot);
@@ -1334,10 +1407,12 @@
{
SQLLEN indicator;
char * buf;
size_t buf_used;

int mask_index = i - 1;
int col_size = list_nth_int(col_size_array, mask_index);
int mapped_pos = list_nth_int(col_position_mask, mask_index);
ColumnConversion conversion = list_nth_int(col_conversion_array, mask_index);

/* Ignore this column if position is marked as invalid */
if (mapped_pos == -1)
@@ -1348,7 +1423,7 @@
/* retrieve column data as a zero-terminated string */
/* TODO:
binary fields (SQL_C_BIT, SQL_C_BINARY) do not have
a traling zero; they should be copied as now but without
a trailing zero; they should be copied as now but without
adding 1 to col_size, or using SQL_C_BIT or SQL_C_BINARY
and then encoded into a binary PG literal (e.g. X'...'
or B'...')
@@ -1358,8 +1433,87 @@
SQL_C_TYPE_DATE/SQL_C_TYPE_TIME/SQL_C_TYPE_TIMESTAMP.
And finally, SQL_C_NUMERIC and SQL_C_GUID could also be used.
*/
buf[0] = 0;
buf_used = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR,
buf, sizeof(char) * (col_size+1), &indicator);
buf_used = indicator;

if (ret == SQL_SUCCESS_WITH_INFO)
{
SQLCHAR sqlstate[5];
SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, NULL, NULL, 0, NULL);
if (strcmp((char*)sqlstate, "01S07") == 0)

what's the meaning of "01S07"? isn't it better as a "named constant"?

Author: It's the ODBC SQLSTATE for "Fractional truncation". I'll define a constant to make it more clear.
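For illustration, the named constant the author mentions might look like this (the macro name is hypothetical and not part of this PR):

	/* ODBC SQLSTATE 01S07: "Fractional truncation" */
	#define ODBC_SQLSTATE_FRACTIONAL_TRUNCATION "01S07"

	if (strcmp((char *) sqlstate, ODBC_SQLSTATE_FRACTIONAL_TRUNCATION) == 0)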

{
/* Fractional truncation has occured;
* at this point we cannot obtain the lost digits
*/
if (buf[col_size])
{
/* The driver has omitted the trailing */
char *buf2 = (char *) palloc(sizeof(char) * (col_size+2));
strncpy(buf2, buf, col_size+1);
buf2[col_size+1] = 0;
pfree(buf);
buf = buf2;
}
elog(NOTICE,"Truncating number: %s",buf);
}
else
{
/* The output is incomplete, we need to obtain the rest of the data */
char* accum_buffer;
size_t accum_buffer_size;
size_t accum_used;
if (indicator == SQL_NO_TOTAL)
{
/* Unknown total size, must copy part by part */
accum_buffer_size = 0;
accum_buffer = NULL;
accum_used = 0;
while (1)
{
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
// Allocate new accumulation buffer if necessary
if (accum_used + buf_len > accum_buffer_size)
{
char *new_buff;
accum_buffer_size = accum_buffer_size == 0 ? col_size*2 : accum_buffer_size*2;
new_buff = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
if (accum_buffer)
{
memmove(new_buff, accum_buffer, accum_used);
pfree(accum_buffer);
}
accum_buffer = new_buff;
accum_buffer[accum_used] = 0;
}
// Copy part to the accumulation buffer
strncpy(accum_buffer+accum_used, buf, buf_len);
accum_used += buf_len;
accum_buffer[accum_used] = 0;
// Get new part
if (ret != SQL_SUCCESS_WITH_INFO)
break;

  • I'd be wary of a while(1) with such a break. Is this guaranteed to exit?
  • the nesting level makes this hard to understand. I'm tempted to add a linter with a cyclomatic complexity measure; could this be rewritten in a more readable way?

Author: I'll give it a try at rewriting this. That middle-of-the-loop exit point must eventually be taken, because SQLGetData only returns SQL_SUCCESS_WITH_INFO until the whole data has been read.

Author: I'll leave this for later in #25 to speed up the current release.

ok, sure
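As a rough sketch of the restructuring the reviewer asks for (not the code adopted in this PR or in #25), the unknown-length branch could accumulate chunks in a StringInfo and make the exit condition explicit; this assumes the driver NUL-terminates each chunk and reuses the surrounding stmt, buf, col_size, ret and indicator variables:

	/* Hypothetical rewrite: append chunks until SQLGetData stops reporting truncation */
	StringInfoData acc;
	initStringInfo(&acc);
	do
	{
		appendStringInfoString(&acc, buf);      /* copy the chunk just read */
		if (ret != SQL_SUCCESS_WITH_INFO)
			break;                              /* that was the final chunk */
		ret = SQLGetData(stmt, i, SQL_C_CHAR,
		                 buf, sizeof(char) * (col_size + 1), &indicator);
	} while (SQL_SUCCEEDED(ret));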

ret = SQLGetData(stmt, i, SQL_C_CHAR, buf, sizeof(char) * (col_size+1), &indicator);
};

}
else
{
/* We need to retrieve indicator more characters */
size_t buf_len = buf[col_size] ? col_size + 1 : col_size;
accum_buffer_size = buf_len + indicator;
accum_buffer = (char *) palloc(sizeof(char) * (accum_buffer_size+1));
strncpy(accum_buffer, buf, buf_len);
accum_buffer[buf_len] = 0;
ret = SQLGetData(stmt, i, SQL_C_CHAR, accum_buffer+buf_len, sizeof(char) * (indicator+1), &indicator);
}
pfree(buf);
buf = accum_buffer;
buf_used = accum_used;
}
}

if (SQL_SUCCEEDED(ret))
{
@@ -1377,7 +1531,25 @@
buf = pg_any_to_server(buf, strlen(buf), festate->encoding);
}
initStringInfo(&col_data);
appendStringInfoString (&col_data, buf);
if (conversion == HEX_CONVERSION)

isn't this block more suitable for a switch-case?

Author: Mmmm, yes it is.
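A sketch of how the conversion handling just below could read as a switch, rearranging the PR's own statements (an illustration only, not the form committed here):

	switch (conversion)
	{
		case HEX_CONVERSION:
			appendStringInfoString(&col_data, "\\x");
			break;
		case BOOL_CONVERSION:
			if (buf[0] == 0)
				strcpy(buf, "F");
			else if (buf[0] == 1)
				strcpy(buf, "T");
			break;
		case BIN_CONVERSION:
			ereport(ERROR,
			        (errcode(ERRCODE_FDW_INVALID_DATA_TYPE),
			         errmsg("Bit string columns are not supported")));
			break;
		case TEXT_CONVERSION:
		default:
			break;
	}
	appendStringInfoString(&col_data, buf);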

{
appendStringInfoString (&col_data, "\\x");
}
if (conversion == BOOL_CONVERSION)
{
if (buf[0] == 0)
strcpy(buf, "F");
else if (buf[0] == 1)
strcpy(buf, "T");
}
if (conversion == BIN_CONVERSION)
{
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_DATA_TYPE),
errmsg("Bit string columns are not supported")
));
}
appendStringInfoString (&col_data, buf);

values[mapped_pos] = col_data.data;
}
@@ -1575,6 +1747,11 @@ odbcImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
&Nullable);

sql_data_type(DataType, ColumnSize, DecimalDigits, Nullable, &sql_type);
if (is_blank_string(sql_type.data))
{
elog(NOTICE, "Data type not supported (%d) for column %s", DataType, ColumnName);
continue;
}
if (i > 1)
{
appendStringInfo(&col_str, ", ");
@@ -1719,10 +1896,6 @@ odbcImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
ret = SQLFetch(columns_stmt);
if (SQL_SUCCESS == ret)
{
if (++i > 1)
{
appendStringInfo(&col_str, ", ");
}
ret = SQLGetData(columns_stmt, 4, SQL_C_CHAR, ColumnName, MAXIMUM_COLUMN_NAME_LEN, &indicator);
// check_return(ret, "Reading column name", columns_stmt, SQL_HANDLE_STMT);
ret = SQLGetData(columns_stmt, 5, SQL_C_SSHORT, &DataType, MAXIMUM_COLUMN_NAME_LEN, &indicator);
@@ -1734,6 +1907,15 @@
ret = SQLGetData(columns_stmt, 11, SQL_C_SSHORT, &Nullable, 0, &indicator);
// check_return(ret, "Reading column nullable", columns_stmt, SQL_HANDLE_STMT);
sql_data_type(DataType, ColumnSize, DecimalDigits, Nullable, &sql_type);
if (is_blank_string(sql_type.data))
{
elog(NOTICE, "Data type not supported (%d) for column %s", DataType, ColumnName);
continue;
}
if (++i > 1)
{
appendStringInfo(&col_str, ", ");
}
appendStringInfo(&col_str, "\"%s\" %s", ColumnName, (char *) sql_type.data);
}
}