diff --git a/velox/docs/develop/memory.rst b/velox/docs/develop/memory.rst index c5ed4a63fe27..06ff5fe667ed 100644 --- a/velox/docs/develop/memory.rst +++ b/velox/docs/develop/memory.rst @@ -610,10 +610,7 @@ The end-to-end memory arbitration process in *SharedArbitrator* works as follows memory reservations from the candidate query pools without actually freeing the used memory. It first tries to reclaim from itself and then from the candidate pools which have the most free capacity - (*MemoryPool::freeBytes*) until it reaches the memory reclaim target. Note - that we set the memory reclaim target to a large value - (*MemoryManagerOptions::memoryPoolTransferCapacity*) which could be more - than the actual needed size, to avoid the frequent memory arbitrations. + (*MemoryPool::freeBytes*) until it reaches the memory reclaim target. d. If the memory arbitrator hasn’t reclaimed enough free memory on fast path, it runs the slow path diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index d730d39e8d81..2e6794e43b18 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -352,7 +352,7 @@ void Driver::enqueueInternal() { queueTimeStartUs_ = getCurrentTimeMicro(); } -// Call an Oprator method. record silenced throws, but not a query +// Call an Operator method. record silenced throws, but not a query // terminating throw. Annotate exceptions with Operator info. #define CALL_OPERATOR(call, operatorPtr, operatorId, operatorMethod) \ try { \ diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 28ef0877458d..4589646a167f 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -41,7 +41,7 @@ class Destination { setTargetSizePct(); } - // Resets the destination before starting a new batch. + /// Resets the destination before starting a new batch. void beginBatch() { rows_.clear(); rowIdx_ = 0; @@ -57,7 +57,8 @@ class Destination { } } - // Serializes row from 'output' till either 'maxBytes' have been serialized or + /// Serializes row from 'output' till either 'maxBytes' have been serialized + /// or BlockingReason advance( uint64_t maxBytes, const std::vector& sizes, @@ -136,19 +137,19 @@ class Destination { }; } // namespace detail -// In a distributed query engine data needs to be shuffled between workers so -// that each worker only has to process a fraction of the total data. Because -// rows are usually not pre-ordered based on the hash of the partition key for -// an operation (for example join columns, or group by columns), repartitioning -// is needed to send the rows to the right workers. PartitionedOutput operator -// is responsible for this process: it takes a stream of data that is not -// partitioned, and divides the stream into a series of output data ready to be -// sent to other workers. This operator is also capable of re-ordering and -// dropping columns from its input. +/// In a distributed query engine data needs to be shuffled between workers so +/// that each worker only has to process a fraction of the total data. Because +/// rows are usually not pre-ordered based on the hash of the partition key for +/// an operation (for example join columns, or group by columns), repartitioning +/// is needed to send the rows to the right workers. PartitionedOutput operator +/// is responsible for this process: it takes a stream of data that is not +/// partitioned, and divides the stream into a series of output data ready to be +/// sent to other workers. This operator is also capable of re-ordering and +/// dropping columns from its input. class PartitionedOutput : public Operator { public: - // Minimum flush size for non-final flush. 60KB + overhead fits a - // network MTU of 64K. + /// Minimum flush size for non-final flush. 60KB + overhead fits a + /// network MTU of 64K. static constexpr uint64_t kMinDestinationSize = 60 * 1024; PartitionedOutput( @@ -159,13 +160,13 @@ class PartitionedOutput : public Operator { void addInput(RowVectorPtr input) override; - // Always returns nullptr. The action is to further process - // unprocessed input. If all input has been processed, 'this' is in - // a non-blocked state, otherwise blocked. + /// Always returns nullptr. The action is to further process + /// unprocessed input. If all input has been processed, 'this' is in + /// a non-blocked state, otherwise blocked. RowVectorPtr getOutput() override; - // always true but the caller will check isBlocked before adding input, hence - // the blocked state does not accumulate input. + /// always true but the caller will check isBlocked before adding input, hence + /// the blocked state does not accumulate input. bool needsInput() const override { return true; } @@ -202,7 +203,7 @@ class PartitionedOutput : public Operator { void estimateRowSizes(); - /// Collect all rows with null keys into nullRows_. + // Collect all rows with null keys into nullRows_. void collectNullRows(); // If compression in serde is enabled, this is the minimum compression that @@ -223,7 +224,10 @@ class PartitionedOutput : public Operator { BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; bool finished_{false}; + // Contains pointers to 'rowSize_' elements. 'sizePointers_[i]' contains a + // pointer to 'rowSize_[i]'. std::vector sizePointers_; + // The estimated row size for each row. Index maps back to 'output_' index std::vector rowSize_; std::vector> destinations_; bool replicatedAny_{false};