Skip to content

Commit

Permalink
Initial RETURNING implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkgrgis committed Dec 24, 2024
1 parent af42d07 commit cb754ac
Show file tree
Hide file tree
Showing 17 changed files with 3,719 additions and 210 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ EXTENSION = sqlite_fdw
DATA = sqlite_fdw--1.0.sql sqlite_fdw--1.0--1.1.sql

ifndef REGRESS
REGRESS = extra/sqlite_fdw_post types/bitstring types/bool types/float4 types/float8 types/int4 types/int8 types/numeric types/macaddr types/macaddr8 types/out_of_range types/timestamp types/uuid extra/join extra/limit extra/aggregates extra/prepare extra/select_having extra/select extra/insert extra/update extra/encodings sqlite_fdw type aggregate selectfunc
REGRESS = extra/sqlite_fdw_post types/bitstring types/bool types/float4 types/float8 types/int4 types/int8 types/numeric types/macaddr types/macaddr8 types/out_of_range types/timestamp types/uuid extra/join extra/limit extra/aggregates extra/prepare extra/select_having extra/select extra/insert extra/update extra/encodings sqlite_fdw type aggregate selectfunc extra/returning
endif

REGRESS_OPTS = --encoding=utf8
Expand Down
2 changes: 1 addition & 1 deletion connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ sqlite_cache_stmt(ForeignServer *server, sqlite3_stmt * *stmt)
}

/*
* finalize all sqlite statement
* finalize all SQLite statement
*/
static void
sqlite_finalize_list_stmt(List **list)
Expand Down
210 changes: 152 additions & 58 deletions deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,16 @@ static void sqlite_print_remote_param(int paramindex, Oid paramtype, int32 param
static void sqlite_print_remote_placeholder(Oid paramtype, int32 paramtypmod,
deparse_expr_cxt *context);
static void sqlite_deparse_relation(StringInfo buf, Relation rel);
static void sqlite_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel,
Bitmapset *attrs_used, bool qualify_col, List **retrieved_attrs, bool is_concat, bool check_null);
static void sqlite_deparseTargetList(StringInfo buf,
PlannerInfo *root,
Index rtindex,
Relation rel,
bool is_returning,
Bitmapset *attrs_used,
bool qualify_col,
List **retrieved_attrs,
bool is_concat,
bool check_null);
static void sqlite_deparse_column_ref(StringInfo buf, int varno, int varattno, PlannerInfo *root, bool qualify_col, bool dml_context);
static void sqlite_deparse_select(List *tlist, bool is_subquery, List **retrieved_attrs, deparse_expr_cxt *context);
static void sqlite_deparse_subquery_target_list(deparse_expr_cxt *context);
Expand Down Expand Up @@ -1323,7 +1331,7 @@ sqlite_deparse_select(List *tlist, bool is_subquery, List **retrieved_attrs, dep
*/
Relation rel = table_open(rte->relid, NoLock);

sqlite_deparse_target_list(buf, root, foreignrel->relid, rel, fpinfo->attrs_used, false, retrieved_attrs, false, false);
sqlite_deparseTargetList(buf, root, foreignrel->relid, rel, false, fpinfo->attrs_used, false, retrieved_attrs, false, false);

table_close(rel, NoLock);
}
Expand Down Expand Up @@ -1845,6 +1853,60 @@ sqlite_deparse_range_tbl_ref(StringInfo buf, PlannerInfo *root, RelOptInfo *fore
params_list);
}

/*
* Add a RETURNING clause, if needed, to an INSERT/UPDATE/DELETE.
*/
static void
sqlite_deparseReturningList(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
bool trig_after_row,
List *withCheckOptionList,
List *returningList,
List **retrieved_attrs)
{
Bitmapset *attrs_used = NULL;

elog(DEBUG3, "sqlite_fdw : %s", __func__);
if (trig_after_row)
{
/* whole-row reference acquires all non-system columns */
attrs_used =
bms_make_singleton(0 - FirstLowInvalidHeapAttributeNumber);
}

if (withCheckOptionList != NIL)
{
/*
* We need the attrs, non-system and system, mentioned in the local
* query's WITH CHECK OPTION list.
*
* Note: we do this to ensure that WCO constraints will be evaluated
* on the data actually inserted/updated on the remote side, which
* might differ from the data supplied by the core code, for example
* as a result of remote triggers.
*/
pull_varattnos((Node *) withCheckOptionList, rtindex,
&attrs_used);
}

if (returningList != NIL)
{
/*
* We need the attrs, non-system and system, mentioned in the local
* query's RETURNING list.
*/
pull_varattnos((Node *) returningList, rtindex,
&attrs_used);
}
if (attrs_used != NULL)
{
sqlite_deparseTargetList(buf, root, rtindex, rel, true, attrs_used, false,
retrieved_attrs, false, false);
}
else
*retrieved_attrs = NIL;
}

/*
* deparse remote INSERT statement
*
Expand All @@ -1853,10 +1915,11 @@ sqlite_deparse_range_tbl_ref(StringInfo buf, PlannerInfo *root, RelOptInfo *fore
* to *retrieved_attrs.
*/
void
sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
int *values_end_len)
sqlite_deparseInsertSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs, int *values_end_len)
{
#if PG_VERSION_NUM >= 140000
TupleDesc tupdesc = RelationGetDescr(rel);
Expand All @@ -1866,6 +1929,7 @@ sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
bool first;
ListCell *lc;

elog(DEBUG3, "sqlite_fdw : %s", __func__);
appendStringInfo(buf, "INSERT %sINTO ", doNothing ? "OR IGNORE " : "");
sqlite_deparse_relation(buf, rel);

Expand Down Expand Up @@ -1947,6 +2011,14 @@ sqlite_deparse_insert(StringInfo buf, PlannerInfo *root,
}
else
appendStringInfoString(buf, " DEFAULT VALUES");

if (doNothing)
appendStringInfoString(buf, " ON CONFLICT DO NOTHING");

sqlite_deparseReturningList(buf, root, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_insert_after_row,
withCheckOptionList, returningList, retrieved_attrs);

*values_end_len = buf->len;
}

Expand Down Expand Up @@ -2019,21 +2091,23 @@ sqlite_deparse_analyze(StringInfo sql, char *dbname, char *relname)
* This is used for both SELECT and RETURNING targetlists.
*/
static void
sqlite_deparse_target_list(StringInfo buf,
PlannerInfo *root,
Index rtindex,
Relation rel,
Bitmapset *attrs_used,
bool qualify_col,
List **retrieved_attrs,
bool is_concat,
bool check_null)
sqlite_deparseTargetList(StringInfo buf,
PlannerInfo *root,
Index rtindex,
Relation rel,
bool is_returning,
Bitmapset *attrs_used,
bool qualify_col,
List **retrieved_attrs,
bool is_concat,
bool check_null)
{
TupleDesc tupdesc = RelationGetDescr(rel);
bool have_wholerow;
bool first;
int i;

elog(DEBUG3, "sqlite_fdw : %s", __func__);
/* If there's a whole-row reference, we'll need all the columns. */
have_wholerow = bms_is_member(0 - FirstLowInvalidHeapAttributeNumber,
attrs_used);
Expand Down Expand Up @@ -2061,6 +2135,8 @@ sqlite_deparse_target_list(StringInfo buf,
else
appendStringInfoString(buf, ", ");
}
else if (is_returning)
appendStringInfoString(buf, " RETURNING ");
else if (is_concat)
appendStringInfoString(buf, "COALESCE(");
else if (check_null)
Expand Down Expand Up @@ -2167,6 +2243,7 @@ sqlite_deparse_column_ref(StringInfo buf, int varno, int varattno, PlannerInfo *
/* Required only to be passed down to deparseTargetList(). */
List *retrieved_attrs;

elog(DEBUG4, "sqlite_fdw : %s , whole row reference", __func__);
/*
* The lock on the relation will be held by upper callers, so it's
* fine to open it with no lock here.
Expand All @@ -2191,12 +2268,12 @@ sqlite_deparse_column_ref(StringInfo buf, int varno, int varattno, PlannerInfo *
* would be true.
*/
appendStringInfoString(buf, "CASE WHEN ");
sqlite_deparse_target_list(buf, root, varno, rel, attrs_used, qualify_col,
sqlite_deparseTargetList(buf, root, varno, rel, false, attrs_used, qualify_col,
&retrieved_attrs, false, true);
appendStringInfoString(buf, "THEN ");

appendStringInfoString(buf, "(\"(\" || ");
sqlite_deparse_target_list(buf, root, varno, rel, attrs_used, qualify_col,
sqlite_deparseTargetList(buf, root, varno, rel, false, attrs_used, qualify_col,
&retrieved_attrs, true, false);
appendStringInfoString(buf, "|| \")\")");
appendStringInfoString(buf, " END");
Expand All @@ -2212,7 +2289,7 @@ sqlite_deparse_column_ref(StringInfo buf, int varno, int varattno, PlannerInfo *
Oid pg_atttyp = 0;
bool no_unification = false;

elog(DEBUG3, "sqlite_fdw : %s , varattrno != 0", __func__);
elog(DEBUG4, "sqlite_fdw : %s , col reference", __func__);
/* varno must not be any of OUTER_VAR, INNER_VAR and INDEX_VAR. */
Assert(!IS_SPECIAL_VARNO(varno));

Expand All @@ -2228,10 +2305,9 @@ sqlite_deparse_column_ref(StringInfo buf, int varno, int varattno, PlannerInfo *
if (strcmp(def->defname, "column_name") == 0)
{
colname = defGetString(def);
elog(DEBUG3, "opt = %s\n", def->defname);
elog(DEBUG1, "altered column name = %s\n", colname);
break;
}
elog(DEBUG1, "column name = %s\n", colname);
}

/*
Expand Down Expand Up @@ -2453,6 +2529,36 @@ sqlite_deparse_expr(Expr *node, deparse_expr_cxt *context)
}
}

/*
* Preferred SQLite affinity from "column_type" foreign column option
* SQLITE_NULL if no value or no normal value
*/
int
preferred_sqlite_affinity (Oid relid, int varattno)
{
char *coltype = NULL;
List *options;
ListCell *lc;

elog(DEBUG4, "sqlite_fdw : %s ", __func__);
if (varattno == 0)
return SQLITE_NULL;

options = GetForeignColumnOptions(relid, varattno);
foreach(lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "column_type") == 0)
{
coltype = defGetString(def);
elog(DEBUG4, "column type = %s", coltype);
break;
}
}
return sqlite_affinity_code(coltype);
}

/*
* deparse remote UPDATE statement
*
Expand All @@ -2461,9 +2567,12 @@ sqlite_deparse_expr(Expr *node, deparse_expr_cxt *context)
* to *retrieved_attrs.
*/
void
sqlite_deparse_update(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *targetAttrs, List *attnums)
sqlite_deparseUpdateSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *targetAttrs,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs,
List *conditionAttr)
{
#if PG_VERSION_NUM >= 140000
TupleDesc tupdesc = RelationGetDescr(rel);
Expand Down Expand Up @@ -2499,7 +2608,7 @@ sqlite_deparse_update(StringInfo buf, PlannerInfo *root,
#endif
}
i = 0;
foreach(lc, attnums)
foreach(lc, conditionAttr)
{
int attnum = lfirst_int(lc);

Expand All @@ -2508,36 +2617,10 @@ sqlite_deparse_update(StringInfo buf, PlannerInfo *root,
appendStringInfo(buf, "=?");
i++;
}
}

/*
* Preferred SQLite affinity from "column_type" foreign column option
* SQLITE_NULL if no value or no normal value
*/
int
preferred_sqlite_affinity (Oid relid, int varattno)
{
char *coltype = NULL;
List *options;
ListCell *lc;

elog(DEBUG4, "sqlite_fdw : %s ", __func__);
if (varattno == 0)
return SQLITE_NULL;

options = GetForeignColumnOptions(relid, varattno);
foreach(lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "column_type") == 0)
{
coltype = defGetString(def);
elog(DEBUG4, "column type = %s", coltype);
break;
}
}
return sqlite_affinity_code(coltype);
sqlite_deparseReturningList(buf, root, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_insert_after_row,
withCheckOptionList, returningList, retrieved_attrs);
}

/*
Expand Down Expand Up @@ -2693,16 +2776,18 @@ sqlite_deparse_direct_update_sql(StringInfo buf, PlannerInfo *root,
* to *retrieved_attrs.
*/
void
sqlite_deparse_delete(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *attname)
sqlite_deparseDeleteSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *returningList,
List **retrieved_attrs,
List *conditionAttr)
{
int i = 0;
ListCell *lc;

appendStringInfoString(buf, "DELETE FROM ");
sqlite_deparse_relation(buf, rel);
foreach(lc, attname)
foreach(lc, conditionAttr)
{
int attnum = lfirst_int(lc);

Expand All @@ -2711,6 +2796,15 @@ sqlite_deparse_delete(StringInfo buf, PlannerInfo *root,
appendStringInfo(buf, "=?");
i++;
}

sqlite_deparseReturningList(buf, root, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_delete_after_row,
NIL, returningList, retrieved_attrs);
if (returningList != NULL)
ereport(WARNING,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("DELETE ... RETURNING doesn't return values (not implemented)")
));
}

/*
Expand Down
Loading

0 comments on commit cb754ac

Please sign in to comment.