diff --git a/src/backend/cdb/cdbparquetrowgroup.c b/src/backend/cdb/cdbparquetrowgroup.c
index 743815d831..d0959119db 100644
--- a/src/backend/cdb/cdbparquetrowgroup.c
+++ b/src/backend/cdb/cdbparquetrowgroup.c
@@ -211,106 +211,90 @@ ParquetRowGroupReader_ScanNextTuple(
     TupleTableSlot *slot)
 {
     Assert(slot);
 
-    while (rowGroupReader->rowRead < rowGroupReader->rowCount)
-    {
-        /*
-         * get the next item (tuple) from the row group
-         */
-        rowGroupReader->rowRead++;
+    int natts = slot->tts_tupleDescriptor->natts;
+    Assert(natts <= tupDesc->natts);
 
-        int natts = slot->tts_tupleDescriptor->natts;
-        Assert(natts <= tupDesc->natts);
+    bool useBloomFilter = false;
+    List *joinKeyAtts = NIL;
+    List *nonJoinKeyAtts = NIL;
+    List *allAtts = NIL;
 
-        Datum *values = slot_get_values(slot);
-        bool *nulls = slot_get_isnull(slot);
+    /* prepare data structures to separate join keys from the other attributes */
+    if (rfState != NULL && rfState->hasRuntimeFilter && !rfState->stopRuntimeFilter)
+    {
+        useBloomFilter = true;
 
-        int colReaderIndex = 0;
-        for (int i = 0; i < natts; i++)
+        /* find the attributes that appear in the hash join keys */
+        ListCell *hk;
+        foreach(hk, rfState->joinkeys)
         {
-            if (projs[i] == false)
-            {
-                nulls[i] = true;
-                continue;
-            }
-            ParquetColumnReader *nextReader =
-                    &rowGroupReader->columnReaders[colReaderIndex];
-            int hawqTypeID = tupDesc->attrs[i]->atttypid;
+            AttrNumber attrno = (AttrNumber) lfirst(hk);
+            joinKeyAtts = lappend_int(joinKeyAtts, attrno - 1);
+        }
+    }
 
-            if (hawqAttrToParquetColNum[i] == 1)
-            {
-                ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i],
-                        hawqTypeID);
-            }
-            else
-            {
-                /*
-                 * Because there are some memory reused inside the whole column reader, so need
-                 * to switch the context from PerTupleContext to rowgroup->context
-                 */
-                MemoryContext oldContext = MemoryContextSwitchTo(
-                        rowGroupReader->memoryContext);
-
-                switch (hawqTypeID) {
-                    case HAWQ_TYPE_POINT:
-                        ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]);
-                        break;
-                    case HAWQ_TYPE_PATH:
-                        ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]);
-                        break;
-                    case HAWQ_TYPE_LSEG:
-                        ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]);
-                        break;
-                    case HAWQ_TYPE_BOX:
-                        ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]);
-                        break;
-                    case HAWQ_TYPE_CIRCLE:
-                        ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]);
-                        break;
-                    case HAWQ_TYPE_POLYGON:
-                        ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]);
-                        break;
-                    default:
-                        /* TODO array type */
-                        /* TODO UDT */
-                        Insist(false);
-                        break;
-                }
-
-                MemoryContextSwitchTo(oldContext);
-            }
+    /* find the attributes that are not hash join keys */
+    for (int i = 0; i < natts; i++)
+    {
+        allAtts = lappend_int(allAtts, i);
 
-            colReaderIndex += hawqAttrToParquetColNum[i];
+        if (joinKeyAtts != NIL && list_find_int(joinKeyAtts, i) < 0)
+        {
+            nonJoinKeyAtts = lappend_int(nonJoinKeyAtts, i);
         }
+    }
+
+    while (rowGroupReader->rowRead < rowGroupReader->rowCount)
+    {
+        /*
+         * get the next item (tuple) from the row group
+         */
+        rowGroupReader->rowRead++;
 
-        if (rfState != NULL && rfState->hasRuntimeFilter
-                && !rfState->stopRuntimeFilter)
+        /*
+         * When the bloom filter is used, we first fetch only the hash join key
+         * columns and probe the bloom filter with their combined hash value.
+         * On a miss we skip reading and decoding the remaining columns.
+         */
+        if (useBloomFilter)
         {
-            Assert(rfState->bloomfilter != NULL);
+            ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+                    hawqAttrToParquetColNum, projs, slot, joinKeyAtts);
+
+            Datum *values = slot_get_values(slot);
             uint32_t hashkey = 0;
+            ListCell *hk;
             int i = 0;
-            foreach(hk, rfState->joinkeys)
+            foreach(hk, joinKeyAtts)
             {
-                AttrNumber attrno = (AttrNumber) lfirst(hk);
                 Datum keyval;
                 uint32 hkey;
 
                 /* rotate hashkey left 1 bit at each step */
                 hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
 
-                keyval = values[attrno - 1];
+                keyval = values[lfirst_int(hk)];
 
                 /* Evaluate expression */
                 hkey = DatumGetUInt32(
-                        FunctionCall1(&rfState->hashfunctions[i], keyval));
+                        FunctionCall1(&rfState->hashfunctions[i++], keyval));
                 hashkey ^= hkey;
-                i++;
             }
 
             if (!FindBloomFilter(rfState->bloomfilter, hashkey))
             {
                 continue;
             }
+
+            ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+                    hawqAttrToParquetColNum, projs, slot, nonJoinKeyAtts);
+        }
+        else
+        {
+            ParquetRowGroupReader_ScanNextTupleColumns(tupDesc, rowGroupReader,
+                    hawqAttrToParquetColNum, projs, slot, allAtts);
         }
 
         /*construct tuple, and return back*/
@@ -322,6 +306,91 @@ ParquetRowGroupReader_ScanNextTuple(
     return false;
 }
 
+/*
+ * Get the specified attributes of a tuple from the current row group into slot.
+ */
+void
+ParquetRowGroupReader_ScanNextTupleColumns(
+    TupleDesc              tupDesc,
+    ParquetRowGroupReader  *rowGroupReader,
+    int                    *hawqAttrToParquetColNum,
+    bool                   *projs,
+    TupleTableSlot         *slot,
+    List                   *attsList)
+{
+    int natts = slot->tts_tupleDescriptor->natts;
+    Assert(natts <= tupDesc->natts);
+
+    Datum *values = slot_get_values(slot);
+    bool *nulls = slot_get_isnull(slot);
+
+    int colReaderIndex = 0;
+    for (int i = 0; i < natts; i++)
+    {
+        /* in the bloom filter case this runs twice per tuple, which is cheap */
+        if (projs[i] == false)
+        {
+            nulls[i] = true;
+            continue;
+        }
+
+        /* skip attributes that are not in the given list */
+        if (attsList != NIL && list_find_int(attsList, i) < 0)
+        {
+            colReaderIndex += hawqAttrToParquetColNum[i];
+            continue;
+        }
+
+        ParquetColumnReader *nextReader =
+                &rowGroupReader->columnReaders[colReaderIndex];
+        int hawqTypeID = tupDesc->attrs[i]->atttypid;
+
+        if (hawqAttrToParquetColNum[i] == 1)
+        {
+            ParquetColumnReader_readValue(nextReader, &values[i], &nulls[i], hawqTypeID);
+        }
+        else
+        {
+            /*
+             * Some memory is reused inside the column reader, so we need to
+             * switch from the per-tuple context to the row group's context.
+             */
+            MemoryContext oldContext = MemoryContextSwitchTo(
+                    rowGroupReader->memoryContext);
+
+            switch (hawqTypeID) {
+                case HAWQ_TYPE_POINT:
+                    ParquetColumnReader_readPoint(nextReader, &values[i], &nulls[i]);
+                    break;
+                case HAWQ_TYPE_PATH:
+                    ParquetColumnReader_readPATH(nextReader, &values[i], &nulls[i]);
+                    break;
+                case HAWQ_TYPE_LSEG:
+                    ParquetColumnReader_readLSEG(nextReader, &values[i], &nulls[i]);
+                    break;
+                case HAWQ_TYPE_BOX:
+                    ParquetColumnReader_readBOX(nextReader, &values[i], &nulls[i]);
+                    break;
+                case HAWQ_TYPE_CIRCLE:
+                    ParquetColumnReader_readCIRCLE(nextReader, &values[i], &nulls[i]);
+                    break;
+                case HAWQ_TYPE_POLYGON:
+                    ParquetColumnReader_readPOLYGON(nextReader, &values[i], &nulls[i]);
+                    break;
+                default:
+                    /* TODO array type */
+                    /* TODO UDT */
+                    Insist(false);
+                    break;
+            }
+
+            MemoryContextSwitchTo(oldContext);
+        }
+
+        colReaderIndex += hawqAttrToParquetColNum[i];
+    }
+}
+
 /**
  * finish scanning row group, but keeping the structure palloced
  */
diff --git a/src/include/cdb/cdbparquetrowgroup.h b/src/include/cdb/cdbparquetrowgroup.h
index 4f5ab7a4b7..1d94bc1484 100644
--- a/src/include/cdb/cdbparquetrowgroup.h
+++ b/src/include/cdb/cdbparquetrowgroup.h
@@ -75,6 +75,16 @@ ParquetRowGroupReader_ScanNextTuple(
     RuntimeFilterState *rfState,
     TupleTableSlot *slot);
 
+/* Get specified attributes of a tuple into slot */
+void
+ParquetRowGroupReader_ScanNextTupleColumns(
+    TupleDesc pqs_tupDesc,
+    ParquetRowGroupReader *rowGroupReader,
+    int *hawqAttrToParquetColNum,
+    bool *projs,
+    TupleTableSlot *slot,
+    List *attsList);
+
 /* Finish scanning current row group*/
 void
 ParquetRowGroupReader_FinishedScanRowGroup(
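
For reference, a minimal standalone sketch of the hash-combination step that the patch performs before probing the bloom filter: the rotate-left-1 / XOR loop mirrors the code in ParquetRowGroupReader_ScanNextTuple, while hash_datum(), combine_joinkey_hashes(), and the sample values are hypothetical stand-ins for the per-type hash functions reached through rfState->hashfunctions and for real column data; FindBloomFilter() itself is not reproduced here.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* stand-in for DatumGetUInt32(FunctionCall1(&rfState->hashfunctions[i], keyval)) */
static uint32_t hash_datum(uint32_t value)
{
    return value * 2654435761u;     /* multiplicative hash, for demonstration only */
}

static uint32_t combine_joinkey_hashes(const uint32_t *keyvals, int nkeys)
{
    uint32_t hashkey = 0;

    for (int i = 0; i < nkeys; i++)
    {
        /* rotate hashkey left 1 bit at each step, as in the patch */
        hashkey = (hashkey << 1) | ((hashkey & 0x80000000u) ? 1 : 0);
        hashkey ^= hash_datum(keyvals[i]);
    }
    return hashkey;
}

int main(void)
{
    uint32_t joinkeys[] = { 42, 7 };

    /* this combined key is what the patch passes to FindBloomFilter() */
    printf("combined hashkey = %" PRIu32 "\n", combine_joinkey_hashes(joinkeys, 2));
    return 0;
}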