ПФ
Size: a a a
ПФ
K
ПФ
ПФ
SI
Шаффлит ли `df.repartition(10)` данные? Что конкретно используется для вычисления хеша и как воспроизвести его вручную?
ПФ
ПФ
SI
ПФ
ПФ
t
ПФ
ИК
ПФ
/**
 * Maps a record key to the index of the output partition it belongs to.
 *
 * A `null` key always goes to partition 0. Any other key is bucketed by its
 * `hashCode`, reduced modulo `numPartitions` by `Utils.nonNegativeMod`.
 * NOTE(review): that Spark helper is expected to behave like `%` with
 * negative remainders shifted up by `numPartitions`, so the result lands in
 * `[0, numPartitions)` — confirm against your Spark version.
 *
 * For a boxed `Int` key, `hashCode` is the value itself. The partition is
 * then simply the key modulo `numPartitions`, which is what makes Spark's
 * round-robin keys (consecutive `Int`s) land in consecutive partitions.
 *
 * @param key the record key; may be `null`
 * @return the index of the partition the key is assigned to
 */
def getPartition(key: Any): Int = key match {
  case null => 0
  case _    => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
ПФ
ПФ
/**
 * Builds the function that turns each input row into the key used to pick
 * that row's output partition, according to `newPartitioning`.
 *
 *  - `RoundRobinPartitioning(n)` (what `df.repartition(n)` without columns
 *    uses): the row contents are ignored. A start position is drawn once
 *    from `new Random(partitionId).nextInt(n)`, where `partitionId` is the
 *    current task's partition id. The k-th row (k = 1, 2, ...) passed to the
 *    returned function gets the `Int` key `start + k`. An `Int`'s `hashCode`
 *    is its value, so the hash-mod partitioner applied afterwards sends that
 *    row to `(start + k) mod n`.
 *    Manual reproduction for input partition `p`: compute
 *    `start = new Random(p).nextInt(n)`; row k then goes to `(start + k) % n`.
 *    NOTE(review): with `java.util.Random`, first values for small
 *    consecutive seeds are reportedly correlated (SPARK-21782, skew when `n`
 *    is a power of 2). Newer Spark versions seed an `XORShiftRandom` here
 *    instead, so confirm which generator your Spark version uses before
 *    reproducing by hand.
 *  - `HashPartitioning`: evaluates `h.partitionIdExpression` on the row and
 *    returns the resulting `Int` as the key.
 *    NOTE(review): in Spark this expression is presumably
 *    `pmod(murmur3_hash(partitioning exprs), numPartitions)`, reproducible
 *    with SQL `pmod(hash(cols...), n)` — confirm in `HashPartitioning`.
 *  - `RangePartitioning`: the key is the row projected onto the children of
 *    its sort expressions.
 *  - `SinglePartition`: the row itself is the key.
 *  - any other partitioning: fails via `sys.error`.
 *
 * The round-robin extractor captures a mutable `var`. Every call advances it,
 * and it has no synchronization, so one extractor instance should serve a
 * single stream of rows.
 *
 * @return a function from an input row to its partitioning key
 */
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    // Distributes elements evenly across output partitions. The start position
    // is seeded with the task's partition id, so it is deterministic per input
    // partition rather than truly random.
    var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
    (row: InternalRow) => {
      // The HashPartitioner will handle the `mod` by the number of partitions.
      // Incrementing before returning means the first row's key is `start + 1`.
      position += 1
      position
    }
  case h: HashPartitioning =>
    val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
    row => projection(row).getInt(0)
  case RangePartitioning(sortingExpressions, _) =>
    val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
    row => projection(row)
  case SinglePartition => identity
  case _ => sys.error(s"Exchange not implemented for $newPartitioning")
}