Size: a a a

2021 September 15

T

T in Data Engineers
Если они в паркете значит у них есть схема
источник

T

T in Data Engineers
Если вы там джсоны ток не жмёте
источник

VF

Vasily Fomin in Data Engineers
Да, но она может меняться. В Спарк есть mergeSchema флаг, который замедляет запросы, но он сможет помочь, я думаю это проще, чем самому мониторить схемы и писать что-то вроде Uber Schema Service. Но я могу ошибаться, поэтому открыт к любым идеям 😊
источник

AK

Alexandr Khan in Data Engineers
Всем привет, может глупый вопрос: как predicate pushdown улучшает перформанс? Данные то все равно фильтруются, только на уровне базы. Пока писал, походу понял - за счет data locality получается - не тянем все данные в спарк, а отправляем query к ним?
источник

AZ

Anton Zadorozhniy in Data Engineers
Все равно это немного
источник

AZ

Anton Zadorozhniy in Data Engineers
Зависит от формата, но идея именно такая что мы не читаем с диска лишнее
источник

AK

Alexandr Khan in Data Engineers
А что формат? Можете вкратце пояснить или ссылку на это? Спасибо
источник

AZ

Anton Zadorozhniy in Data Engineers
Возможность читать только нужные строчки и колонки зависит от формата, например CSV файл надо прочитать целиком, а для Parquet вы можете пропускать какие-то группы или страйпы (забыл точно как там) потому что там точно нет нужных вам колонок
источник

AZ

Anton Zadorozhniy in Data Engineers
Если например у вас Presto читает из PostgreSQL, то он может выгрузить все ваши таблицы в запросе целиком, а потом соединять и фильтровать их у себя; а может добавить к выгрузке where clause и взять только нужные колонки - это тоже пример predicate push down
источник

AK

Alexandr Khan in Data Engineers
Понял, спасибо
источник

K

KGM in Data Engineers
Вводные данные:
- Spark 2.4.4
- Hadoop 2.7

Есть датафрейм DF1raw с колонками (a1, b1, c1, d1, e1, f1) и датафрейм DF2raw с колонками (a2, b2, c2, d2, e2, f2)

Формат ORC

* В DF1raw (40 GB)  8.5 млн строк *
* В DF2raw (8GB)     1.7 млн строк *

val DF1 = DF1raw.repartition('a1, 'b1, 'c1, 'e1)
val DF2 = DF2raw.repartition('a2, 'b2, 'c2, 'e2)

1) val DF3 = DF1.join(DF2, a1 == a2 && b1 == b2 && c1 == c2 && d1 == d2, "left") // это отрабатывает, но пока сложность дальше

* В DF3 (500GB)     320 млрд строк *

2) DF3.withColumn("arr", array('c1, 'e1, 'c2, 'e2).drop("a2", "b2").write(...)  // ушел в бесконечные вычисления — пишет в HDFS, но медленно

3) DF3.groupBy("a1", "b1").agg(collect_set($"arr" as "arrs") // но пока следует понять то, что с пунктом 2
источник

AN

Aleksey Nikolaev in Data Engineers
А почему после джоина количество строк растет на четыре порядка? В дф1 и 2 много дублей по ключам abcd? Спецом так данные устроены? Если так то видится что левую и правую часть можно сперва agg (полностью или частично) и только потом джоин тк объединение и Агрегация идут по одним и тем же колонкам
источник

NN

No Name in Data Engineers
А сам вопрос в чем заключается?
источник
2021 September 16

K

KGM in Data Engineers
Много дублей может быть, да. Данные именно так устроены, да.

А агрегировать Вы что предлагаете — сперва DF1 по abcd, а затем DF2, и только затем джоин?
источник

K

KGM in Data Engineers
Да, забыл добавить самое важное — как ускорить это?
источник

AN

Aleksey Nikolaev in Data Engineers
Да, если задача позволяет так сделать. Ведь сейчас при джоине датасет раздувается чтобы потом снова схлопнуться.
источник

AN

Aleksey Nikolaev in Data Engineers
Ну ускорить стандартно - настройками сессии, добавить коров и памяти, посмотреть нет ли перекоса… но чет кажется что это все ненужно если добиться чтобы джоин на выходе порядка млн строк был
источник

ИК

Иван Калининский... in Data Engineers
DF3 (500Gb,320 млрд) пытались материализовать до создания поля arr? Интересно, что известны размер и кардинальность. И, если он материализован, то дальше нет широких трансформации, всё локально вычисляется и уже точно материализуется. Может есть что-то интересное во write? PartitionBy, например.

И да, советы даны здравые, надо  аггрегировать до join. Минус в том, что будет сложно описать желаемый результат, возможно массивы придётся разбирать и снова собирать. Это нежелательно, надо подумать, как избежать такого
источник

С

Сергей in Data Engineers
Я боюсь, что с таким количеством записей вам вообще диска не хватит, поэтому и пишет долго, что объем ого-го

При 300байт на запись вам уже 100тб диск нужен.

У вас каждый лишний байт данных - уже 320гб весит

+ скорость диска какая?

На c++/go обработка в оперативной памяти займёт чуть более 5-10 минут, без учёта дисковых операций
источник

С

Сергей in Data Engineers
Ускорить можно, но похоже не на спарке, если считать в один проход цикла + 1 вложенный для джойна

Я думаю вполне можно уложиться, если постараться.

+ работать с небольшими кусочками датасета для начала и обрабатывать кусками - в этом случае вы даже распаралелить сможете, на несколько ядер, если оперативки позволит.

И на диск писать уже финальный вариант, после агрегаций, суммирований и т.д.
источник