nit: Remove checked allocator in arrow code usages (#6341)
* close results in exports in sql resolver

* remove memory.CheckedAllocator from arrow code
k-anshul authored Jan 6, 2025
1 parent 3d38966 commit 54500f6
Showing 5 changed files with 26 additions and 16 deletions.
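For context: memory.CheckedAllocator in the Apache Arrow Go library wraps another memory.Allocator and tracks live allocations so that a test can assert everything was released; the plain memory.DefaultAllocator does no such bookkeeping. This change drops the wrapper from production code paths and keeps it only in the test, where the leak assertion is actually made. Below is a minimal sketch of the two usages — the module version in the import paths and all example names are illustrative, not taken from this repository:

package example

import (
	"testing"

	"github.com/apache/arrow/go/v15/arrow" // module version is an assumption
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/memory"
)

var schema = arrow.NewSchema([]arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int64}}, nil)

// Production path: no allocation bookkeeping needed, the default allocator is enough.
func buildRecord(mem memory.Allocator) arrow.Record {
	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).Append(1)
	return b.NewRecord() // caller is responsible for calling Release()
}

// Test path: CheckedAllocator counts outstanding bytes, so the test fails if anything leaks.
func TestBuildRecordDoesNotLeak(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0) // zero bytes may remain allocated when the test ends

	rec := buildRecord(mem)
	rec.Release()
}

The practical upside is that hot paths skip the per-allocation tracking done by CheckedAllocator, while the leak check still runs in runtime/server/queries_metrics_rows_test.go, as the last file in this diff shows.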
2 changes: 1 addition & 1 deletion runtime/drivers/blob/parquetreader.go
@@ -24,7 +24,7 @@ const _batchSize = int64(1024)
// downloadParquet downloads partial file as per extractOption
func downloadParquet(ctx context.Context, bucket *blob.Bucket, obj *blob.ListObject, option *extractOption, fw *os.File) error {
reader := NewBlobObjectReader(ctx, bucket, obj)
- mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ mem := memory.DefaultAllocator
props := parquet.NewReaderProperties(mem)

pf, err := file.NewParquetReader(reader, file.WithReadProps(props))
2 changes: 1 addition & 1 deletion runtime/drivers/file/model_executor_olap_self.go
@@ -276,7 +276,7 @@ func writeParquet(res *drivers.Result, fw io.Writer) error {
fields = append(fields, arrowField)
}
schema := arrow.NewSchema(fields, nil)
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ mem := memory.DefaultAllocator
recordBuilder := array.NewRecordBuilder(mem, schema)
defer recordBuilder.Release()

13 changes: 0 additions & 13 deletions runtime/drivers/snowflake/warehouse.go
@@ -261,19 +261,6 @@ func (f *fileIterator) Next() ([]string, error) {
return []string{fw.Name()}, nil
}

- // Size implements drivers.FileIterator.
- func (f *fileIterator) Size(unit drivers.ProgressUnit) (int64, bool) {
- switch unit {
- case drivers.ProgressUnitFile:
- return 1, true
- // the number of records is unknown until the end of iteration
- case drivers.ProgressUnitRecord:
- return f.totalRecords, true
- default:
- return 0, false
- }
- }
-
func (f *fileIterator) Format() string {
return ""
}
2 changes: 1 addition & 1 deletion runtime/queries/metricsview.go
@@ -659,7 +659,7 @@ func WriteParquet(meta []*runtimev1.MetricsViewColumn, data []*structpb.Struct,
}
schema := arrow.NewSchema(fields, nil)

- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ mem := memory.DefaultAllocator
recordBuilder := array.NewRecordBuilder(mem, schema)
defer recordBuilder.Release()
for _, s := range data {
23 changes: 23 additions & 0 deletions runtime/server/queries_metrics_rows_test.go
@@ -140,6 +140,7 @@ func TestServer_MetricsViewRows_parquet_export(t *testing.T) {

f.Write(buf.Bytes())
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
rdr, err := file.OpenParquetFile(f.Name(), false, file.WithReadProps(parquet.NewReaderProperties(mem)))
require.NoError(t, err)
defer rdr.Close()
@@ -174,90 +175,106 @@ func TestServer_MetricsViewRows_parquet_export(t *testing.T) {
require.Equal(t, "timestamp__day", flds[index].Field.Name)
require.Equal(t, arrow.TIMESTAMP, flds[index].Field.Type.ID())
td := getColumnChunk(tbl, index).(*array.Timestamp)
+ defer td.Release()
require.Equal(t, "2023-01-01T00:00:00Z", td.Value(0).ToTime(arrow.Microsecond).Format(time.RFC3339))
index++

require.Equal(t, "tint1", flds[index].Field.Name)
require.Equal(t, arrow.INT8, flds[index].Field.Type.ID())
tint1 := getColumnChunk(tbl, index).(*array.Int8)
+ defer tint1.Release()

require.Equal(t, int8(-1), tint1.Value(0))
index++

require.Equal(t, "tint2", flds[index].Field.Name)
require.Equal(t, arrow.INT16, flds[index].Field.Type.ID())
tint2 := getColumnChunk(tbl, index).(*array.Int16)
+ defer tint2.Release()
require.Equal(t, int16(-2), tint2.Value(0))
index++

require.Equal(t, "tint4", flds[index].Field.Name)
require.Equal(t, arrow.INT32, flds[index].Field.Type.ID())
tint4 := getColumnChunk(tbl, index).(*array.Int32)
+ defer tint4.Release()
require.Equal(t, int32(-4), tint4.Value(0))
index++

require.Equal(t, "tint8", flds[index].Field.Name)
require.Equal(t, arrow.INT64, flds[index].Field.Type.ID())
tint8 := getColumnChunk(tbl, index).(*array.Int64)
+ defer tint8.Release()
require.Equal(t, int64(-8), tint8.Value(0))
index++

require.Equal(t, "tuint1", flds[index].Field.Name)
require.Equal(t, arrow.UINT8, flds[index].Field.Type.ID())
tuint1 := getColumnChunk(tbl, index).(*array.Uint8)
+ defer tuint1.Release()
require.Equal(t, uint8(1), tuint1.Value(0))
index++

require.Equal(t, "tuint2", flds[index].Field.Name)
require.Equal(t, arrow.UINT16, flds[index].Field.Type.ID())
tuint2 := getColumnChunk(tbl, index).(*array.Uint16)
+ defer tuint2.Release()
require.Equal(t, uint16(2), tuint2.Value(0))
index++

require.Equal(t, "tuint4", flds[index].Field.Name)
require.Equal(t, arrow.UINT32, flds[index].Field.Type.ID())
tuint4 := getColumnChunk(tbl, index).(*array.Uint32)
+ defer tuint4.Release()
require.Equal(t, uint32(4), tuint4.Value(0))
index++

require.Equal(t, "tuint8", flds[index].Field.Name)
require.Equal(t, arrow.UINT64, flds[index].Field.Type.ID())
tuint8 := getColumnChunk(tbl, index).(*array.Uint64)
+ defer tuint8.Release()
require.Equal(t, uint64(8), tuint8.Value(0))
index++

require.Equal(t, "thugeint", flds[index].Field.Name)
require.Equal(t, arrow.FLOAT64, flds[index].Field.Type.ID())
thugeint := getColumnChunk(tbl, index).(*array.Float64)
+ defer thugeint.Release()
require.Equal(t, float64(1), thugeint.Value(0))
index++

require.Equal(t, "tfloat4", flds[index].Field.Name)
require.Equal(t, arrow.FLOAT32, flds[index].Field.Type.ID())
tfloat4 := getColumnChunk(tbl, index).(*array.Float32)
+ defer tfloat4.Release()
require.Equal(t, float32(4), tfloat4.Value(0))
index++

require.Equal(t, "tfloat8", flds[index].Field.Name)
require.Equal(t, arrow.FLOAT64, flds[index].Field.Type.ID())
tfloat8 := getColumnChunk(tbl, index).(*array.Float64)
+ defer tfloat8.Release()
require.Equal(t, float64(8), tfloat8.Value(0))
index++

require.Equal(t, "tdecimal", flds[index].Field.Name)
require.Equal(t, arrow.DECIMAL128, flds[index].Field.Type.ID())
tdecimal := getColumnChunk(tbl, index).(*array.Decimal128)
+ defer tdecimal.Release()
require.Equal(t, float32(1), tdecimal.Value(0).ToFloat32(3))
index++

require.Equal(t, "tbool", flds[index].Field.Name)
require.Equal(t, arrow.BOOL, flds[index].Field.Type.ID())
tbool := getColumnChunk(tbl, index).(*array.Boolean)
+ defer tbool.Release()
require.Equal(t, true, tbool.Value(0))
index++

require.Equal(t, "tlist", flds[index].Field.Name)
require.Equal(t, arrow.LIST, flds[index].Field.Type.ID())
tlist := getColumnChunk(tbl, index).(*array.List)
+ defer tlist.Release()
strList := tlist.ListValues().(*array.String)
require.Equal(t, "a", strList.Value(0))
require.Equal(t, "b", strList.Value(1))
@@ -266,6 +283,7 @@ func TestServer_MetricsViewRows_parquet_export(t *testing.T) {
require.Equal(t, "tmap", flds[index].Field.Name)
require.Equal(t, arrow.MAP, flds[index].Field.Type.ID())
tmap := getColumnChunk(tbl, index).(*array.Map)
+ defer tmap.Release()
keys := tmap.Keys().(*array.String)
values := tmap.Items().(*array.Int32)
require.Equal(t, "f1", keys.Value(0))
@@ -277,6 +295,7 @@ func TestServer_MetricsViewRows_parquet_export(t *testing.T) {
require.Equal(t, "tstruct", flds[index].Field.Name)
require.Equal(t, arrow.STRUCT, flds[index].Field.Type.ID())
tstruct := getColumnChunk(tbl, index).(*array.Struct)
+ defer tstruct.Release()
fields := tstruct.Field(0).(*array.Int32)
require.Equal(t, int32(1), fields.Value(0))
substruct := tstruct.Field(1).(*array.Struct)
@@ -287,24 +306,28 @@ func TestServer_MetricsViewRows_parquet_export(t *testing.T) {
require.Equal(t, "timestamp", flds[index].Field.Name)
require.Equal(t, arrow.TIMESTAMP, flds[index].Field.Type.ID())
ttimestamp := getColumnChunk(tbl, index).(*array.Timestamp)
+ defer ttimestamp.Release()
require.Equal(t, "2023-01-01T00:00:00Z", ttimestamp.Value(0).ToTime(arrow.Microsecond).Format(time.RFC3339))
index++

require.Equal(t, "ttime", flds[index].Field.Name)
require.Equal(t, arrow.TIME64, flds[index].Field.Type.ID())
ttime := getColumnChunk(tbl, index).(*array.Time64)
+ defer ttime.Release()
require.Equal(t, "12:00:00", ttime.Value(0).ToTime(arrow.Microsecond).Format(time.TimeOnly))
index++

require.Equal(t, "tdate", flds[index].Field.Name)
require.Equal(t, arrow.DATE32, flds[index].Field.Type.ID())
tdate := getColumnChunk(tbl, index).(*array.Date32)
+ defer tdate.Release()
require.Equal(t, "2023-01-02", tdate.Value(0).FormattedString())
index++

require.Equal(t, "tuuid", flds[index].Field.Name)
require.Equal(t, arrow.FIXED_SIZE_BINARY, flds[index].Field.Type.ID())
tuuid := getColumnChunk(tbl, index).(*array.FixedSizeBinary)
+ defer tuuid.Release()
require.True(t, len(tuuid.Value(0)) > 0)
}
