Size: a a a

2020 December 04

AE

Alexey Evdokimov in Data Engineers
MarkAndSweep, причём гвоздями прибили по ходу
источник

AE

Alexey Evdokimov in Data Engineers
пытался прописать через -XX, ошибки сыплет
источник

ИК

Иван Калининский... in Data Engineers
Всем привет!

Я написал простенький партишенер:

class MyPartitioner(n: Int) extends Partitioner {
 override def numPartitions: Int = n
 override def getPartition(key: Any): Int = key.asInstanceOf[Row].getAs[Int](PartField)
}

И так же просто его вызываю:

def repartition(df: DataFrame, n: Int)(implicit spark: SparkSession): DataFrame = {
 val partitionedRDD = df
   .rdd
   
.map(row => (row, ()))
   .partitionBy(new MyPartitioner(n))
   .map(_._1)
 spark.createDataFrame(partitionedRDD, df.schema)
   .drop(PartField)
}

Поле PartField с типом Int уже добавлено в датафрейм, и представляет собой точный номер партиции RDD. Эта фигня обеспечивает нужное разделение по партициям и конечным файлам, но работает примерно в два раза медленнее, чем простой df.repartition(n, col(PartField)). Но repartition работает по вычисленному Murmur3 (коллизии!) и в один файл уходит несколько партиций. Повышать n пробовал (это был предыдущий подход, уже больше года работает), но коллизии все равно возможны, и много пустых тасков не добавляет элегантности.

Отсюда два вопроса:
1. Что я делаю не так?
2. Как можно сделать чтобы Partitioner работал в два раза быстрее или DataFrame.repartition брал значение поля, не вычисляя хеш? С расширениями SparkSession могу повозиться, если есть какой-нибудь наводящий ресурс
источник

A

Alex in Data Engineers
@KaiNie_R https://habr.com/ru/company/otus/blog/529684/

Есть подозрение что из-за ваших конвертации в рдд все и идёт по одному месту
источник

A

Alex in Data Engineers
В статье примеры на "одинаковы код есть"
источник

A

Alex in Data Engineers
Напишите партишинер для датафрейма :)
источник

AS

Andrey Smirnov in Data Engineers
Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети.

простите что? автор узнал что передача по сети быстрее чем чтение с диска?
источник

A

Alex in Data Engineers
Там много КО было, но проблема что многие кто работает на уровне апи и её смотрит под капот это не понимают
источник

ME

Max Efremov in Data Engineers
Andrey Smirnov
Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети.

простите что? автор узнал что передача по сети быстрее чем чтение с диска?
Смотря какая сеть и диск 😂
источник

ИК

Иван Калининский... in Data Engineers
Alex
Напишите партишинер для датафрейма :)
Да, похоже так и придётся делать. Обидно, что со всей кодогенерацией, типизацией и т. п., для работы партишенера, в котором две строки, нужно создать ещё несколько классов и встроить их в сессию. Ещё и протестировать
источник

A

Alex in Data Engineers
Или быстро или удобно :) 2 стула
источник

NN

No Name in Data Engineers
Alex
Или быстро или удобно :) 2 стула
Обычно стулья похуже предлагаются
источник

A

Alex in Data Engineers
Похуже это когда уйдёт в прод и тебе приходится отвечать за принятое решение
источник

ИК

Иван Калининский... in Data Engineers
Alex
Или быстро или удобно :) 2 стула
Третий стул: форкнуть спарк(
источник

A

Alex in Data Engineers
Ну, многие имеют кастомные сборки, но их ещё и поддерживать надо регулярно
источник

A

Alex in Data Engineers
И это точно не тот случай
источник

A

Alex in Data Engineers
Пускай и пяток классов, но их можно в либу отдельную завернуть

Форк имеет смысл если нужно заменить поведение и по другому никак (например константы или сам алгоритм пофиксить), бекпортнуть изменение
источник

ИК

Иван Калининский... in Data Engineers
Alex
Или быстро или удобно :) 2 стула
С rdd сейчас медленно и неудобно, вот четвёртый стул
источник

A

Alex in Data Engineers
Это не стул, это жизнь
источник

ИК

Иван Калининский... in Data Engineers
Alex
Пускай и пяток классов, но их можно в либу отдельную завернуть

Форк имеет смысл если нужно заменить поведение и по другому никак (например константы или сам алгоритм пофиксить), бекпортнуть изменение
Парсер spark я дополнял, то ещё удовольствие.
источник