-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathsqlite_fdw.h
422 lines (349 loc) · 14.2 KB
/
sqlite_fdw.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
/*-------------------------------------------------------------------------
*
* SQLite Foreign Data Wrapper for PostgreSQL
*
* Portions Copyright (c) 2018, TOSHIBA CORPORATION
*
* IDENTIFICATION
* sqlite_fdw.h
*
*-------------------------------------------------------------------------
*/
#ifndef SQLITE_FDW_H
#define SQLITE_FDW_H
#include "sqlite3.h"
#if (PG_VERSION_NUM >= 120000)
#include "nodes/pathnodes.h"
#include "access/table.h"
#include "utils/float.h"
#include "optimizer/optimizer.h"
#else
#include "nodes/relation.h"
#include "optimizer/var.h"
#endif
#include "funcapi.h"
#include "fmgr.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "utils/rel.h"
#include "funcapi.h"
#define SQLITE_PREFETCH_ROWS 100
#define SQLITE_BLKSIZ (1024 * 4)
#define SQLITE_PORT 3306
#define MAXDATALEN 1024 * 64
#define WAIT_TIMEOUT 0
#define INTERACTIVE_TIMEOUT 0
#define CR_NO_ERROR 0
#define SQLITE_FDW_BIT_DATATYPE_BUF_SIZE sizeof(sqlite3_int64) * CHAR_BIT + 1
#define MACADDR_LEN 6
#define MACADDR8_LEN 8
#if (PG_VERSION_NUM < 120000)
#define table_close(rel, lock) heap_close(rel, lock)
#define table_open(rel, lock) heap_open(rel, lock)
#define exec_rt_fetch(rtindex, estate) rt_fetch(rtindex, estate->es_range_table)
#endif
/* Code version is updated at new release. */
#define CODE_VERSION 20500
#if (PG_VERSION_NUM < 100000)
/*
* Is the given relation a simple relation i.e a base or "other" member
* relation?
*/
#define IS_SIMPLE_REL(rel) \
((rel)->reloptkind == RELOPT_BASEREL || \
(rel)->reloptkind == RELOPT_OTHER_MEMBER_REL)
/* Is the given relation a join relation? */
#define IS_JOIN_REL(rel) \
((rel)->reloptkind == RELOPT_JOINREL)
/* Is the given relation an upper relation? */
#define IS_UPPER_REL(rel) \
((rel)->reloptkind == RELOPT_UPPER_REL)
/* Is the given relation an "other" relation? */
#define IS_OTHER_REL(rel) \
((rel)->reloptkind == RELOPT_OTHER_MEMBER_REL)
#endif
#if PG_VERSION_NUM < 130000
#define list_concat(X, Y) list_concat(X, list_copy(Y))
#endif
#if PG_VERSION_NUM < 120000
/* NullableDatum is introduced from PG12, we define it here in case of PG11 or earlier. */
typedef struct NullableDatum
{
#define FIELDNO_NULLABLE_DATUM_DATUM 0
Datum value;
#define FIELDNO_NULLABLE_DATUM_ISNULL 1
bool isnull;
/* due to alignment padding this could be used for flags for free */
} NullableDatum;
#endif
/*
* Options structure to store the Sqlite
* server information
*/
typedef struct sqlite_opt
{
int svr_port; /* SQLite port number */
char *svr_address; /* SQLite server ip address */
char *svr_database; /* SQLite database name */
char *svr_table; /* SQLite table name */
char *svr_init_command; /* SQLite SQL statement to execute when
* connecting to the SQLite server. */
unsigned long max_blob_size; /* Max blob size to read without
* truncation */
} sqlite_opt;
/* Struct for extra information passed to sqlite_estimate_path_cost_size() */
typedef struct SqliteFdwPathExtraData
{
PathTarget *target;
bool has_final_sort;
bool has_limit;
double limit_tuples;
int64 count_est;
int64 offset_est;
} SqliteFdwPathExtraData;
/*
* FDW-specific information for ForeignScanState
* fdw_state.
*/
typedef struct SQLiteFdwExecState
{
ForeignServer *server; /* Foreign server handle */
ForeignTable *table; /* Foreign scan deal with this foreign table */
sqlite3 *conn; /* SQLite connection handle */
sqlite3_stmt *stmt; /* SQLite prepared stament handle */
char *query; /* Query string */
Relation rel; /* relcache entry for the foreign table */
TupleDesc tupdesc; /* tuple descriptor of scan */
AttInMetadata *attinmeta; /* attribute datatype conversion */
List *retrieved_attrs; /* list of target attribute numbers */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
Oid *param_types; /* type of query parameters */
int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */
/* batch operation stuff */
int num_slots; /* number of slots to insert */
char *orig_query; /* original text of INSERT command */
List *target_attrs; /* list of target attribute numbers */
int values_end; /* length up to the end of VALUES */
sqlite_opt *sqliteFdwOptions; /* SQLite FDW options */
List *attr_list; /* query attribute list */
List *column_list; /* Column list of SQLite Column structures */
int64 row_nums; /* number of rows */
Datum **rows; /* all rows of scan */
int64 rowidx; /* current index of rows */
bool **rows_isnull; /* is null */
bool for_update; /* true if this scan is update target */
int batch_size; /* value of FDW option "batch_size" */
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
AttrNumber *junk_idx;
} SqliteFdwExecState;
typedef struct SqliteFdwRelationInfo
{
/*
* True means that the relation can be pushed down. Always true for simple
* foreign scan.
*/
bool pushdown_safe;
/* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
List *remote_conds;
List *local_conds;
/* Actual remote restriction clauses for scan (sans RestrictInfos) */
List *final_remote_exprs;
/* Estimated size and cost for a scan or join. */
double rows;
int width;
Cost startup_cost;
Cost total_cost;
/* Costs excluding costs for transferring data from the foreign server */
double retrieved_rows;
Cost rel_startup_cost;
Cost rel_total_cost;
/* Options extracted from catalogs. */
Cost fdw_startup_cost;
Cost fdw_tuple_cost;
/* Bitmap of attr numbers we need to fetch from the remote server. */
Bitmapset *attrs_used;
/* True means that the query_pathkeys is safe to push down */
bool qp_is_pushdown_safe;
/* Cost and selectivity of local_conds. */
QualCost local_conds_cost;
Selectivity local_conds_sel;
/* Selectivity of join conditions */
Selectivity joinclause_sel;
/* Join information */
RelOptInfo *outerrel;
RelOptInfo *innerrel;
JoinType jointype;
List *joinclauses;
/* Upper relation information */
UpperRelationKind stage;
/* Cached catalog information. */
ForeignTable *table;
ForeignServer *server;
int fetch_size; /* fetch size for this remote table */
/*
* Name of the relation while EXPLAINing ForeignScan. It is used for join
* relations but is set for all relations. For join relation, the name
* indicates which foreign tables are being joined and the join type used.
*/
char *relation_name;
/* Grouping information */
List *grouped_tlist;
/* Subquery information */
bool make_outerrel_subquery; /* do we deparse outerrel as a
* subquery? */
bool make_innerrel_subquery; /* do we deparse innerrel as a
* subquery? */
Relids lower_subquery_rels; /* all relids appearing in lower
* subqueries */
#if (PG_VERSION_NUM >= 170000)
Relids hidden_subquery_rels; /* relids, which can't be referred to
* from upper relations, used
* internally for equivalence member
* search */
#endif
/*
* Index of the relation. It is used to create an alias to a subquery
* representing the relation.
*/
int relation_index;
/* Function pushdown surppot in target list */
bool is_tlist_func_pushdown;
} SqliteFdwRelationInfo;
/*
* Execution state of a foreign scan that modifies a foreign table directly.
*/
typedef struct SqliteFdwDirectModifyState
{
ForeignServer *server; /* Foreign server handle */
ForeignTable *table; /* Foreign scan deal with this foreign table */
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* extracted fdw_private data */
char *query; /* text of UPDATE/DELETE command */
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
bool set_processed; /* do we set the command es_processed? */
/* for remote query execution */
sqlite3 *conn; /* connection for the update */
sqlite3_stmt *stmt; /* SQLite prepared stament handle */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
Oid *param_types; /* type of query parameters */
/* for storing result tuples */
int num_tuples; /* # of result tuples */
int next_tuple; /* index of next one to return */
Relation resultRel; /* relcache entry for the target relation */
AttrNumber *attnoMap; /* array of attnums of input user columns */
AttrNumber ctidAttno; /* attnum of input ctid column */
AttrNumber oidAttno; /* attnum of input oid column */
bool hasSystemCols; /* are there system columns of resultRel? */
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
} SqliteFdwDirectModifyState;
typedef struct blobOutput
{
const char* dat;
int len;
} blobOutput;
extern bool sqlite_is_foreign_expr(PlannerInfo *root,
RelOptInfo *baserel,
Expr *expr);
extern bool sqlite_is_foreign_param(PlannerInfo *root,
RelOptInfo *baserel,
Expr *expr);
extern bool sqlite_is_foreign_pathkey(PlannerInfo *root,
RelOptInfo *baserel,
PathKey *pathkey);
extern bool sqlite_is_foreign_function_tlist(PlannerInfo *root,
RelOptInfo *baserel,
List *tlist);
extern bool sqlite_is_builtin(Oid objectId);
extern EquivalenceMember *sqlite_find_em_for_rel(PlannerInfo *root,
EquivalenceClass *ec,
RelOptInfo *rel);
extern EquivalenceMember *sqlite_find_em_for_rel_target(PlannerInfo *root,
EquivalenceClass *ec,
RelOptInfo *fallbackRel);
/* Base ∞ constants */
extern const char * infs;
extern const char * infl;
extern const char hex_dig[];
/* in sqlite_fdw.c */
extern int sqlite_set_transmission_modes(void);
extern void sqlite_reset_transmission_modes(int nestlevel);
extern const int sqlite_affinity_code(char* t);
/* option.c headers */
extern sqlite_opt * sqlite_get_options(Oid foreigntableid);
/* depare.c headers */
extern void sqlite_deparse_select_stmt_for_rel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel,
List *tlist, List *remote_conds, List *pathkeys,
bool has_final_sort, bool has_limit, bool is_subquery,
List **retrieved_attrs, List **params_list);
extern void sqlite_deparse_insert(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, bool doNothing, int *values_end_len);
#if PG_VERSION_NUM >= 140000
extern void sqlite_rebuild_insert(StringInfo buf, Relation rel, char *orig_query, List *target_attrs, int values_end_len, int num_params, int num_rows);
extern void sqlite_deparse_truncate(StringInfo buf, List *rels);
#endif
extern void sqlite_deparse_update(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *attname);
extern void sqlite_deparse_direct_update_sql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *targetlist,
List *targetAttrs,
List *remote_conds,
List **params_list,
List **retrieved_attrs);
extern void sqlite_deparse_delete(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *name);
extern void sqlite_deparse_direct_delete_sql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
RelOptInfo *foreignrel,
List *remote_conds,
List **params_list,
List **retrieved_attrs);
extern void sqlite_deparse_analyze(StringInfo buf, char *dbname, char *relname);
extern void sqlite_deparse_string_literal(StringInfo buf, const char *val);
extern List *sqlite_build_tlist_to_deparse(RelOptInfo *foreignrel);
int sqlite_set_transmission_modes(void);
void sqlite_reset_transmission_modes(int nestlevel);
extern const char *sqlite_get_jointype_name(JoinType jointype);
extern void sqlite_classify_conditions(PlannerInfo *root,
RelOptInfo *baserel,
List *input_conds,
List **remote_conds,
List **local_conds);
/* connection.c headers */
sqlite3 *sqlite_get_connection(ForeignServer *server, bool truncatable);
sqlite3 *sqlite_connect(char *svr_address, char *svr_username, char *svr_password, char *svr_database,
int svr_port, bool svr_sa, char *svr_init_command,
char *ssl_key, char *ssl_cert, char *ssl_ca, char *ssl_capath,
char *ssl_cipher);
void sqlite_cleanup_connection(void);
void sqlite_rel_connection(sqlite3 * conn);
void sqlitefdw_report_error(int elevel, sqlite3_stmt * stmt, sqlite3 * conn, const char *sql, int rc);
void sqlite_cache_stmt(ForeignServer *server, sqlite3_stmt * *stmt);
NullableDatum sqlite_convert_to_pg(Form_pg_attribute att, sqlite3_value * val, AttInMetadata *attinmeta, AttrNumber attnum, int sqlite_value_affinity, int AffinityBehaviourFlags);
void sqlite_bind_sql_var(Form_pg_attribute att, int attnum, Datum value, sqlite3_stmt * stmt, bool *isnull, Oid relid);
extern void sqlite_do_sql_command(sqlite3 * conn, const char *sql, int level, List **busy_connection);
void sqlite_fdw_data_norm_functs_init(sqlite3* db);
/* sqlite_query.c headers */
sqlite3_int64 binstr2int64(const char *s);
bool listed_datatype (const char * tn, const char ** arr);
blobOutput sqlite_datum_to_blob (Datum value);
bool listed_datatype_oid (Oid atttypid, int32 atttypmod, const char **arr);
/* sqlite_gis.c headers */
extern const char *postGisSpecificTypes[];
extern const char *postGisSQLiteCompatibleTypes[];
#ifdef SQLITE_FDW_GIS_ENABLE
char* SpatiaLiteAsPostGISgeom (blobOutput spatiaLiteBlob, Form_pg_attribute att);
blobOutput PostGISgeomAsSpatiaLite (Datum value, Form_pg_attribute att);
void sqlite_deparse_PostGIS_value(StringInfo buf, char *extval);
#endif
#endif /* SQLITE_FDW_H */