Size: a a a

2021 September 16

K

KGM in Data Engineers
Спасибо

Да, агрегация помогла, но больше помог ещё один из фильтров, который перенесён был в сырые ещё DF1raw и DF2raw
источник

K

KGM in Data Engineers
Спасибо.

Я пробовал разные способы: и партиционирование на диск (partitionBy), и даже бакетинг, но все длилось невероятно медленно. Пока что помог фильтр до агрегации, агрегация DF1 и DF2 до джоина, и наконец-то джоин DF1 с DF2. Да, действительно, пришлось «разбирать» и «собирать» массивы, но эти операции, как оказалось, не столь «прожорливы».

По факту, здесь ведётся работа с одним датафреймом (почти 21 ТБ данных), правильно записанный на диск, без перекоса, который разбивается на два «кластера» (DF1raw и DF2raw), по условиям, и затем уже происходит «слив» воедино.
источник

K

KGM in Data Engineers
Спасибо

Диски хорошие (NVME, то есть 3ГБ на чтение и 2ГБ на запись, как правило), однако сложность, действительно в том, что данных очень много.
источник

С

Сергей in Data Engineers
3Гбит/сек?
источник

K

KGM in Data Engineers
Именно так и пробовал: брал небольшие куски данных, работал над ними, затем писал на диск. Однако, даже в таком случае, выигрыша в производительности не наблюдалось, ибо общее время вычислений, стало меньше, конечно, но все равно долго.
источник

ИК

Иван Калининский... in Data Engineers
Хорошо, что получилось.

Партиционирование в стиле hive и бакетинг не обязательно помогают, у них ограниченная область применения. Проблема спарка в том, что ему доступно очень немного информации, поэтому почти каждую операцию приходится выполнять «с нуля». Но если его научить хорошему, то пайплайн взлетит.

Ну и спарк 3 я ещё не пытался, но если даже завезли нормальную cost model, то проблема сбора и хранения статистик остаётся.
источник

С

Сергей in Data Engineers
Я просто не знаю как в спарке, но

когда например у тебя 4 переменных = ты по ним джойнишь - что происходит в спарке?

1. либо он генерит код типа if(a1=a2) if(b1=b2) if(c1=c2) ...
2. Либо он сразу бежит по индексу в нужное место struct[a1][b1][c1] = [... joined rows]
— и в этом случае тебе кстати гораздо проще агрегаты строить

Что происходит? // это вопрос - я не знаю.

Если первый случай - то тут конечно будет сильно влиять отсортированы данные или нет, и скажем сокращение дата-сета - так же сократит елозанье

Если строит индекс df[a1][b1][c1] - с этим гораздо проще - у нас там внутри уже все данные
И нам в принципе тогда достаточно - чтобы первый датасет был отсортирован ( например df1 - самый большой, по нему ползаем, а df2 - например попробовать загнать в индекс и держать в памяти даже - там 8гб надеюсь влезут )

Так же есть еще нюанс типа -

k1 = f(a1,b1,c1)
k2 = f(a2,b2,c2)

Когда строишь композитный ключ, в этом случае - с индексом или нет - у тебя сокращается количество проверок банально
Одна проверка занимает копейки, но при большом повторении - оно тоже требует времени в общем

Но если у тебя сразу индекс строится, а с твоим диском все первичное чтение займет около 3 минут.
т.е. это я бы сказал даже почти моментально. ( 40гб+8гб)

Блин, диск царский конечно, если он действительно 3Гбита дает - это прям вообще супер фантастически, чет мне раньше казалось, что NVme не сильно от SSD отличаются по скорости - тоже около 300-500мбит где-то.

ПС, - и кстати забыл, что ты можешь еще начать с расчета агрегатов - по обоим структурам, и только после этого эти композиты совмещать и джойнить
источник

ИК

Иван Калининский... in Data Engineers
Спарк не то же самое, что реляционная база данных, которая возникает на кластере по первому запросу пользователя. Для джоина нужно отсортировать данные или по одному отношению (хорошо, если по меньшему)) построить хэш-таблицу. Чего-то, похожего на b-tree или bitmap индекс там не было и нет. Так что, скорее первый вариант, не столь наивный, но и не идеал ((
источник

С

Сергей in Data Engineers
Под индексами в данном случае я больше понимал, типа multidimensional array которые используются во всех языках ( с хешом или без )

Например:

array[x][y] = some_data

Ну как раз первый вариант - с ифам, который, он как раз самый наивный.

В том смысле что если вторая таблица, скажем 10 записей содержит, если мы ей джойним, то на каждую итерацию первой таблицы - нужно каждый раз по 10 записей второй итерировать.

Итого, если в первой таблице, 100 записей, с 10 итерациями для джойна = 1000 обращений ( Это в случае с IF-ами )

В то время если объединять через индекс (массив), то тут достаточно только 100 + 10 итераций суммарно = 110 обращений ( на 890 проходов меньше )

Ну и конечно при таком подходе, с ростом записей - профит от таких приемов будет более заметным, главное чтобы индексы(массивы) шустро работали.

Помню один запрос из базы долго работал, пришлось несколько вложенных циклов, с такими ифами писать. После подобных упражнений начинаешь лучше понимать, что происходит с базой во время запроса и чо она вообще там делает.
источник

ИК

Иван Калининский... in Data Engineers
То, что "с ифами" - это Nested Loops, вложенные циклы. В общем случае может применяться, если в условии join не простое равенство, какой nonequity предикат. В самой наивной реализации даёт квадратичную сложность (m*n), поэтому каждый оптимизатор стремится избавиться от этого соединения.

Объединение "через индекс" - это, как я понимаю Sort Merge Join, линейная сложность самого соединения (m+n), однако наборы данных нужно отсортировать, а это лог-линейная сложность. Всё лучше, чем квадратичная. Если данные отсортированы заранее, то это самый лучший вариант.

Или можно построить хеш-таблицу для меньшего (или менее распределенного, много тонкостей) отношения. Получится (Broadcast) Hash Join, данные можно не сортировать, тоже линейная сложность.

Хорошие оптимизаторы также сделают много, чтобы уменьшить соединяемые наборы данных, например, добавят Semi Join, которого не было в самом запросе. И даже динамические приёмчики могу пойти в ход, если полезная информация стала известной только во время выполнения запроса.

Но спарковский Catalyst основан на правилах (rule based optimizer). Недавно выясняли, уберёт он из датафрейма поля, если они не нужны, или будет таскать по всем шафлам и дропнет в последний момент. Я даже сам засомневался, а спарк убрал все ненужные поля. А это всего лишь отработала эвристика, нужная и хорошая, но не зависящая от самих данных.

Может что-то поменялось в третьем спарк, пока не могу сказать. Но пока приходится вводить дополнительные правила-эвристики, если существующие чем-то не устраивают, это может быть непросто и порождает код, техдолг и баги
источник

A

Alex in Data Engineers
да вроде как cost based туда уже завезли
не помню в какой версии

huawei был основным контрибьютором

конечно вопросы к расчёту костов, но в зачаточном виде точно есть
источник

A

Alex in Data Engineers
https://docs.databricks.com/spark/latest/spark-sql/cbo.html

To get the full benefit of the CBO it is important to collect both column statistics and table statistics. Statistics can be collected using the Analyze Table command.
источник

A

Alex in Data Engineers
вопрос что не у всех есть статистика по таблицам, чаще её нету =\
источник

A

Alex in Data Engineers
источник

С

Сергей in Data Engineers
😍😍😍
источник

С

Сергей in Data Engineers
А вообще спарк как либо компилируется? или хотя бы какой нибудь run time компилятор там есть?
Или каждый раз на лету все считает?
источник

A

Alex in Data Engineers
что значит компилируется
источник

С

Сергей in Data Engineers
в байт-код хотя бы, он джавовый я так понимаю
источник

A

Alex in Data Engineers
по плану запроса происходит генерация java классов
они на драйвере проверяются что компилируются
отравляются сорцы на воркеры
воркеры повторно компилируют каждый у себя
сетап в класпас и вызов кода
дальше уже приходит jvm jit и делает из них машинные инструкции
источник

A

Alex in Data Engineers
пачка операций может схлопнуться в 1 класс
источник