
Merge pull request #106 from JuliaIO/tan/renameparfile
rename ParFile to Parquet.File
tanmaykm authored Oct 21, 2020
2 parents c931a83 + c9598d9 commit 9cc50c8
Showing 8 changed files with 71 additions and 71 deletions.
34 changes: 17 additions & 17 deletions README.md
@@ -8,22 +8,22 @@

Load a [parquet file](https://en.wikipedia.org/wiki/Apache_Parquet). Only metadata is read initially; data is loaded in chunks on demand. (Note: [ParquetFiles.jl](https://github.com/queryverse/ParquetFiles.jl) also provides load support for Parquet files under the FileIO.jl package.)

- `ParFile` represents a Parquet file at `path` open for reading.
+ `Parquet.File` represents a Parquet file at `path` open for reading.

```
- ParFile(path) => ParFile
+ Parquet.File(path) => Parquet.File
```

- `ParFile` keeps a handle to the open file and the file metadata, and also holds a weakly referenced cache of page data that has been read. If the parquet file references other files in its metadata, they will be opened when required for reading and closed when no longer needed.
+ `Parquet.File` keeps a handle to the open file and the file metadata, and also holds a weakly referenced cache of page data that has been read. If the parquet file references other files in its metadata, they will be opened when required for reading and closed when no longer needed.

- The `close` method closes the reader, releases open files, and makes cached internal data structures available for GC. A `ParFile` instance must not be used once closed.
+ The `close` method closes the reader, releases open files, and makes cached internal data structures available for GC. A `Parquet.File` instance must not be used once closed.
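A minimal lifecycle sketch (the path here is a placeholder; the reader must not be used after `close`):

```julia
using Parquet

parquetfile = Parquet.File("data.parquet")   # placeholder path
try
    nrows(parquetfile)    # use the reader while it is open
finally
    close(parquetfile)    # release the file handle and cached page data
end
```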

```julia
julia> using Parquet

- julia> parfile = "customer.impala.parquet";
+ julia> filename = "customer.impala.parquet";

- julia> p = ParFile(parfile)
+ julia> parquetfile = Parquet.File(filename)
Parquet file: customer.impala.parquet
version: 1
nrows: 150000
@@ -34,13 +34,13 @@ Parquet file: customer.impala.parquet
Examine the schema.

```julia
- julia> nrows(p)
+ julia> nrows(parquetfile)
150000

- julia> ncols(p)
+ julia> ncols(parquetfile)
8

- julia> colnames(p)
+ julia> colnames(parquetfile)
8-element Array{Array{String,1},1}:
["c_custkey"]
["c_name"]
@@ -51,7 +51,7 @@ julia> colnames(p)
["c_mktsegment"]
["c_comment"]

- julia> schema(p)
+ julia> schema(parquetfile)
Schema:
schema {
optional INT64 c_custkey
@@ -70,7 +70,7 @@ The reader performs logical type conversions automatically for String (from byte
```julia
julia> mapping = Dict(["column_name"] => (String, Parquet.logical_string));

- julia> par = ParFile("filename"; map_logical_types=mapping);
+ julia> parquetfile = Parquet.File("filename"; map_logical_types=mapping);
```

The reader will interpret logical types based on the `map_logical_types` provided. The following logical type mapping methods are available in the Parquet package.
@@ -86,7 +86,7 @@ Variants of these methods or custom methods can also be applied by caller.
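As a hedged illustration of such a mapping (the file name and the INT96 column `event_ts` are hypothetical), a timestamp column can be decoded via the exported `logical_timestamp`:

```julia
using Dates     # for DateTime
using Parquet

# map a hypothetical INT96 column "event_ts" to DateTime
mapping = Dict(["event_ts"] => (DateTime, Parquet.logical_timestamp))
parquetfile = Parquet.File("events.parquet"; map_logical_types=mapping)
```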
Create a cursor to iterate over batches of column values. Each iteration returns a named tuple mapping column names to batches of column values. Files with nested schemas cannot be read with this cursor.

```julia
- BatchedColumnsCursor(par::ParFile; kwargs...)
+ BatchedColumnsCursor(parquetfile::Parquet.File; kwargs...)
```

Cursor options:
@@ -100,9 +100,9 @@ Example:
```julia
julia> typemap = Dict(["c_name"]=>(String,Parquet.logical_string), ["c_address"]=>(String,Parquet.logical_string));

- julia> par = ParFile("customer.impala.parquet"; map_logical_types=typemap);
+ julia> parquetfile = Parquet.File("customer.impala.parquet"; map_logical_types=typemap);

- julia> cc = BatchedColumnsCursor(par)
+ julia> cc = BatchedColumnsCursor(parquetfile)
Batched Columns Cursor on customer.impala.parquet
rows: 1:150000
batches: 1
@@ -130,7 +130,7 @@ julia> batchvals.c_name[1:5]
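A short iteration sketch building on the cursor above (per the `reusebuffer` option, buffers may be recycled between iterations, so copy any batch data you keep):

```julia
# each iteration yields a named tuple of column name => vector of values
cc = BatchedColumnsCursor(parquetfile; reusebuffer=true)
for batch in cc
    println(length(batch.c_custkey))   # column names follow the schema shown earlier
end
```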
Create a cursor to iterate over records. In parallel mode, multiple remote cursors can be created and iterated over in parallel.

```julia
- RecordCursor(par::ParFile; kwargs...)
+ RecordCursor(parquetfile::Parquet.File; kwargs...)
```

Cursor options:
@@ -142,9 +142,9 @@ Example:
```julia
julia> typemap = Dict(["c_name"]=>(String,Parquet.logical_string), ["c_address"]=>(String,Parquet.logical_string));

- julia> p = ParFile("customer.impala.parquet"; map_logical_types=typemap);
+ julia> parquetfile = Parquet.File("customer.impala.parquet"; map_logical_types=typemap);

- julia> rc = RecordCursor(p)
+ julia> rc = RecordCursor(parquetfile)
Record Cursor on customer.impala.parquet
rows: 1:150000
cols: c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
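A hedged iteration sketch (assuming each record is a named tuple keyed by column name, per the cursor's element type):

```julia
for record in rc
    println(record.c_custkey, ": ", record.c_name)
    break   # only the first record, for illustration
end
```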
2 changes: 1 addition & 1 deletion src/Parquet.jl
@@ -30,7 +30,7 @@ end

import Base: show, open, close, values, eltype, length

- export is_par_file, ParFile, show, nrows, ncols, rowgroups, columns, pages, bytes, values, colname, colnames
+ export is_par_file, show, nrows, ncols, rowgroups, columns, pages, bytes, values, colname, colnames
export schema
export logical_timestamp, logical_string
export RecordCursor, BatchedColumnsCursor
22 changes: 11 additions & 11 deletions src/cursor.jl
@@ -7,7 +7,7 @@
# Each iteration returns the value (as a Union{T,Nothing}), definition level, and repetition level for each value.
# Row can be deduced from repetition level.
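# (a repetition level of 0 marks the first value of a new row)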
mutable struct ColCursor{T}
- par::ParFile
+ par::Parquet.File
colname::Vector{String} # column name (full path in schema)
colnamesym::Vector{Symbol} # column name converted to symbols
required_at::Vector{Bool} # at what level in the schema the column is required
Expand All @@ -34,7 +34,7 @@ mutable struct ColCursor{T}
levelpos::Int64 # current position within levels of the current page
levelend::Int64

- function ColCursor{T}(par::ParFile, row_positions::Vector{Int64}, colname::Vector{String}, rows::UnitRange) where T
+ function ColCursor{T}(par::Parquet.File, row_positions::Vector{Int64}, colname::Vector{String}, rows::UnitRange) where T
colnamesym = [Symbol(name) for name in colname]
sch = schema(par)

@@ -53,7 +53,7 @@ mutable struct ColCursor{T}
end
end

- function ColCursor(par::ParFile, colname::Vector{String}; rows::UnitRange=1:nrows(par), row::Signed=first(rows))
+ function ColCursor(par::Parquet.File, colname::Vector{String}; rows::UnitRange=1:nrows(par), row::Signed=first(rows))
row_positions = rowgroup_row_positions(par)
@assert last(rows) <= nrows(par)
@assert first(rows) >= 1
@@ -211,7 +211,7 @@ end
##

mutable struct BatchedColumnsCursor{T}
- par::ParFile
+ par::Parquet.File
colnames::Vector{Vector{String}}
colcursors::Vector{ColCursor}
colstates::Vector{Tuple{Int64,Int64}}
@@ -229,7 +229,7 @@ end
Create a cursor to iterate over batches of column values. Each iteration returns a named tuple mapping column names to batches of column values. Files with nested schemas cannot be read with this cursor.
```julia
- BatchedColumnsCursor(par::ParFile; kwargs...)
+ BatchedColumnsCursor(par::Parquet.File; kwargs...)
```
Cursor options:
@@ -238,7 +238,7 @@ Cursor options:
- `reusebuffer`: boolean to indicate whether to reuse the buffers with every iteration; if each iteration processes the batch and does not need to refer to the same data buffer again, then setting this to `true` reduces GC pressure and can help significantly while processing large files.
- `use_threads`: whether to use threads while reading the file; applicable only for Julia v1.3 and later, and switched on by default if the Julia process is started with multiple threads.
"""
- function BatchedColumnsCursor(par::ParFile;
+ function BatchedColumnsCursor(par::Parquet.File;
rows::UnitRange=1:nrows(par),
batchsize::Signed=min(length(rows), first(rowgroups(par)).num_rows),
reusebuffer::Bool=false,
@@ -354,7 +354,7 @@ end
##

mutable struct RecordCursor{T}
- par::ParFile
+ par::Parquet.File
colnames::Vector{Vector{String}}
colcursors::Vector{ColCursor}
colstates::Vector{Tuple{Int64,Int64}}
@@ -366,14 +366,14 @@ end
Create a cursor to iterate over records. In parallel mode, multiple remote cursors can be created and iterated over in parallel.
```julia
- RecordCursor(par::ParFile; kwargs...)
+ RecordCursor(par::Parquet.File; kwargs...)
```
Cursor options:
- `rows`: the row range to iterate through, all rows by default.
- `colnames`: the column names to retrieve; all by default
"""
- function RecordCursor(par::ParFile; rows::UnitRange=1:nrows(par), colnames::Vector{Vector{String}}=colnames(par))
+ function RecordCursor(par::Parquet.File; rows::UnitRange=1:nrows(par), colnames::Vector{Vector{String}}=colnames(par))
colcursors = [ColCursor(par, colname; rows=rows, row=first(rows)) for colname in colnames]
sch = schema(par)
rectype = ntelemtype(sch, sch.schema[1])
@@ -430,7 +430,7 @@ default_init(::Type{Vector{T}}) where {T} = Vector{T}()
default_init(::Type{Dict{Symbol,Any}}) = Dict{Symbol,Any}()
default_init(::Type{T}) where {T} = ccall(:jl_new_struct_uninit, Any, (Any,), T)::T

- function update_record(par::ParFile, row::Dict{Symbol,Any}, colid::Int, colcursor::ColCursor, colcursor_state::Tuple{Int64,Int64}, col_repeat_state::Dict{Tuple{Int,Int},Int})
+ function update_record(par::Parquet.File, row::Dict{Symbol,Any}, colid::Int, colcursor::ColCursor, colcursor_state::Tuple{Int64,Int64}, col_repeat_state::Dict{Tuple{Int,Int},Int})
colpos = colcursor.row
# iterate all repeated values from the column cursor (until it advances to the next row)
while !_done(colcursor, colcursor_state)
@@ -441,7 +441,7 @@ function update_record(par::ParFile, row::Dict{Symbol,Any}, colid::Int, colcurso
colcursor_state # return new colcursor state
end

- function update_record(par::ParFile, row::Dict{Symbol,Any}, colid::Int, colcursor::ColCursor, val, defn_level::Int64, repn_level::Int64, col_repeat_state::Dict{Tuple{Int,Int},Int})
+ function update_record(par::Parquet.File, row::Dict{Symbol,Any}, colid::Int, colcursor::ColCursor, val, defn_level::Int64, repn_level::Int64, col_repeat_state::Dict{Tuple{Int,Int},Int})
nameparts = colcursor.colname
symnameparts = colcursor.colnamesym
required_at = colcursor.required_at
60 changes: 30 additions & 30 deletions src/reader.jl
@@ -38,7 +38,7 @@ function cacheget(fetcher, lru::PageLRU, chunk::ColumnChunk, startpos::Int64)
end

"""
- ParFile(path; map_logical_types) => ParFile
+ Parquet.File(path; map_logical_types) => Parquet.File
Represents a Parquet file at `path` open for reading. Options to map logical types can be provided via `map_logical_types`.
@@ -48,44 +48,44 @@ Represents a Parquet file at `path` open for reading. Options to map logical typ
- `true`: default mappings are attempted on all columns (bytearray => String, int96 => DateTime)
- A user supplied dict mapping column names to a tuple of type and a converter function
- Returns a `ParFile` type that keeps a handle to the open file and the file metadata and also holds an LRU cache of raw bytes of the pages read.
+ Returns a `Parquet.File` type that keeps a handle to the open file and the file metadata and also holds an LRU cache of raw bytes of the pages read.
"""
- mutable struct ParFile
+ mutable struct File
path::String
handle::IOStream
meta::FileMetaData
schema::Schema
page_cache::PageLRU
end

- function ParFile(path::AbstractString; map_logical_types::Dict=TLogicalTypeMap())
+ function File(path::AbstractString; map_logical_types::Dict=TLogicalTypeMap())
f = open(path)
try
- return ParFile(path, f; map_logical_types=map_logical_types)
+ return File(path, f; map_logical_types=map_logical_types)
catch ex
close(f)
rethrow(ex)
end
end

- function ParFile(path::AbstractString, handle::IOStream; map_logical_types::Dict=TLogicalTypeMap())
+ function File(path::AbstractString, handle::IOStream; map_logical_types::Dict=TLogicalTypeMap())
is_par_file(handle) || error("Not a parquet format file: $path")
meta_len = metadata_length(handle)
meta = metadata(handle, path, meta_len)
typemap = merge!(TLogicalTypeMap(), map_logical_types)
- ParFile(String(path), handle, meta, Schema(meta.schema, typemap), PageLRU())
+ File(String(path), handle, meta, Schema(meta.schema, typemap), PageLRU())
end

- function close(par::ParFile)
+ function close(par::Parquet.File)
empty!(par.page_cache.refs)
close(par.handle)
end

- schema(par::ParFile) = par.schema
+ schema(par::Parquet.File) = par.schema

- colname(par::ParFile, col::ColumnChunk) = colname(metadata(par,col))
+ colname(par::Parquet.File, col::ColumnChunk) = colname(metadata(par,col))
colname(col::ColumnMetaData) = col.path_in_schema
- function colnames(par::ParFile)
+ function colnames(par::Parquet.File)
names = Vector{Vector{String}}()
cs = Int[]
ns = String[]
@@ -109,16 +109,16 @@ function colnames(par::ParFile)
names
end

- ncols(par::ParFile) = length(colnames(par))
- nrows(par::ParFile) = par.meta.num_rows
+ ncols(par::Parquet.File) = length(colnames(par))
+ nrows(par::Parquet.File) = par.meta.num_rows

- coltype(par::ParFile, col::ColumnChunk) = coltype(metadata(par,col))
+ coltype(par::Parquet.File, col::ColumnChunk) = coltype(metadata(par,col))
coltype(col::ColumnMetaData) = col._type

# return all rowgroups in the par file
- rowgroups(par::ParFile) = par.meta.row_groups
+ rowgroups(par::Parquet.File) = par.meta.row_groups

- function rowgroup_row_positions(par::ParFile)
+ function rowgroup_row_positions(par::Parquet.File)
rgs = rowgroups(par)
positions = Array{Int64}(undef, length(rgs)+1)
idx = 1
@@ -129,10 +129,10 @@ function rowgroup_row_positions(par::ParFile)
cumsum!(positions, positions)
end

- columns(par::ParFile, rowgroupidx) = columns(par, rowgroups(par)[rowgroupidx])
- columns(par::ParFile, rowgroup::RowGroup) = rowgroup.columns
- columns(par::ParFile, rowgroup::RowGroup, colname::Vector{String}) = columns(par, rowgroup, [colname])
- function columns(par::ParFile, rowgroup::RowGroup, cnames::Vector{Vector{String}})
+ columns(par::Parquet.File, rowgroupidx) = columns(par, rowgroups(par)[rowgroupidx])
+ columns(par::Parquet.File, rowgroup::RowGroup) = rowgroup.columns
+ columns(par::Parquet.File, rowgroup::RowGroup, colname::Vector{String}) = columns(par, rowgroup, [colname])
+ function columns(par::Parquet.File, rowgroup::RowGroup, cnames::Vector{Vector{String}})
R = ColumnChunk[]
for col in columns(par, rowgroup)
(colname(par,col) in cnames) && push!(R, col)
@@ -143,12 +143,12 @@ end
##
# Iterator for pages in a column chunk
mutable struct ColumnChunkPages
- par::ParFile
+ par::Parquet.File
col::ColumnChunk
startpos::Int64
endpos::Int64

- function ColumnChunkPages(par::ParFile, col::ColumnChunk)
+ function ColumnChunkPages(par::Parquet.File, col::ColumnChunk)
startpos = page_offset(par, col)
endpos = end_offset(par, col)
new(par, col, startpos, endpos)
@@ -212,7 +212,7 @@ mutable struct ColumnChunkPageValues{T}
converter_fn::Function
end

- function ColumnChunkPageValues(par::ParFile, col::ColumnChunk, ::Type{T}, converter_fn::Function=identity) where {T}
+ function ColumnChunkPageValues(par::Parquet.File, col::ColumnChunk, ::Type{T}, converter_fn::Function=identity) where {T}
cname = colname(par, col)

max_repn = max_repetition_level(par.schema, cname)
@@ -338,8 +338,8 @@ end


# column and page metadata
- open(par::ParFile, col::ColumnChunk) = open(par.handle, par.path, col)
- close(par::ParFile, col::ColumnChunk, io) = (par.handle == io) || close(io)
+ open(par::Parquet.File, col::ColumnChunk) = open(par.handle, par.path, col)
+ close(par::Parquet.File, col::ColumnChunk, io) = (par.handle == io) || close(io)
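# (a column chunk may live in a separate file; `open` below follows `col.file_path` on demand)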
function open(io, path::AbstractString, col::ColumnChunk)
if hasproperty(col, :file_path)
@debug("opening file to read column metadata", file=col.file_path, offset=col.file_offset)
@@ -363,14 +363,14 @@ function metadata(io, path::AbstractString, col::ColumnChunk)
meta
end

- function page_offset(par::ParFile, col::ColumnChunk)
+ function page_offset(par::Parquet.File, col::ColumnChunk)
colmeta = metadata(par, col)
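# the chunk's first page is at the smallest of the data/index/dictionary page offsets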
offset = colmeta.data_page_offset
hasproperty(colmeta, :index_page_offset) && (offset = min(offset, colmeta.index_page_offset))
hasproperty(colmeta, :dictionary_page_offset) && (offset = min(offset, colmeta.dictionary_page_offset))
offset
end
- end_offset(par::ParFile, col::ColumnChunk) = page_offset(par, col) + metadata(par,col).total_compressed_size
+ end_offset(par::Parquet.File, col::ColumnChunk) = page_offset(par, col) + metadata(par,col).total_compressed_size

page_size(page::PageHeader) = hasproperty(page, :compressed_page_size) ? page.compressed_page_size : page.uncompressed_page_size

@@ -415,10 +415,10 @@ function metadata(io, path::AbstractString, len::Integer=metadata_length(io))
meta
end

- metadata(par::ParFile) = par.meta
+ metadata(par::Parquet.File) = par.meta

#=
- function fill_column_metadata(par::ParFile)
+ function fill_column_metadata(par::Parquet.File)
meta = par.meta
# go through all column chunks and read metadata from file offsets if required
for grp in meta.row_groups
@@ -429,7 +429,7 @@ function fill_column_metadata(par::ParFile)
end
=#

- function metadata(par::ParFile, col::ColumnChunk)
+ function metadata(par::Parquet.File, col::ColumnChunk)
if !hasproperty(col, :meta_data)
col.meta_data = metadata(par.handle, par.path, col)
end
2 changes: 1 addition & 1 deletion src/show.jl
@@ -204,7 +204,7 @@ function show(io::IO, meta::FileMetaData, indent::AbstractString="")
hasproperty(meta, :key_value_metadata) && show(io, meta.key_value_metadata, indent)
end

- function show(io::IO, par::ParFile)
+ function show(io::IO, par::Parquet.File)
println(io, "Parquet file: $(par.path)")
meta = par.meta
println(io, " version: $(meta.version)")

2 comments on commit 9cc50c8

@tanmaykm (Member Author)
@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/23436

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

```
git tag -a v0.7.0 -m "<description of version>" 9cc50c82043690e2786c9025fcc34bb41572d938
git push origin v0.7.0
```
