Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into convert_table
Browse files Browse the repository at this point in the history
  • Loading branch information
ttnghia committed Nov 14, 2024
2 parents 74d858c + e5e1603 commit 7a32b6f
Show file tree
Hide file tree
Showing 22 changed files with 1,821 additions and 74 deletions.
4 changes: 4 additions & 0 deletions ci/submodule-sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ rapids_cmake_sha=$(git -C ${LIBCUDF_BUILD_PATH}/_deps/rapids-cmake-src/ rev-pars
echo "Update rapids-cmake pinned SHA1 to ${rapids_cmake_sha}"
echo "${rapids_cmake_sha}" > thirdparty/cudf-pins/rapids-cmake.sha

echo "Workaround for https://github.com/NVIDIA/spark-rapids-jni/issues/2582"
cudf_patch_path="cudf/cpp/cmake/thirdparty/patches"
sed -i "s|\${current_json_dir}|\${current_json_dir}/../${cudf_patch_path}|g" thirdparty/cudf-pins/versions.json

# Do the git add after the build so that we get
# the updated versions.json generated by the build
echo "Update cudf submodule to ${cudf_sha} with updated pinned versions"
Expand Down
21 changes: 19 additions & 2 deletions src/main/java/com/nvidia/spark/rapids/jni/Arms.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* This class contains utility methods for automatic resource management.
*/
class Arms {
public class Arms {
/**
* This method close the resource if an exception is thrown while executing the function.
*/
Expand Down Expand Up @@ -54,7 +54,10 @@ public static <R extends AutoCloseable> void closeAll(Iterator<R> resources) {
Throwable t = null;
while (resources.hasNext()) {
try {
resources.next().close();
R resource = resources.next();
if (resource != null) {
resource.close();
}
} catch (Exception e) {
if (t == null) {
t = e;
Expand All @@ -81,4 +84,18 @@ public static <R extends AutoCloseable> void closeAll(R... resources) {
public static <R extends AutoCloseable> void closeAll(Collection<R> resources) {
closeAll(resources.iterator());
}

/**
* This method safely closes the resources after applying the function.
* <br/>
* See {@link #closeAll(Iterator)} for more details.
*/
public static <R extends AutoCloseable, C extends Collection<R>, V> V withResource(
C resource, Function<C, V> function) {
try {
return function.apply(resource);
} finally {
closeAll(resource);
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/nvidia/spark/rapids/jni/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* A utility class for holding a pair of values.
*/
class Pair<K, V> {
public class Pair<K, V> {
private final K left;
private final V right;

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/Preconditions.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,18 @@ public static int ensureNonNegative(int value, String name) {
}
return value;
}

/**
* Check if the value is non-negative, otherwise throw an IllegalArgumentException with the given message.
* @param value the value to check
* @param name the name of the value
* @return the value if it is non-negative
* @throws IllegalArgumentException if the value is negative
*/
public static long ensureNonNegative(long value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be non-negative, but was " + value);
}
return value;
}
}
121 changes: 121 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/ColumnOffsetInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.DeviceMemoryBufferView;

import static com.nvidia.spark.rapids.jni.Preconditions.ensureNonNegative;

/**
* This class is used to store the offsets of the buffer of a column in the serialized data.
*/
class ColumnOffsetInfo {
static final long INVALID_OFFSET = -1L;
private final long validity;
private final long validityBufferLen;
private final long offset;
private final long offsetBufferLen;
private final long data;
private final long dataBufferLen;

public ColumnOffsetInfo(long validity, long validityBufferLen, long offset, long offsetBufferLen, long data,
long dataBufferLen) {
ensureNonNegative(validityBufferLen, "validityBuffeLen");
ensureNonNegative(offsetBufferLen, "offsetBufferLen");
ensureNonNegative(dataBufferLen, "dataBufferLen");
this.validity = validity;
this.validityBufferLen = validityBufferLen;
this.offset = offset;
this.offsetBufferLen = offsetBufferLen;
this.data = data;
this.dataBufferLen = dataBufferLen;
}

/**
* Get the validity buffer offset.
* @return {@value #INVALID_OFFSET} if the validity buffer is not present, otherwise the offset.
*/
long getValidity() {
return validity;
}

/**
* Get a view of the validity buffer from underlying buffer.
* @param baseAddress the base address of underlying buffer.
* @return null if the validity buffer is not present, otherwise a view of the buffer.
*/
DeviceMemoryBufferView getValidityBuffer(long baseAddress) {
if (validity == INVALID_OFFSET) {
return null;
}
return new DeviceMemoryBufferView(validity + baseAddress, validityBufferLen);
}

/**
* Get the offset buffer offset.
* @return {@value #INVALID_OFFSET} if the offset buffer is not present, otherwise the offset.
*/
long getOffset() {
return offset;
}

/**
* Get a view of the offset buffer from underlying buffer.
* @param baseAddress the base address of underlying buffer.
* @return null if the offset buffer is not present, otherwise a view of the buffer.
*/
DeviceMemoryBufferView getOffsetBuffer(long baseAddress) {
if (offset == INVALID_OFFSET) {
return null;
}
return new DeviceMemoryBufferView(offset + baseAddress, offsetBufferLen);
}

/**
* Get the data buffer offset.
* @return {@value #INVALID_OFFSET} if the data buffer is not present, otherwise the offset.
*/
long getData() {
return data;
}

/**
* Get a view of the data buffer from underlying buffer.
* @param baseAddress the base address of underlying buffer.
* @return null if the data buffer is not present, otherwise a view of the buffer.
*/
DeviceMemoryBufferView getDataBuffer(long baseAddress) {
if (data == INVALID_OFFSET) {
return null;
}
return new DeviceMemoryBufferView(data + baseAddress, dataBufferLen);
}

long getDataBufferLen() {
return dataBufferLen;
}

@Override
public String toString() {
return "ColumnOffsets{" +
"validity=" + validity +
", offset=" + offset +
", data=" + data +
", dataLen=" + dataBufferLen +
'}';
}
}
66 changes: 66 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/ColumnViewInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.*;

import java.util.Optional;

import static com.nvidia.spark.rapids.jni.Preconditions.ensureNonNegative;

class ColumnViewInfo {
private final DType dtype;
private final ColumnOffsetInfo offsetInfo;
private final int nullCount;
private final int rowCount;

public ColumnViewInfo(DType dtype, ColumnOffsetInfo offsetInfo,
int nullCount, int rowCount) {
ensureNonNegative(nullCount, "nullCount");
ensureNonNegative(rowCount, "rowCount");
this.dtype = dtype;
this.offsetInfo = offsetInfo;
this.nullCount = nullCount;
this.rowCount = rowCount;
}

ColumnView buildColumnView(DeviceMemoryBuffer buffer, ColumnView[] childrenView) {
long baseAddress = buffer.getAddress();

if (dtype.isNestedType()) {
return new ColumnView(dtype, rowCount, Optional.of((long)nullCount),
offsetInfo.getValidityBuffer(baseAddress),
offsetInfo.getOffsetBuffer(baseAddress),
childrenView);
} else {
return new ColumnView(dtype, rowCount, Optional.of((long)nullCount),
offsetInfo.getDataBuffer(baseAddress),
offsetInfo.getValidityBuffer(baseAddress),
offsetInfo.getOffsetBuffer(baseAddress));
}
}

@Override
public String toString() {
return "ColumnViewInfo{" +
"dtype=" + dtype +
", offsetInfo=" + offsetInfo +
", nullCount=" + nullCount +
", rowCount=" + rowCount +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.*;
import com.nvidia.spark.rapids.jni.Arms;
import com.nvidia.spark.rapids.jni.schema.Visitors;

import java.util.List;

import static com.nvidia.spark.rapids.jni.Preconditions.ensure;
import static java.util.Objects.requireNonNull;

/**
* The result of merging several kudo tables into one contiguous table on the host.
*/
public class KudoHostMergeResult implements AutoCloseable {
private final Schema schema;
private final List<ColumnViewInfo> columnInfoList;
private HostMemoryBuffer hostBuf;

KudoHostMergeResult(Schema schema, HostMemoryBuffer hostBuf, List<ColumnViewInfo> columnInfoList) {
requireNonNull(schema, "schema is null");
requireNonNull(columnInfoList, "columnInfoList is null");
ensure(schema.getFlattenedColumnNames().length == columnInfoList.size(), () ->
"Column offsets size does not match flattened schema size, column offsets size: " + columnInfoList.size() +
", flattened schema size: " + schema.getFlattenedColumnNames().length);
this.schema = schema;
this.columnInfoList = columnInfoList;
this.hostBuf = requireNonNull(hostBuf, "hostBuf is null");
}

@Override
public void close() throws Exception {
hostBuf.close();
hostBuf = null;
}

/**
* Convert the host buffer into a cudf table.
* @return the cudf table
*/
public Table toTable() {
try (DeviceMemoryBuffer deviceMemBuf = DeviceMemoryBuffer.allocate(hostBuf.getLength())) {
if (hostBuf.getLength() > 0) {
deviceMemBuf.copyFromHostBufferAsync(hostBuf, Cuda.DEFAULT_STREAM);
}

try (TableBuilder builder = new TableBuilder(columnInfoList, deviceMemBuf)) {
Table t = Visitors.visitSchema(schema, builder);

Cuda.DEFAULT_STREAM.sync();
return t;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@Override
public String toString() {
return "HostMergeResult{" +
"columnOffsets=" + columnInfoList +
", hostBuf length =" + hostBuf.getLength() +
'}';
}
}
Loading

0 comments on commit 7a32b6f

Please sign in to comment.