Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading parquet file thanks to arrow-dataset #576 #577

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

fb64
Copy link
Contributor

@fb64 fb64 commented Jan 28, 2024

Fixes #576

  • Using arrow-dataset to read parquet file
  • Adding test parquet file generated from a fork of arrow_example that allows to write parquet file
  • Adding arrow-dataset dependency
  • Updating arrow version (from 11 to 15)

@koperagen
Copy link
Collaborator

koperagen commented Jan 30, 2024

Hi and thanks for the PR. I have nothing to add to the code. But i get this exception trying to run the test on Linux with both JDK 11 and 17. The issue seems to be on Arrow side. Do you know about any requirements for it to work?

java.lang.UnsatisfiedLinkError: /tmp/jnilib-15573607865820834233.tmp: /tmp/jnilib-15573607865820834233.tmp: undefined symbol: _ZTIN6google8protobuf7MessageE
	at java.base/jdk.internal.loader.NativeLibraries.load(Native Method)
	at java.base/jdk.internal.loader.NativeLibraries$NativeLibraryImpl.open(NativeLibraries.java:388)
	at java.base/jdk.internal.loader.NativeLibraries.loadLibrary(NativeLibraries.java:232)
	at java.base/jdk.internal.loader.NativeLibraries.loadLibrary(NativeLibraries.java:174)
	at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2389)
	at java.base/java.lang.Runtime.load0(Runtime.java:755)
	at java.base/java.lang.System.load(System.java:1953)
	at org.apache.arrow.dataset.jni.JniLoader.load(JniLoader.java:92)
	at org.apache.arrow.dataset.jni.JniLoader.loadRemaining(JniLoader.java:75)
	at org.apache.arrow.dataset.jni.JniLoader.ensureLoaded(JniLoader.java:61)
	at org.apache.arrow.dataset.jni.NativeMemoryPool.createListenable(NativeMemoryPool.java:44)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.readArrowDataset(arrowReadingImpl.kt:327)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet(arrowReading.kt:197)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet$default(arrowReading.kt:194)
	at org.jetbrains.kotlinx.dataframe.io.ArrowKtTest.testReadParquet(ArrowKtTest.kt:590)

@fb64
Copy link
Contributor Author

fb64 commented Jan 30, 2024

Hi @koperagen it seems to be a JNI issue, I just checked and it works well both on my MacBook Pro (M1) and on a PC with Windows 10 (intel core i7). What is the processor architecture on your computer ? Normally arrow-dataset dependency provides the required native library but maybe it not fits with your hardware 🤔
Did you launch the tests with gradle ? Only the JVM arg --add-opens java.base/java.nio=ALL-UNNAMED is needed (as is it configured in gradle tasks.test ...)
I'll try to reproduce on linux with docker ....
image

@koperagen
Copy link
Collaborator

koperagen commented Jan 31, 2024

Yes, i do run them with Gradle. Processor is Intel core i7. I tried to run the test on TeamCity, but there it fails on Linux as well :(
Upon inspecting that .so file content i found that this protobuf symbol is indeed undefined which means it's expected to be loaded from another library

0000000000000000         *UND*	0000000000000000              _ZTIN6google8protobuf7MessageE

objdump /tmp/jnilib-11657767653473718381.tmp -x

But the library doesn't have a dependency on any protobuf library, so i assume it could be a linkage error on Arrow side.. maybe? Either this or project needs a dependency on native protobuf somehow

ldd /tmp/jnilib-11657767653473718381.tmp
	linux-vdso.so.1 (0x00007ffd3f689000)
	librt.so.1 => /lib/x86_64-linux-gnu/librt.so.1 (0x00007fe7a31f0000)
	libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007fe7a31eb000)
	libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe7a31e6000)
	libstdc++.so.6 => /lib/x86_64-linux-gnu/libstdc++.so.6 (0x00007fe79f800000)
	libm.so.6 => /lib/x86_64-linux-gnu/libm.so.6 (0x00007fe7a30ff000)
	libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007fe7a30dd000)
	libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe79f400000)
	/lib64/ld-linux-x86-64.so.2 (0x00007fe7a3219000)

@fb64
Copy link
Contributor Author

fb64 commented Jan 31, 2024

Effectively, I also reproduced the issue with docker, downgrading arrow dependency to the version 14.0.2 seems to fix the error. I'll update the PR and check try to dig on arrow side

@fb64 fb64 force-pushed the arrow-read-parquet branch from 3c6e600 to 0dd7498 Compare January 31, 2024 12:58
@koperagen
Copy link
Collaborator

koperagen commented Jan 31, 2024

Can confirm, 14.0.2 works. I tried it, have some requests

  1. Can you clarify what are expected url values?
    Because following code throws an exception
    val df = DataFrame.readParquet(URL("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-07.parquet"))
Exception in thread "main" java.lang.RuntimeException: Unrecognized filesystem type in URI: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-07.parquet
	at org.apache.arrow.dataset.file.JniWrapper.makeFileSystemDatasetFactory(Native Method)
	at org.apache.arrow.dataset.file.FileSystemDatasetFactory.createNative(FileSystemDatasetFactory.java:40)
	at org.apache.arrow.dataset.file.FileSystemDatasetFactory.<init>(FileSystemDatasetFactory.java:31)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.readArrowDataset(arrowReadingImpl.kt:328)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet(arrowReading.kt:197)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet$default(arrowReading.kt:194)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.main(arrowReadingImpl.kt:348)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.main(arrowReadingImpl.kt)

Looks like only URL that point to files are valid ones? Can we make this parameter a File then?
DataFrame.readParquet(URL("file:/home/nikita/Downloads/yellow_tripdata_2023-07.parquet"))

  1. I can't read https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-07.parquet and get an exception. Does it work for you?
/arrow/java/dataset/src/main/cpp/jni_util.cc:79: Failed to update reservation while freeing bytes: JNIEnv was not attached to current thread
/tmp/jnilib-17033017462678975899.tmp(+0x1623b38)[0x7fe9e1e23b38]
/tmp/jnilib-17033017462678975899.tmp(_ZN5arrow4util8ArrowLogD1Ev+0xed)[0x7fe9e1e23eed]
... etc

At the same time it reads sample file from tests just fine

@fb64
Copy link
Contributor Author

fb64 commented Jan 31, 2024

Actually URI parsing is done natively by arrow and it supports only few file systems and unfortunately http(s) is not supported yet :

  • local file: file:/filename.parquet
  • AWS S3: s3:/filename.parquet
  • Google Cloud Storage gs:/filename.parquet or gcs:/filename.parquet
  • Hadoop : hdhf:/filename.parquet or viewfs:/filename.parquet

CF arrow source code : https://github.com/apache/arrow/blob/2a87693134135a8af2ae2b6df41980176431b1c0/cpp/src/arrow/filesystem/filesystem.cc#L679

@koperagen
Copy link
Collaborator

koperagen commented Feb 1, 2024

unfortunately http(s) is not supported yet

I actually tried to read local copy of that file and it failed with JNIEnv was not attached to current thread. Want to see if it's a platform specific bug or something else

Thanks for clarification about URI. Let's change that parameter type to java.net.URI and add a note about filesystems then?

@fb64
Copy link
Contributor Author

fb64 commented Feb 2, 2024

I reached the same issue, another problem with JNI (and thread)...
Changing the creation of NativeMemoryPool with NativeMemoryPool.getDefault() here seems to fix the error.
By the way is not recommended to use it in production: https://arrow.apache.org/docs/java/dataset.html#native-memory-management
I also will update the URI part ...

@koperagen koperagen self-requested a review February 2, 2024 13:50
@koperagen koperagen added this to the 0.13.0 milestone Feb 2, 2024
@zaleslaw
Copy link
Collaborator

zaleslaw commented Feb 5, 2024

Hi, thanks to the PR, sorry, I could not understand will it cover any Parquet files or only Parquet files keeping the something in the Arrow format? I will collect a few parquet files and return to you

@fb64
Copy link
Contributor Author

fb64 commented Feb 6, 2024

Hi, thanks to the PR, sorry, I could not understand will it cover any Parquet files or only Parquet files keeping the something in the Arrow format? I will collect a few parquet files and return to you

I confirm that it should cover every parquet files. We facing to a JNI error with some parquet files (not all). I created an issue on arrow repository: apache/arrow#20379

@zaleslaw
Copy link
Collaborator

zaleslaw commented Feb 8, 2024

@fb64 we made a decision to not merge it immediately before three things happened:

  1. This PR will be tested on our own Parquet files
  2. We compare this approach with alternatives (without Apache Arrow, for example)
  3. We decided to keep it in Arrow module or create a separate module for it (it could change our 2-level structure of modules and dependencies in the project.

Thanks again for your help and collaboration!

@fb64
Copy link
Contributor Author

fb64 commented Feb 8, 2024

@fb64 we made a decision to not merge it immediately before three things happened:

  1. This PR will be tested on our own Parquet files
  2. We compare this approach with alternatives (without Apache Arrow, for example)
  3. We decided to keep it in Arrow module or create a separate module for it (it could change our 2-level structure of modules and dependencies in the project.

Thanks again for your help and collaboration!

No problem !
From my experience the other alternative is to use the Java Parquet library which relies on Hadoop which can be difficult to run on windows because of certain native libraries (but this point has maybe been improved). On the other hand arrow-dataset seems to be still under development and not totally operational but it seems prometheus and could bring both parquet and orc format reading/writing feature easily.
Let's keep in touch

@fb64
Copy link
Contributor Author

fb64 commented Feb 11, 2024

Related Arrow issue for JNIEnv was not attached to current thread error : apache/arrow#20379

@Jolanrensen Jolanrensen modified the milestones: 0.13.0, Backlog Mar 7, 2024
@Jolanrensen Jolanrensen added the enhancement New feature or request label Mar 7, 2024
@fb64 fb64 force-pushed the arrow-read-parquet branch 2 times, most recently from 71b06f5 to 8b8f706 Compare April 24, 2024 11:49
@fb64
Copy link
Contributor Author

fb64 commented Apr 24, 2024

for information I just updated this PR with Arrow 16.0.0 that includes fixes for the 2 issues discovered previously :

@fb64 fb64 force-pushed the arrow-read-parquet branch from 8b8f706 to 5ce70b9 Compare August 13, 2024 10:05
@fb64 fb64 force-pushed the arrow-read-parquet branch from 5ce70b9 to 79fd37d Compare September 6, 2024 09:41
@laurentperez
Copy link

Hi

I'm interested in df support, I can confirm that cloning from https://github.com/fb64/dataframe/tree/arrow-read-parquet and building with ./gradlew build publishToMavenLocal -xtest -xintegrationTest works. Very nice.

Please note that Arrow required me to set the following args in my build.gradle

jvmArgs '--add-opens', 'java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED'

However, when reading a.parquet with 38 millions of lines, it will OOM with Java heap space: failed reallocation of scalar replaced objects with the followig code snippet.

It works with smaller files though, around 5M rows per file work.

Caused by: java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced objects
	at java.base/java.lang.Long.valueOf(Long.java:1206)
	at org.apache.arrow.vector.BigIntVector.getObject(BigIntVector.java:131)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.values(arrowReadingImpl.kt:111)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.readField(arrowReadingImpl.kt:301)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.readArrowImpl(arrowReadingImpl.kt:389)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readArrow(arrowReading.kt:181)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingImplKt.readArrowDataset(arrowReadingImpl.kt:415)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet(arrowReading.kt:195)
	at org.jetbrains.kotlinx.dataframe.io.ArrowReadingKt.readParquet$default(arrowReading.kt:192)
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.readParquet
val frame = DataFrame.readParquet(            URL("file:/home/lperez/foo.parquet)        )
        val rowsCount = frame.rowsCount()
        val columnTypes = frame.columnTypes()
        println("columnTypes: $columnTypes")
        println("rowsCount: $rowsCount")
        println(frame)

Parquet size :

zsh 10147 [127]  (git)-[main]-% parquet-tools inspect foo.parquet

############ file meta data ############
created_by: Polars
num_columns: 7
num_rows: 38837326
num_row_groups: 148
format_version: 1.0
serialized_size: 100764

I've set Xmx to 4096M on my machine, should I be able to run 38M of lines or is it too much ?

thanks

@fb64
Copy link
Contributor Author

fb64 commented Oct 15, 2024

Hello Laurent, I'm pleased to see your interest in this PR.
Kotlin Dataframe keeps all data in memory, which may lead to out-of-memory issues with large datasets.
What is the size of your parquet file and is it compressed ? Could you specify the column types in your dataset?
For datasets too large for memory, I can recommend to use DuckDB with Kotlin Dataframe for preprocessing before loading into a Dataframe using Arrow.
I recently published an article about this : https://dev.to/fb64/kotlin-dataframe-arrow-54kh
Hope it'll help you.

@laurentperez
Copy link

Hi. My parquet is 126MB on disk, uses ZSTD compression, columns types are byte array, double, float (1), and has 38M lines.
Theses lines are one month of measures from IoT devices in some industry.

In fact.. I'm already using DuckDB ;) Some of my parquets are generated either by python polars, or duck.

Our company backends use both kotlin and python so I was looking at your PR with great interest.

I've used KDF before, my use case is to render base64 PNG plot renderings (using Kandy) of the dataframes, I know it works, however, only on "small" datasets :

I was not aware KDF would load the entire thing in memory.
I wrongly assumed it'd read using chunks or some sort of cursor or iterable. Perhaps if readParquet() delegated to an underlying (RootAllocator(),512) # from your article, or some sort of implementation based on https://kotlin.github.io/dataframe/read.html#read-apache-arrow-formats, then it'd fit.

This is not a problem per se : I'm already using duck, and your article is a great piece of information, I'll use it :) 👍

I was wondering how one would read such a massive dataset directly thru readParquet without knowledge of Arrow (or duck, for that matter) but I guess this is beyond the scope of this PR.

I hope your branch gets merged soon in 1.4.x ;)

(1)

############ file meta data ############
created_by: Polars
num_columns: 7
num_rows: 38837326
num_row_groups: 148
format_version: 1.0
serialized_size: 100764

############ Columns ############
name
id
value2
value3
timestamp
value4
processed_timestamp

############ Column(name) ############
name: name
path: name
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: ZSTD (space_saved: -16%)

############ Column(id) ############
name: id
path: id
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 86%)

############ Column(value2) ############
name: value2
path: value2
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 99%)

############ Column(value3) ############
name: value3
path: value3
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 65%)

############ Column(timestamp) ############
name: timestamp
path: timestamp
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: ZSTD (space_saved: 84%)

############ Column(value4) ############
name: value4
path: value4
max_definition_level: 1
max_repetition_level: 0
physical_type: DOUBLE
logical_type: None
converted_type (legacy): NONE
compression: ZSTD (space_saved: 82%)

############ Column(processed_timestamp) ############
name: processed_timestamp
path: processed_timestamp
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: ZSTD (space_saved: 81%)

@fb64
Copy link
Contributor Author

fb64 commented Oct 15, 2024

Polars (and also Pandas) store DataFrames in memory. However, Polars uses Arrow Format for representation in RAM, which can be more efficient for handling large datasets compared to KDF.
Polars has also a lazy mode that allows performing some operations without loading all rows (streaming mode) but only some operations are supported.
Maybe you can try to check how much memory takes your parquet file into a Polars DataFrame and compare it to KDF by using :

import polars as pl
df_pl = pl.read_parquet('<your_parquet_file>')
print(f'Size: {df_pl.estimated_size()} bytes')

I could be useful to add a such method in Kotlin DataFrame

@fb64 fb64 force-pushed the arrow-read-parquet branch from 79fd37d to e66906a Compare October 15, 2024 20:24
@laurentperez
Copy link

Polars estimated_size returned : 3650708644 == estimated 3.65GB

I made a mistake configuring my gradle build, it was not honoring my heap settings under tests, hence the OOM.

I made it work and it fits in memory using 16GB of heap which roughly translates using htop to 18-20GB of Resident Set Size (RSS is real physical RAM used by the JVM, Heap plus the other regions), I had to opt-in for ZGC since I'm on java21.

To estimate an order of magnitude for future readers stumbling upon this thread then I'd say loading the whole set using the JVM will use approx 4-5x the estimated size returned by Polars.

TL;DR : works :

tasks.withType(Test) {
    maxHeapSize = "16g"
    jvmArgs '--add-opens', 'java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED', '-XX:+UseZGC', '-XX:+ZGenerational'
}

I understand KDF minimal java version is 8, and ZGC did not land under java <= 15 so ZGC may not be a viable option for massive datasets requiring a big heap (esp. if target is an Android device). I'm using kotlin on server side, so ZGC is ok for me.

Thanks !

@fb64
Copy link
Contributor Author

fb64 commented Oct 16, 2024

After testing Parquet and CSV files, I also found that Kotlin Dataframe consumes 4-5 times more memory than Polars and Pandas. This maybe due to type boxing or other things related to the JVM, but not directly linked to parquet parsing.
IMHO there is a room for memory improvement and it should be addressed in a dedicated issue
Regarding the reading of parquet files, it seems quite stable. I know that there are some missing type mappings but I plan to work on it.
In any case, thank you @laurentperez for taking the time to test and provide constructive feedback. 🙏

@fb64
Copy link
Contributor Author

fb64 commented Oct 16, 2024

I just dig on to understand why the same DataFrame takes much space in JVM, and it is indeed due to JVM memory representation (mark word, class pointer, memory alignment).
According to this great article, Java objects could take up to 16 times more memory than their primitive counterparts.

@Jolanrensen
Copy link
Collaborator

@fb64 We're aware of the JVM space usage of DataFrame. Some weeks ago I've actually been investigating whether it's possible to swap out the underlying ArrayLists containing the column's data with primitive arrays (#712). While, yes, it's possible, I found it to be a large trade-off in terms of performance. Yes, we can store values as primitives, but then for each operation, since we use generic functions, they are boxed and unboxed one at a time. This is heavy and is very noticable for large operations. While we could, in theory, create primitive overloads for each operation we have, it may be worth to wait for Project Valhalla to help us here: https://openjdk.org/jeps/402.

@fb64
Copy link
Contributor Author

fb64 commented Oct 17, 2024

@Jolanrensen Yes Valhalla sounds promising to improve memory, and combined with the new Vector API it it could improve the performances significantly.
Another approach could be to use Arrow representation in memory (such as Polars, Pandas, etc.) and layer DataFrame methods on top of it. However, this may not be straightforward.

@fb64 fb64 force-pushed the arrow-read-parquet branch from e66906a to b123c43 Compare November 28, 2024 17:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for reading Parquet files
5 participants