diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 0831fb62675..abe043d77fb 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -378,19 +378,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::BinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::FixedSizeBinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 const std::string test_traits<::arrow::StringType>::value("Test");  // NOLINT
@@ -5794,6 +5794,72 @@ TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteRecordBatchFlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 100 so that each batch produces a new group.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(100)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+
+  // Create writer to write data via RecordBatch.
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
+  // NewBufferedRowGroup() is not called explicitly; WriteRecordBatch() calls it
+  // internally as needed.
+  for (int i = 0; i < 5; ++i) {
+    auto record_batch =
+        gen.BatchOf({::arrow::field("a", ::arrow::int64())}, /*length=*/1);
+    ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(5, file_metadata->num_row_groups());
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
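For reference outside the test fixture, the knob exercised above is just the new builder setter; a minimal standalone sketch (the 1 MiB figure is an arbitrary example, not a recommended value):

```cpp
// Cap each row group at roughly 1 MiB of encoded, compressed data.
std::shared_ptr<parquet::WriterProperties> props =
    parquet::WriterProperties::Builder().max_row_group_bytes(1 << 20)->build();
```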
+TEST(TestArrowReadWrite, WriteTableFlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 100 so that the first WriteTable call
+  // produces one row group and the second produces five.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(100)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  auto table = ::arrow::Table::Make(
+      schema, {::arrow::ArrayFromJSON(::arrow::int64(), R"([1, 2, 3, 4, 5])")});
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, 5));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(6, file_metadata->num_row_groups());
+  EXPECT_EQ(5, file_metadata->RowGroup(0)->num_rows());
+  for (int i = 1; i < 6; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
 TEST(TestArrowReadWrite, MultithreadedWrite) {
   const int num_columns = 20;
   const int num_rows = 1000;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4b2b06e5e09..6438821c0eb 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -397,13 +397,18 @@ class FileWriterImpl : public FileWriter {
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
       return Status::Invalid("chunk size per row_group must be greater than 0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    } else if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
       return Status::Invalid("table schema does not match this writer's. table:'",
                              table.schema()->ToString(), "' this:'",
                              schema_->ToString(), "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+      chunk_size = std::min(
+          chunk_size, static_cast<int64_t>(this->properties().max_row_group_bytes() /
+                                           avg_row_size.value()));
+    }
 
     auto WriteRowGroup = [&](int64_t offset, int64_t size) {
       RETURN_NOT_OK(NewRowGroup());
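A worked example of the new cap in `WriteTable`, with assumed numbers: under the default `max_row_group_bytes` of 128 MiB and a running estimate of 64 compressed bytes per row, `chunk_size` is limited to 128 × 1024 × 1024 / 64 = 2,097,152 rows. Before any data has been written the estimate is `std::nullopt`, so the cap is skipped and only `max_row_group_length` applies; this is why the first `WriteTable` call in the test above still emits a single 5-row group.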
@@ -442,12 +447,8 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     }
 
-    // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
     // Initialize a new buffered row group writer if necessary.
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
       RETURN_NOT_OK(NewBufferedRowGroup());
     }
 
@@ -480,17 +481,24 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
+      int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
+                                    batch.num_rows() - offset);
+      if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+        int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
+        batch_size = std::min(
+            batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
+                                             avg_row_size.value()));
+      }
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
+        // The current row group is full; write the remaining rows into a new group.
         RETURN_NOT_OK(NewBufferedRowGroup());
       }
     }
@@ -516,6 +524,17 @@ class FileWriterImpl : public FileWriter {
     return Status::OK();
   }
 
+  std::optional<double> EstimateCompressedBytesPerRow() const override {
+    if (auto value = writer_->EstimateCompressedBytesPerRow()) {
+      return value;
+    }
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
+      return static_cast<double>(row_group_writer_->EstimatedTotalCompressedBytes()) /
+             row_group_writer_->num_rows();
+    }
+    return std::nullopt;
+  }
+
  private:
   friend class FileWriter;
 
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 8ec8796ffd1..33eca70e31f 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -111,9 +111,9 @@ class PARQUET_EXPORT FileWriter {
   /// Multiple RecordBatches can be written into the same row group
   /// through this method.
   ///
-  /// WriterProperties.max_row_group_length() is respected and a new
-  /// row group will be created if the current row group exceeds the
-  /// limit.
+  /// WriterProperties.max_row_group_length() and WriterProperties.max_row_group_bytes()
+  /// are respected, and a new row group will be created if the current row group
+  /// exceeds either limit.
   ///
   /// Batches get flushed to the output stream once NewBufferedRowGroup()
   /// or Close() is called.
@@ -139,6 +139,9 @@ class PARQUET_EXPORT FileWriter {
   /// `store_schema` being unusable during read.
   virtual ::arrow::Status AddKeyValueMetadata(
       const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
+  /// \brief Estimate the compressed bytes per row from the data written so far.
+  /// \note Returns std::nullopt if no rows have been written.
+  virtual std::optional<double> EstimateCompressedBytesPerRow() const = 0;
 
   /// \brief Return the file metadata, only available after calling Close().
   virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
 };
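A sketch of how a caller outside this patch might consult the new estimate to size its own batches; `RowsUnderBudget` and its parameters are hypothetical names, not part of the API being added:

```cpp
#include <cstdint>
#include <optional>

#include "parquet/arrow/writer.h"

// Hypothetical helper: how many more rows fit under `byte_budget`, falling
// back to `default_rows` before anything has been written (estimate is empty).
int64_t RowsUnderBudget(const parquet::arrow::FileWriter& writer,
                        int64_t byte_budget, int64_t default_rows) {
  std::optional<double> bytes_per_row = writer.EstimateCompressedBytesPerRow();
  if (!bytes_per_row.has_value() || *bytes_per_row <= 0) {
    return default_rows;
  }
  return static_cast<int64_t>(byte_budget / *bytes_per_row);
}
```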
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index ddec2c0a560..1fecca8e488 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
+int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
+  return contents_->total_compressed_bytes() +
+         contents_->total_compressed_bytes_written() +
+         contents_->EstimatedBufferedValueBytes();
+}
+
 bool RowGroupWriter::buffered() const { return contents_->buffered(); }
 
 int RowGroupWriter::current_column() { return contents_->current_column(); }
@@ -195,6 +201,20 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     return total_compressed_bytes_written;
   }
 
+  int64_t EstimatedBufferedValueBytes() const override {
+    if (closed_) {
+      return 0;
+    }
+    int64_t estimated_buffered_value_bytes = 0;
+    for (size_t i = 0; i < column_writers_.size(); i++) {
+      if (column_writers_[i]) {
+        estimated_buffered_value_bytes +=
+            column_writers_[i]->estimated_buffered_value_bytes();
+      }
+    }
+    return estimated_buffered_value_bytes;
+  }
+
   bool buffered() const override { return buffered_row_group_; }
 
   void Close() override {
@@ -329,6 +349,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
+      written_compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     row_group_writer_.reset();
 
@@ -352,6 +373,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
   int64_t num_rows() const override { return num_rows_; }
 
+  int64_t written_compressed_bytes() const override { return written_compressed_bytes_; }
+
   const std::shared_ptr<WriterProperties>& properties() const override {
     return properties_;
   }
 
@@ -360,6 +383,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
+      written_compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     int16_t row_group_ordinal = -1;  // row group ordinal not set
     if (file_encryptor_ != nullptr) {
@@ -415,6 +439,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
         properties_(std::move(properties)),
         num_row_groups_(0),
         num_rows_(0),
+        written_compressed_bytes_(0),
         metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
     PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
     if (position == 0) {
@@ -468,6 +493,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   const std::shared_ptr<WriterProperties> properties_;
   int num_row_groups_;
   int64_t num_rows_;
+  int64_t written_compressed_bytes_;
   std::unique_ptr<FileMetaDataBuilder> metadata_;
   // Only one of the row group writers is active at a time
   std::unique_ptr<RowGroupWriter> row_group_writer_;
@@ -640,6 +666,29 @@ void ParquetFileWriter::AddKeyValueMetadata(
   }
 }
 
+std::optional<double> ParquetFileWriter::EstimateCompressedBytesPerRow() const {
+  if (contents_ && contents_->num_rows() > 0) {
+    // Use written row groups to estimate.
+    return static_cast<double>(contents_->written_compressed_bytes()) /
+           contents_->num_rows();
+  }
+  if (file_metadata_) {
+    // Use closed file metadata to estimate.
+    int64_t total_compressed_bytes = 0;
+    int64_t total_rows = 0;
+    for (int i = 0; i < file_metadata_->num_row_groups(); i++) {
+      const auto row_group = file_metadata_->RowGroup(i);
+      total_compressed_bytes += row_group->total_compressed_size();
+      total_rows += row_group->num_rows();
+    }
+    if (total_compressed_bytes == 0 || total_rows == 0) {
+      return std::nullopt;
+    }
+    return static_cast<double>(total_compressed_bytes) / total_rows;
+  }
+  return std::nullopt;
+}
+
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
   if (contents_) {
     return contents_->properties();
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index d5ea1d7c98a..26500e6ca61 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -58,6 +58,9 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_compressed_bytes() const = 0;
     /// \brief total compressed bytes written by the page writer
     virtual int64_t total_compressed_bytes_written() const = 0;
+    /// \brief estimated bytes of values that are buffered by the page writer
+    /// but not yet written to a page
+    virtual int64_t EstimatedBufferedValueBytes() const = 0;
 
     virtual bool buffered() const = 0;
   };
@@ -99,6 +102,8 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_compressed_bytes() const;
   /// \brief total compressed bytes written by the page writer
   int64_t total_compressed_bytes_written() const;
+  /// \brief Estimate the total compressed bytes, including written and buffered bytes.
+  int64_t EstimatedTotalCompressedBytes() const;
 
   /// Returns whether the current RowGroupWriter is in the buffered mode and is created
   /// by calling ParquetFileWriter::AppendBufferedRowGroup.
@@ -151,6 +156,7 @@ class PARQUET_EXPORT ParquetFileWriter {
     virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
+    virtual int64_t written_compressed_bytes() const = 0;
     virtual int num_columns() const = 0;
     virtual int num_row_groups() const = 0;
@@ -207,6 +213,10 @@ class PARQUET_EXPORT ParquetFileWriter {
   void AddKeyValueMetadata(
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
 
+  /// \brief Estimate the compressed bytes per row from closed row groups.
+  /// \return The estimated bytes per row, or std::nullopt when no row group has been written.
+  std::optional<double> EstimateCompressedBytesPerRow() const;
+
   /// Number of columns.
   ///
   /// This number is fixed during the lifetime of the writer as it is determined via
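Tying the size estimate together: for an open buffered row group, `RowGroupWriter::EstimatedTotalCompressedBytes()` (defined in file_writer.cc above) is the sum of three terms: compressed pages still held in memory (`total_compressed_bytes()`), pages already flushed by the page writer (`total_compressed_bytes_written()`), and raw values buffered in the column writers that have not yet been encoded into pages (`EstimatedBufferedValueBytes()`). Only the last term is a true estimate, since those values have not been encoded or compressed yet.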
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 8a374fcdaa3..8fc9a0ba1ce 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -67,6 +67,20 @@ std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
   return default_writer_properties;
 }
 
+WriterProperties::Builder* WriterProperties::Builder::max_row_group_length(
+    int64_t max_row_group_length) {
+  ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
+  max_row_group_length_ = max_row_group_length;
+  return this;
+}
+
+WriterProperties::Builder* WriterProperties::Builder::max_row_group_bytes(
+    int64_t max_row_group_bytes) {
+  ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive";
+  max_row_group_bytes_ = max_row_group_bytes;
+  return this;
+}
+
 void WriterProperties::Builder::CopyColumnSpecificProperties(
     const WriterProperties& properties) {
   for (const auto& [col_path, col_props] : properties.column_properties_) {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index eb5aee29695..2c2c654aa61 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -160,6 +160,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
 static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
 static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;
+static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024;
 static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;
@@ -293,6 +294,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
+          max_row_group_bytes_(DEFAULT_MAX_ROW_GROUP_BYTES),
           pagesize_(kDefaultDataPageSize),
           max_rows_per_page_(kDefaultMaxRowsPerPage),
           version_(ParquetVersion::PARQUET_2_6),
@@ -309,6 +311,7 @@ class PARQUET_EXPORT WriterProperties {
           dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
           write_batch_size_(properties.write_batch_size()),
           max_row_group_length_(properties.max_row_group_length()),
+          max_row_group_bytes_(properties.max_row_group_bytes()),
           pagesize_(properties.data_pagesize()),
           max_rows_per_page_(properties.max_rows_per_page()),
           version_(properties.version()),
@@ -413,10 +416,12 @@ class PARQUET_EXPORT WriterProperties {
     /// Specify the max number of rows to put in a single row group.
     /// Default 1Mi rows.
-    Builder* max_row_group_length(int64_t max_row_group_length) {
-      max_row_group_length_ = max_row_group_length;
-      return this;
-    }
+    Builder* max_row_group_length(int64_t max_row_group_length);
+
+    /// Specify the max number of bytes to put in a single row group.
+    /// The row group size is estimated from the encoded and compressed data.
+    /// Default 128 MiB.
+    Builder* max_row_group_bytes(int64_t max_row_group_bytes);
 
     /// Specify the data page size.
     /// Default 1MB.
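Note that `max_row_group_length` moves out of line above so that both setters can validate their argument: a non-positive value now fails the `ARROW_CHECK_GT` at the call site instead of being stored silently. Illustrative only:

```cpp
// Aborts at runtime with "max_row_group_bytes must be positive".
parquet::WriterProperties::Builder().max_row_group_bytes(0);
```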
@@ -779,11 +784,12 @@ class PARQUET_EXPORT WriterProperties {
 
     return std::shared_ptr<WriterProperties>(new WriterProperties(
         pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-        pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
-        size_statistics_level_, std::move(file_encryption_properties_),
-        default_column_properties_, column_properties, data_page_version_,
-        store_decimal_as_integer_, std::move(sorting_columns_),
-        content_defined_chunking_enabled_, content_defined_chunking_options_));
+        max_row_group_bytes_, pagesize_, max_rows_per_page_, version_, created_by_,
+        page_checksum_enabled_, size_statistics_level_,
+        std::move(file_encryption_properties_), default_column_properties_,
+        column_properties, data_page_version_, store_decimal_as_integer_,
+        std::move(sorting_columns_), content_defined_chunking_enabled_,
+        content_defined_chunking_options_));
   }
 
  private:
@@ -793,6 +799,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetVersion::type version_;
@@ -828,6 +835,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t max_row_group_length() const { return max_row_group_length_; }
 
+  inline int64_t max_row_group_bytes() const { return max_row_group_bytes_; }
+
   inline int64_t data_pagesize() const { return pagesize_; }
 
   inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
@@ -946,9 +955,10 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
-      ParquetVersion::type version, const std::string& created_by,
-      bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
+      int64_t max_row_group_length, int64_t max_row_group_bytes, int64_t pagesize,
+      int64_t max_rows_per_page, ParquetVersion::type version,
+      const std::string& created_by, bool page_write_checksum_enabled,
+      SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties,
@@ -959,6 +969,7 @@ class PARQUET_EXPORT WriterProperties {
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
+        max_row_group_bytes_(max_row_group_bytes),
         pagesize_(pagesize),
         max_rows_per_page_(max_rows_per_page),
         parquet_data_page_version_(data_page_version),
@@ -978,6 +989,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetDataPageVersion parquet_data_page_version_;
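Putting the patch together, a minimal end-to-end sketch of the intended workflow; the 1 MiB limit and chunk size are illustrative values, and `WriteWithByteLimit` is a hypothetical helper, not part of this change:

```cpp
#include <memory>
#include <utility>

#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "parquet/arrow/writer.h"
#include "parquet/properties.h"

// Write a table, letting the writer split row groups once the estimated
// compressed size of the buffered group would exceed max_row_group_bytes.
arrow::Status WriteWithByteLimit(const arrow::Table& table,
                                 std::shared_ptr<arrow::io::OutputStream> sink) {
  std::shared_ptr<parquet::WriterProperties> props =
      parquet::WriterProperties::Builder()
          .max_row_group_bytes(1 << 20)  // 1 MiB, illustrative
          ->build();
  ARROW_ASSIGN_OR_RAISE(
      std::unique_ptr<parquet::arrow::FileWriter> writer,
      parquet::arrow::FileWriter::Open(*table.schema(), arrow::default_memory_pool(),
                                       std::move(sink), props,
                                       parquet::default_arrow_writer_properties()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(table, /*chunk_size=*/64 * 1024));
  return writer->Close();
}
```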