Всем привет!
Я написал простенький партишенер:
class MyPartitioner(n: Int) extends Partitioner {
override def numPartitions: Int = n
override def getPartition(key: Any): Int = key.asInstanceOf[Row].getAs[Int](PartField)
}
И так же просто его вызываю:
def repartition(df: DataFrame, n: Int)(implicit spark: SparkSession): DataFrame = {
val partitionedRDD = df
.rdd
.map(row => (row, ()))
.partitionBy(new MyPartitioner(n))
.map(_._1)
spark.createDataFrame(partitionedRDD, df.schema)
.drop(PartField)
}
Поле PartField с типом Int уже добавлено в датафрейм, и представляет собой точный номер партиции RDD. Эта фигня обеспечивает нужное разделение по партициям и конечным файлам, но работает примерно в два раза медленнее, чем простой df.repartition(n, col(PartField)). Но repartition работает по вычисленному Murmur3 (коллизии!) и в один файл уходит несколько партиций. Повышать n пробовал (это был предыдущий подход, уже больше года работает), но коллизии все равно возможны, и много пустых тасков не добавляет элегантности.
Отсюда два вопроса:
1. Что я делаю не так?
2. Как можно сделать чтобы Partitioner работал в два раза быстрее или DataFrame.repartition брал значение поля, не вычисляя хеш? С расширениями SparkSession могу повозиться, если есть какой-нибудь наводящий ресурс