Skip to content

Commit

Permalink
Rewrite the RecordBatchBuilder class as a ParquetStatsFileWriter class
Browse files Browse the repository at this point in the history
  • Loading branch information
robomics committed Nov 13, 2024
1 parent f51bb56 commit 0e1e72c
Show file tree
Hide file tree
Showing 10 changed files with 546 additions and 449 deletions.
4 changes: 2 additions & 2 deletions src/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ target_sources(
"${CMAKE_CURRENT_SOURCE_DIR}/parquet_helpers.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/parquet_stats_file_reader.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/parquet_stats_file_reader_impl.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/record_batch_builder.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/record_batch_builder_impl.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/parquet_stats_file_writer.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/parquet_stats_file_writer_impl.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/text.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/text_impl.hpp"
)
Expand Down
117 changes: 117 additions & 0 deletions src/io/include/nchg/parquet_stats_file_writer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (C) 2024 Roberto Rossini <[email protected]>
//
// SPDX-License-Identifier: GPL-3.0
//
// This library is free software: you can redistribute it and/or
// modify it under the terms of the GNU Public License as published
// by the Free Software Foundation; either version 3 of the License,
// or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Public License along
// with this library. If not, see
// <https://www.gnu.org/licenses/>.

#pragma once

// clang-format off
#include "nchg/suppress_warnings.hpp"
NCHG_DISABLE_WARNING_PUSH
NCHG_DISABLE_WARNING_DEPRECATED_DECLARATIONS
#include <arrow/array/array_base.h>
#include <arrow/builder.h>
#include <arrow/record_batch.h>
#include <arrow/io/file.h>
#include <parquet/properties.h>
#include <parquet/arrow/writer.h>
NCHG_DISABLE_WARNING_POP
// clang-format on

#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <hictk/reference.hpp>
#include <memory>
#include <optional>
#include <string_view>

namespace nchg {

class ParquetStatsFileWriter {
std::filesystem::path _path{};
std::shared_ptr<parquet::WriterProperties> _props{};
std::shared_ptr<arrow::io::FileOutputStream> _fp{};
std::unique_ptr<parquet::arrow::FileWriter> _writer{};

std::size_t _chunk_size{};
std::size_t _chunk_capacity{};
std::size_t _size{};

hictk::Reference _chroms{};

arrow::StringDictionary32Builder _chrom1{};
arrow::UInt32Builder _start1{};
arrow::UInt32Builder _end1{};

arrow::StringDictionary32Builder _chrom2{};
arrow::UInt32Builder _start2{};
arrow::UInt32Builder _end2{};

arrow::DoubleBuilder _pvalue{};
std::optional<arrow::DoubleBuilder> _pvalue_corrected{};
arrow::UInt64Builder _observed{};
arrow::DoubleBuilder _expected{};
arrow::DoubleBuilder _log_ratio{};
arrow::DoubleBuilder _odds{};
arrow::DoubleBuilder _omega{};

public:
ParquetStatsFileWriter() = delete;
ParquetStatsFileWriter(hictk::Reference chroms, const std::filesystem::path &path, bool force,
std::string_view compression_method, std::uint8_t compression_lvl,
std::size_t threads, std::size_t batch_size = 1'000'000);
ParquetStatsFileWriter(const ParquetStatsFileWriter &other) = delete;
ParquetStatsFileWriter(ParquetStatsFileWriter &&other) noexcept = delete;

~ParquetStatsFileWriter() noexcept;

ParquetStatsFileWriter &operator=(const ParquetStatsFileWriter &other) = delete;
ParquetStatsFileWriter &operator=(ParquetStatsFileWriter &&other) noexcept = delete;

template <typename Record>
void append(const Record &r);

// If finalize() is not called before a ParquetStatsFileWriter writer object goes out of scope
// the underlying file will be removed by the destructor
void finalize();

// Same as the above function, but can be used to finalize an empty file
template <typename Record>
void finalize();

private:
template <typename Record>
void initialize_writer();

void write_chunk();
void reset_builders();

template <typename ArrayBuilder, typename T>
void append(ArrayBuilder &builder, const T &data);

[[nodiscard]] std::shared_ptr<arrow::RecordBatch> finalize_chunk();

template <typename ArrayBuilder>
[[nodiscard]] static std::shared_ptr<arrow::Array> finalize_chunk(ArrayBuilder &builder);

static void setup_threadpool(std::size_t size);
void init_chrom_writers();
};

} // namespace nchg

#include "../../parquet_stats_file_writer_impl.hpp"
97 changes: 0 additions & 97 deletions src/io/include/nchg/record_batch_builder.hpp

This file was deleted.

Loading

0 comments on commit 0e1e72c

Please sign in to comment.