Skip to content

Commit

Permalink
fix UT failures
Browse files Browse the repository at this point in the history
  • Loading branch information
tedxu committed Oct 23, 2024
1 parent 5f8c55c commit e60f422
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
minio/minio server /data
- name: Run tests
run: cd go && go test -v ./...
run: cd go && make test
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ if (WITH_BENCHMARK)
endif()

configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/milvus-storage.pc.in "${CMAKE_CURRENT_BINARY_DIR}/milvus-storage.pc" @ONLY)
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/milvus-storage.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/milvus-storage.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/libmilvus-storage.dylib" DESTINATION "${CMAKE_INSTALL_LIBDIR}")
10 changes: 3 additions & 7 deletions cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ endif

build:
mkdir -p build && cd build && \
conan install .. --build=missing --update && \
conan build ..

conan-create:
mkdir -p build && cd build && \
conan create .. --build=missing --update && \
conan install milvus-storage/0.1.0@ --install-folder . -g make
conan install .. --build=missing -u && \
conan build .. && \
conan install .. --install-folder . -g make

debug:
mkdir -p build && cd build && \
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
namespace milvus_storage {

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size)
: PackedRecordBatchReader(fs, std::vector<std::string>{path + "/0"}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {

}
const std::string& path,
const std::shared_ptr<arrow::Schema> schema,
const int64_t buffer_size)
: PackedRecordBatchReader(
fs, std::vector<std::string>{path}, schema, std::vector<ColumnOffset>(), std::set<int>(), buffer_size) {}

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
const std::vector<std::string>& paths,
Expand All @@ -56,7 +55,7 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
auto offsets = std::vector<ColumnOffset>(column_offsets);
if (column_offsets.empty()) {
for (int i = 0; i < schema->num_fields(); i++) {
offsets.emplace_back(i, 0);
offsets.emplace_back(0, i);
}
}
std::set<int> needed_paths;
Expand All @@ -67,13 +66,18 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
for (auto i : needed_paths) {
auto result = MakeArrowFileReader(fs, paths[i]);
if (!result.ok()) {
LOG_STORAGE_ERROR_ << "Error making file reader " << i << ":" << result.status().ToString();
throw std::runtime_error(result.status().ToString());
}
file_readers_.emplace_back(std::move(result.value()));
}

for (int i = 0; i < file_readers_.size(); ++i) {
auto metadata = file_readers_[i]->parquet_reader()->metadata()->key_value_metadata()->Get(ROW_GROUP_SIZE_META_KEY);
if (!metadata.ok()) {
LOG_STORAGE_ERROR_ << "metadata not found in file " << i;
throw std::runtime_error(metadata.status().ToString());
}
row_group_sizes_.push_back(PackedMetaSerde::deserialize(metadata.ValueOrDie()));
LOG_STORAGE_DEBUG_ << " file " << i << " metadata size: " << file_readers_[i]->parquet_reader()->metadata()->size();
}
Expand Down Expand Up @@ -169,7 +173,6 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
}
buffer_available_ -= plan_buffer_size;
row_limit_ = sorted_offsets.top().second;

return arrow::Status::OK();
}

Expand Down
38 changes: 21 additions & 17 deletions cpp/src/packed/reader_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,35 @@
// limitations under the License.

#include "packed/reader_c.h"
#include "common/log.h"
#include "packed/reader.h"
#include "filesystem/fs.h"
#include "common/config.h"

#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/status.h>
#include <iostream>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {

auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto r = factory->BuildFileSystem(conf, &truePath);
if (!r.ok()) {
return -1;
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
if (!status.ok()) {
return -1;
}
return 0;
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto r = factory->BuildFileSystem(conf, &truePath);
if (!r.ok()) {
LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
return -2;
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
LOG_STORAGE_ERROR_ << "read export done";
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
return static_cast<int>(status.code());
}
return 0;
}
13 changes: 8 additions & 5 deletions go/Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
include ../cpp/build/conanbuildinfo.mak

MILVUS_STORAGE_ROOT = $(abspath $(CURDIR)/..)
MILVUS_STORAGE_INCLUDE_DIR = $(abspath $(MILVUS_STORAGE_ROOT)/cpp/include)
MILVUS_STORAGE_LD_DIR = $(abspath $(MILVUS_STORAGE_ROOT)/cpp/build/Release)

CFLAGS += $(CONAN_CFLAGS)
CXXFLAGS += $(CONAN_CXXFLAGS)
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(CONAN_INCLUDE_DIRS_MILVUS-STORAGE)
CPPFLAGS += $(addprefix -I, $(INCLUDE_DIRS))
LDFLAGS += $(addprefix -L, $(CONAN_LIB_DIRS_MILVUS-STORAGE))
INCLUDE_DIRS = $(CONAN_INCLUDE_DIRS_ARROW) $(MILVUS_STORAGE_INCLUDE_DIR)
CPPFLAGS = $(addprefix -I, $(INCLUDE_DIRS))
LDFLAGS += $(addprefix -L, $(MILVUS_STORAGE_LD_DIR))

.EXPORT_ALL_VARIABLES:
.PHONY: build
Expand All @@ -13,8 +17,7 @@ build:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS)" go build ./...

test:
CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS)" go test -timeout 30s ./...

CGO_CFLAGS="$(CPPFLAGS)" CGO_LDFLAGS="$(LDFLAGS) -Wl,-rpath,$(MILVUS_STORAGE_LD_DIR)" go test -timeout 30s ./...
proto:
mkdir -p proto/manifest_proto
mkdir -p proto/schema_proto
Expand Down
2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/milvus-io/milvus-storage/go

go 1.21
go 1.23

toolchain go1.23.1

Expand Down
20 changes: 6 additions & 14 deletions go/packed/packed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package packed

/*
#cgo LDFLAGS: -v -lmilvus-storage -Wl,-rpath,@executable_path/
#cgo LDFLAGS: -lmilvus-storage
#include <stdlib.h>
#include "milvus-storage/packed/reader_c.h"
Expand All @@ -25,36 +25,28 @@ package packed
import "C"
import (
"errors"
"fmt"
"unsafe"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/arrio"
"github.com/apache/arrow/go/v12/arrow/cdata"
)

type Reader struct {
reader arrio.Reader
}

func Open(path string, schema *arrow.Schema, bufferSize int) (arrio.Reader, error) {
// var cSchemaPtr uintptr
// cSchema := cdata.SchemaFromPtr(cSchemaPtr)
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
casPtr := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
var caasPtr uintptr
var cass cdata.CArrowArrayStream

cPath := C.CString(path)
defer C.free(unsafe.Pointer(cPath))
status := C.Open(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(caasPtr)))
status := C.Open(cPath, casPtr, C.int64_t(bufferSize), (*C.struct_ArrowArrayStream)(unsafe.Pointer(&cass)))
if status != 0 {
return nil, errors.New("failed to open")
return nil, errors.New(fmt.Sprintf("failed to open file: %s, status: %d", path, status))
}

reader := cdata.ImportCArrayStream((*cdata.CArrowArrayStream)(unsafe.Pointer(caasPtr)), schema)
reader := cdata.ImportCArrayStream((*cdata.CArrowArrayStream)(unsafe.Pointer(&cass)), schema)
return reader, nil
}

func (r *Reader) Read() (arrow.Record, error) {
return r.reader.Read()
}
39 changes: 20 additions & 19 deletions go/packed/packed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,55 @@
package packed

import (
"os"
"testing"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/stretchr/testify/assert"
)

func TestRead(t *testing.T) {
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int64},
{Name: "b", Type: arrow.BinaryTypes.String},
{Name: "a", Type: arrow.PrimitiveTypes.Int32},
{Name: "b", Type: arrow.PrimitiveTypes.Int64},
{Name: "c", Type: arrow.BinaryTypes.String},
}, nil)

b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
defer b.Release()
for idx := range schema.Fields() {
switch idx {
case 0:
b.Field(idx).(*array.Int64Builder).AppendValues(
[]int64{int64(1), int64(2), int64(3)}, nil,
b.Field(idx).(*array.Int32Builder).AppendValues(
[]int32{int32(1), int32(2), int32(3)}, nil,
)
case 1:
b.Field(idx).(*array.Int64Builder).AppendValues(
[]int64{int64(4), int64(5), int64(6)}, nil,
)
case 2:
b.Field(idx).(*array.StringBuilder).AppendValues(
[]string{"a", "b", "c"}, nil,
)
}
}
rec := b.NewRecord()
//rec := b.NewRecord()

path := "/tmp/test.parquet"
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
assert.NoError(t, err)
writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
assert.NoError(t, err)
err = writer.Write(rec)
assert.NoError(t, err)
err = writer.Close()
assert.NoError(t, err)
path := "testdata/0"
// file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0666)
// assert.NoError(t, err)
// writer, err := pqarrow.NewFileWriter(schema, file, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
// assert.NoError(t, err)
// err = writer.Write(rec)
// assert.NoError(t, err)
// err = writer.Close()
// assert.NoError(t, err)

reader, err := Open(path, schema, 10*1024*1024 /* 10MB */)
assert.NoError(t, err)

rr, err := reader.Read()
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, rec.NumRows(), rr.NumRows())
assert.Equal(t, int64(300), rr.NumRows())
}
Binary file added go/packed/testdata/0
Binary file not shown.

0 comments on commit e60f422

Please sign in to comment.