Skip to content

Commit

Permalink
- Add 'group()' built-in function to DaphneDSL.
Browse files Browse the repository at this point in the history
 - This built-in function creates a GroupOp in DaphneIR.
 - Only 'SUM' is supported as the aggregation function.
 - Accepts exactly one aggregation column.
 - Accepts an arbitrary number of columns to group on.
- Add support for string values in the 'group' kernel function.
  - SUM, MIN, and MAX are the only aggregation functions applied to string columns.
  - Other aggregation functions throw an exception if they receive strings as arguments or results.
  - Additionally, 'DeduceValueTypeAndExecute' cannot handle string values due to unsupported operations on strings.
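  - Therefore, 'ColumnGroupAggStringVTArg', which fixes the argument value type to string, is used when the argument column holds strings.
  - Alternatively, 'ColumnGroupAgg' is instantiated directly with string value types for both the argument and the result.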
- The 'group' function internally calls the 'order' and 'extractCol' kernel functions.
  - These two functions are updated to handle string values correctly.
- Added script-level test cases to validate the new functionality.
- Close issue #903
  • Loading branch information
saminbassiri committed Nov 20, 2024
1 parent e80a5e6 commit 02dd2cf
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 40 deletions.
35 changes: 35 additions & 0 deletions src/parser/daphnedsl/DaphneDSLBuiltins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,41 @@ antlrcpp::Any DaphneDSLBuiltins::build(mlir::Location loc, const std::string &fu
utils.matrixOfSizeType, lhs, rhs, lhsOn, rhsOn, rhsAgg)
.getResults();
}
if (func == "group") {
    // group(frame, aggCol, groupCol_1, ..., groupCol_n)
    //   args[0]   - the frame to group
    //   args[1]   - the single column to aggregate (always with SUM)
    //   args[2..] - an arbitrary number (at least one) of columns to group on
    checkNumArgsMin(loc, func, numArgs, 3);
    mlir::Value currentFrame = args[0];
    mlir::Value aggCol = args[1];

    // set aggregation function to SUM
    auto aggFunc = static_cast<mlir::Attribute>(
        mlir::daphne::GroupEnumAttr::get(builder.getContext(), mlir::daphne::GroupEnum::SUM));
    std::vector<mlir::Attribute> functionName;
    functionName.push_back(aggFunc);

    // get group columns
    std::vector<mlir::Value> groupName;
    for (size_t i = 2; i < numArgs; i++)
        groupName.push_back(args[i]);

    // get agg column
    std::vector<mlir::Value> columnName;
    columnName.push_back(aggCol);

    // result column types: one (still unknown) value type per group column
    // plus one for the aggregated column
    mlir::Type vt = utils.unknownType;
    std::vector<mlir::Type> colTypes(groupName.size() + columnName.size(), vt);

    return static_cast<mlir::Value>(builder.create<GroupOp>(loc, FrameType::get(builder.getContext(), colTypes),
                                                            currentFrame, groupName, columnName,
                                                            builder.getArrayAttr(functionName)));
}

// ********************************************************************
// Frame label manipulation
Expand Down
27 changes: 17 additions & 10 deletions src/runtime/local/kernels/ExtractRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,23 @@ template <typename VTSel> struct ExtractRow<Frame, Frame, VTSel> {
throw std::out_of_range(errMsg.str());
}
// Copy the selected row (pos) of every column into the next free slot of the
// corresponding result column. Each element is copied exactly once and the
// write pointer is advanced exactly once per column.
for (size_t c = 0; c < numCols; c++) {
    if (schema[c] == ValueTypeCode::STR) {
        // Handle std::string column: strings must be copied through
        // std::string's assignment operator, not as raw bytes.
        // NOTE(review): assumes the result slot already holds a
        // constructed std::string - confirm against the frame allocation.
        *reinterpret_cast<std::string *>(resCols[c]) =
            *reinterpret_cast<const std::string *>(argCols[c] + pos * elementSizes[c]);
    } else {
        // We always copy in units of 8 bytes (uint64_t). If the
        // actual element size is lower, the superfluous bytes will
        // be overwritten by the next match. With this approach, we
        // do not need to call memcpy for each element, nor
        // interpret the types for a L/S of fitting size.
        // TODO Don't multiply by elementSize, but left-shift by
        // ld(elementSize).
        *reinterpret_cast<uint64_t *>(resCols[c]) =
            *reinterpret_cast<const uint64_t *>(argCols[c] + pos * elementSizes[c]);
    }
    // Advance the write position of this result column by one element.
    resCols[c] += elementSizes[c];
}
}
res->shrinkNumRows(numRowsSel);
Expand Down
100 changes: 76 additions & 24 deletions src/runtime/local/kernels/Group.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,84 @@ void group(DT *&res, const DT *arg, const char **keyCols, size_t numKeyCols, con
// Frame <- Frame
// ----------------------------------------------------------------------------

// Maps a GroupEnum value to its upper-case name for use in error messages.
// Values not listed in the switch yield an empty string.
inline std::string myStringifyGroupEnum(mlir::daphne::GroupEnum val) {
    using mlir::daphne::GroupEnum;
    const char *label = "";
    switch (val) {
    case GroupEnum::COUNT:
        label = "COUNT";
        break;
    case GroupEnum::SUM:
        label = "SUM";
        break;
    case GroupEnum::MIN:
        label = "MIN";
        break;
    case GroupEnum::MAX:
        label = "MAX";
        break;
    case GroupEnum::AVG:
        label = "AVG";
        break;
    }
    return label;
}

// Returns the result of the aggregation function aggFunc over the (contiguous)
// memory between the begin and end pointer.
//
// VTRes is the value type of the result, VTArg the value type of the input
// elements. String value types are rejected here at run time:
//  - COUNT throws std::invalid_argument if the result type is std::string,
//  - SUM, MIN, MAX, AVG and any other value throw std::invalid_argument if the
//    result or the argument type is std::string.
// The guards are evaluated with `if constexpr`, so the arithmetic branches are
// never instantiated for std::string.
//
// MIN, MAX and the fallback dereference into the range, so the range must not
// be empty.
template <typename VTRes, typename VTArg>
VTRes aggregate(const mlir::daphne::GroupEnum &aggFunc, const VTArg *begin, const VTArg *end) {
    using mlir::daphne::GroupEnum;
    constexpr bool resIsStr = std::is_same<VTRes, std::string>::value;
    constexpr bool anyIsStr = resIsStr || std::is_same<VTArg, std::string>::value;

    // Builds the exception thrown for unsupported value type combinations.
    const auto unsupported = [&aggFunc]() {
        return std::invalid_argument(std::string("aggregate: ") + myStringifyGroupEnum(aggFunc) +
                                     std::string(" aggregation is not supported for these value types."));
    };

    switch (aggFunc) {
    case GroupEnum::COUNT:
        // TODO: Do we need to check for Null elements here?
        if constexpr (resIsStr)
            throw unsupported();
        else
            return end - begin;
    case GroupEnum::SUM:
        if constexpr (anyIsStr)
            throw unsupported();
        else
            return std::accumulate(begin, end, (VTRes)0);
    case GroupEnum::MIN:
        if constexpr (anyIsStr)
            throw unsupported();
        else
            return *std::min_element(begin, end);
    case GroupEnum::MAX:
        if constexpr (anyIsStr)
            throw unsupported();
        else
            return *std::max_element(begin, end);
    case GroupEnum::AVG:
        if constexpr (anyIsStr)
            throw unsupported();
        else
            return std::accumulate(begin, end, (double)0) / (double)(end - begin);
    default:
        if constexpr (anyIsStr)
            throw std::invalid_argument("aggregate: Unsupported aggregation operation for string types.");
        else
            return *begin;
    }
}

template <>
std::string aggregate(const mlir::daphne::GroupEnum &aggFunc, const std::string *begin, const std::string *end) {
using mlir::daphne::GroupEnum;
if (aggFunc == GroupEnum::MIN)
return *std::min_element(begin, end);
if (aggFunc == GroupEnum::MAX)
return *std::max_element(begin, end);
else
return *begin;
}

// struct which calls the aggregate() function (specified via aggFunc) on each
// duplicate group in the groups vector and on all implied single groups for a
// specified column (colIdx) of the argument frame (arg) and stores the result
Expand Down Expand Up @@ -117,22 +168,14 @@ template <typename VTRes, typename VTArg> struct ColumnGroupAgg {
}
};

inline std::string myStringifyGroupEnum(mlir::daphne::GroupEnum val) {
using mlir::daphne::GroupEnum;
switch (val) {
case GroupEnum::COUNT:
return "COUNT";
case GroupEnum::SUM:
return "SUM";
case GroupEnum::MIN:
return "MIN";
case GroupEnum::MAX:
return "MAX";
case GroupEnum::AVG:
return "AVG";
// Since DeduceValueTypeAndExecute can not handle string values,
// we add special ColumnGroupAgg function for arg with std::string values.
template <typename VTRes> struct ColumnGroupAggStringVTArg {
static void apply(Frame *res, const Frame *arg, size_t colIdx, std::vector<std::pair<size_t, size_t>> *groups,
mlir::daphne::GroupEnum aggFunc, DCTX(ctx)) {
ColumnGroupAgg<VTRes, std::string>::apply(res, arg, colIdx, groups, aggFunc, ctx);
}
return "";
}
};

template <> struct Group<Frame> {
static void apply(Frame *&res, const Frame *arg, const char **keyCols, size_t numKeyCols, const char **aggCols,
Expand Down Expand Up @@ -270,9 +313,18 @@ template <> struct Group<Frame> {

// copying key columns and column-wise group aggregation
//
// Every result column is processed exactly once. The dispatch depends on the
// value types of the argument (ordered) and result column:
//  - non-string argument: both types are deduced at run time,
//  - string argument, string result: ColumnGroupAgg<std::string, std::string>,
//  - string argument, non-string result: only the result type is deduced,
//    via ColumnGroupAggStringVTArg.
for (size_t i = 0; i < numColsRes; i++) {
    // Key columns (i < numKeyCols) are passed (GroupEnum)0; aggregated
    // columns use their entry in aggFuncs.
    const auto aggFunc = (i < numKeyCols) ? (GroupEnum)0 : aggFuncs[i - numKeyCols];
    const auto argVT = ordered->getSchema()[i];
    const auto resVT = res->getSchema()[i];

    if (argVT != ValueTypeCode::STR)
        DeduceValueTypeAndExecute<ColumnGroupAgg>::apply(resVT, argVT, res, ordered, i, groups, aggFunc, ctx);
    else if (resVT == ValueTypeCode::STR)
        ColumnGroupAgg<std::string, std::string>::apply(res, ordered, i, groups, aggFunc, ctx);
    else
        DeduceValueTypeAndExecute<ColumnGroupAggStringVTArg>::apply(resVT, res, ordered, i, groups, aggFunc, ctx);
}
delete groups;
DataObjectFactory::destroy(ordered);
Expand Down
21 changes: 15 additions & 6 deletions src/runtime/local/kernels/Order.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,20 +197,29 @@ struct OrderFrame {

// Sort passes for all but the last sort column. Each pass is run exactly once
// per column. String columns are instantiated directly with std::string; all
// other columns go through the run-time value type dispatch.
if (numColIdxs > 1) {
    for (size_t i = 0; i < numColIdxs - 1; i++) {
        const size_t sortCol = colIdxs[i];
        if (arg->getSchema()[sortCol] == ValueTypeCode::STR)
            MultiColumnIDSort<std::string>::apply(arg, idx, groups, ascending[i], sortCol, ctx);
        else
            DeduceValueTypeAndExecute<MultiColumnIDSort>::apply(arg->getSchema()[sortCol], arg, idx, groups,
                                                                ascending[i], sortCol, ctx);
    }
}

// efficient last sort pass OR finalizing the groups vector for further
// use
size_t colIdx = colIdxs[numColIdxs - 1];
if (groupsRes == nullptr) {
DeduceValueTypeAndExecute<ColumnIDSort>::apply(arg->getSchema()[colIdx], arg, idx, groups,
ascending[numColIdxs - 1], colIdx, ctx);
if (arg->getSchema()[colIdx] == ValueTypeCode::STR)
ColumnIDSort<std::string>::apply(arg, idx, groups, ascending[numColIdxs - 1], colIdx, ctx);
else
DeduceValueTypeAndExecute<ColumnIDSort>::apply(arg->getSchema()[colIdx], arg, idx, groups,
ascending[numColIdxs - 1], colIdx, ctx);
} else {
DeduceValueTypeAndExecute<MultiColumnIDSort>::apply(arg->getSchema()[colIdx], arg, idx, groups,
ascending[numColIdxs - 1], colIdx, ctx);
if (arg->getSchema()[colIdx] == ValueTypeCode::STR)
MultiColumnIDSort<std::string>::apply(arg, idx, groups, ascending[numColIdxs - 1], colIdx, ctx);
else
DeduceValueTypeAndExecute<MultiColumnIDSort>::apply(arg->getSchema()[colIdx], arg, idx, groups,
ascending[numColIdxs - 1], colIdx, ctx);
groupsRes->insert(groupsRes->end(), groups.begin(), groups.end());
}
}
Expand Down
1 change: 1 addition & 0 deletions test/api/cli/operations/OperationsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ MAKE_TEST_CASE("createFrame", 1)
MAKE_TEST_CASE("ctable", 1)
MAKE_TEST_CASE("fill", 1)
MAKE_TEST_CASE("gemv", 1)
// Script-level test for the 'group' built-in (SUM aggregation).
// NOTE(review): presumably runs groupSum_1.daphne and compares against
// groupSum_1.txt - confirm how MAKE_TEST_CASE maps name/count to files.
MAKE_TEST_CASE("groupSum", 1)
MAKE_TEST_CASE("idxMax", 1)
MAKE_TEST_CASE("idxMin", 1)
MAKE_TEST_CASE("isNan", 1)
Expand Down
8 changes: 8 additions & 0 deletions test/api/cli/operations/groupSum_1.daphne
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Grouping a frame with the 'group' built-in: column "c" is summed per
// distinct combination of the key columns "a" (strings) and "b" (integers).

fr = createFrame([ "String1", "String1", "String2", "String3", "String3" ], [ 1, 1, 2, 3, 4 ],
[ 100, 200, 300, 400, 500 ], "a", "b", "c");

res = group(fr, "c", "a", "b");

print(res);
5 changes: 5 additions & 0 deletions test/api/cli/operations/groupSum_1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Frame(4x3, [a:std::string, b:int64_t, SUM(c):int64_t])
String1 1 300
String2 2 300
String3 3 400
String3 4 500

0 comments on commit 02dd2cf

Please sign in to comment.