Size: a a a

2021 February 19

A

Alex in Data Engineers
да, нашёл
в этом случае именно page, так как примерно ту же функцию выполняет
источник

A

Alex in Data Engineers
на каждый пейдж мин/макс и тд
источник

A

Alex in Data Engineers
я не знаю случая чтобы страницу читали с середины
источник

A

Andrey in Data Engineers
K S
Проведите ликбез для меня 😁
В Pyspark ищу эффективный способ фильтрации супербольшого датафрейма big_df по полю из другого датафрейма (небольшого или среднего размера- до 10 тысяч записей) medium_df.
Оба датафрейма вытягиваются из паркет файлов из S3, никаких баз данных.

Моя идея перевести поле pids=medium_df.select("product_id") в список, потом этот список broadcast и потом фильтровать по нему
big_df.select(col1,col2,col3).where(big_df.product_id.isin(pids)). Будет ли это быстрее left join?

Хочется понять насколько хорош оптимизатор запросов и сможет ли он уменьшить количество shuffles.
да, будет. Так и делай, распредели еще данные первого датасета по воркерам равномерено
источник

ПФ

Паша Финкельштейн... in Data Engineers
Alex
да, нашёл
в этом случае именно page, так как примерно ту же функцию выполняет
Спасибо!
источник

KS

K S in Data Engineers
Andrey
да, будет. Так и делай, распредели еще данные первого датасета по воркерам равномерено
Понятно, спасибо. Я интуитивно понимаю, что сравнения на нодах будут локальные, а в случае join скорее всего будет траффик между нодами, и это медленнее, но просто хотел подтверждение, что catalyst (или tungsten) ещё не дошёл до таких оптимизаций.
источник

A

Andrey in Data Engineers
K S
Понятно, спасибо. Я интуитивно понимаю, что сравнения на нодах будут локальные, а в случае join скорее всего будет траффик между нодами, и это медленнее, но просто хотел подтверждение, что catalyst (или tungsten) ещё не дошёл до таких оптимизаций.
catalyst все еще глупый ) не стоит на него надеяться
источник

N

Nail in Data Engineers
K S
Проведите ликбез для меня 😁
В Pyspark ищу эффективный способ фильтрации супербольшого датафрейма big_df по полю из другого датафрейма (небольшого или среднего размера- до 10 тысяч записей) medium_df.
Оба датафрейма вытягиваются из паркет файлов из S3, никаких баз данных.

Моя идея перевести поле pids=medium_df.select("product_id") в список, потом этот список broadcast и потом фильтровать по нему
big_df.select(col1,col2,col3).where(big_df.product_id.isin(pids)). Будет ли это быстрее left join?

Хочется понять насколько хорош оптимизатор запросов и сможет ли он уменьшить количество shuffles.
Причем тут фильтрация и left join? Left join не про фильтрацию. Или я что-то не понимаю?
источник

N

Nail in Data Engineers
И почему это быстрее чем broadcast join?
источник

NN

No Name in Data Engineers
Nail
Причем тут фильтрация и left join? Left join не про фильтрацию. Или я что-то не понимаю?
Вероятно, коллега интересовался, что быстрее - джойнить по ключам и получить sortMerge join, или же забродкастить питоновский объект и фильтрануть по isin. Или же, может быть, хотел сравнить broadcast join и isin после бродкаста. В первом случае, скорее всего, isin будет лучше. А во втором уже все не так очевидно, но, в теории, если лист небольшой, то лучше фильтровать по нему, а если большой - превратить в датафрейм и сделать бродкаст джойн. Но я не делал тестов, это гипотетически.
источник

t

tenKe in Data Engineers
чето я сомневаюсь, что запихнуть 1000 элементов в .isin лучше, чем сделать обычный джойн с бродкастом
источник

t

tenKe in Data Engineers
ибо вот:
/* 024 */   protected void processNext() throws java.io.IOException {
/* 025 */     while (inputadapter_input_0.hasNext() && !stopEarly()) {
/* 026 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 027 */       do {
/* 028 */         long inputadapter_value_0 = inputadapter_row_0.getLong(0);
/* 029 */
/* 030 */         byte filter_inTmpResult_0 = -1;
/* 031 */         if (!false) {
/* 032 */           filter_inTmpResult_0 = 0;
/* 033 */           long filter_valueArg_0 = inputadapter_value_0;
/* 034 */           do {
/* 035 */             if (false) {
/* 036 */               filter_inTmpResult_0 = -1; // filter_isNull_0 = true;
/* 037 */             } else if (filter_valueArg_0 == 1L) {
/* 038 */               filter_inTmpResult_0 = 1; // filter_isNull_0 = false; filter_value_0 = true;
/* 039 */               continue;
/* 040 */             }
/* 041 */
/* 042 */             if (false) {
/* 043 */               filter_inTmpResult_0 = -1; // filter_isNull_0 = true;
/* 044 */             } else if (filter_valueArg_0 == 2L) {
/* 045 */               filter_inTmpResult_0 = 1; // filter_isNull_0 = false; filter_value_0 = true;
/* 046 */               continue;
/* 047 */             }
/* 048 */
/* 049 */             if (false) {
/* 050 */               filter_inTmpResult_0 = -1; // filter_isNull_0 = true;
/* 051 */             } else if (filter_valueArg_0 == 3L) {
/* 052 */               filter_inTmpResult_0 = 1; // filter_isNull_0 = false; filter_value_0 = true;
/* 053 */               continue;
/* 054 */             }
/* 055 */
/* 056 */             if (false) {
/* 057 */               filter_inTmpResult_0 = -1; // filter_isNull_0 = true;
/* 058 */             } else if (filter_valueArg_0 == 4L) {
/* 059 */               filter_inTmpResult_0 = 1; // filter_isNull_0 = false; filter_value_0 = true;
/* 060 */               continue;
/* 061 */             }
/* 062 */
/* 063 */             if (false) {
/* 064 */               filter_inTmpResult_0 = -1; // filter_isNull_0 = true;
/* 065 */             } else if (filter_valueArg_0 == 5L) {
/* 066 */               filter_inTmpResult_0 = 1; // filter_isNull_0 = false; filter_value_0 = true;
/* 067 */               continue;
/* 068 */             }
/* 069 */
/* 070 */           } while (false);
/* 071 */         }
/* 072 */         final boolean filter_isNull_0 = (filter_inTmpResult_0 == -1);
/* 073 */         final boolean filter_value_0 = (filter_inTmpResult_0 == 1);
/* 074 */         if (!filter_value_0) continue;
/* 075 */
/* 076 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 077 */
/* 078 */         filter_mutableStateArray_0[0].reset();
/* 079 */
/* 080 */         filter_mutableStateArray_0[0].write(0, inputadapter_value_0);
/* 081 */         append((filter_mutableStateArray_0[0].getRow()));
/* 082 */
/* 083 */       } while(false);
/* 084 */       if (shouldStop()) return;
/* 085 */     }
/* 086 */   }
источник

t

tenKe in Data Engineers
это кусок кода, который генерируется на spark.range(10).localCheckpoint.filter('id.isin(1,2,3,4,5))
источник

t

tenKe in Data Engineers
это еще не учитывая, что эти 1000 элементов надо сколлектить, а если это еще в питоне, то все еще веселее
источник

NN

No Name in Data Engineers
tenKe
это еще не учитывая, что эти 1000 элементов надо сколлектить, а если это еще в питоне, то все еще веселее
Собсна, о чём я и рассуждал. А ты взял и проверил. Человечище!
источник

K

KrivdaTheTriewe in Data Engineers
tenKe
чето я сомневаюсь, что запихнуть 1000 элементов в .isin лучше, чем сделать обычный джойн с бродкастом
Можно юдф сделать на скале
источник

NN

No Name in Data Engineers
KrivdaTheTriewe
Можно юдф сделать на скале
А надо ли? Сколько оно реально времени сэкономит по сравнению с бродкаст джойном?
источник

ИК

Иван Калининский... in Data Engineers
Nail
Причем тут фильтрация и left join? Left join не про фильтрацию. Или я что-то не понимаю?
Имелся в виду left semi join, он эквивалентен условию in (или exists) в SQL. Для фильтрации отлично подходит
источник
2021 February 20

ИК

Иван Калининский... in Data Engineers
K S
Понятно, спасибо. Я интуитивно понимаю, что сравнения на нодах будут локальные, а в случае join скорее всего будет траффик между нодами, и это медленнее, но просто хотел подтверждение, что catalyst (или tungsten) ещё не дошёл до таких оптимизаций.
Единственная альтернатива бродкасту, которую я могу представить для целых чисел - это фильтр по битовому индексу. Но готовое решение не могу предложить, если кто-то видел или сам делал - буду благодарен за ссылку
источник

AS

Andrey Smirnov in Data Engineers
tenKe
чето я сомневаюсь, что запихнуть 1000 элементов в .isin лучше, чем сделать обычный джойн с бродкастом
а бродкаст с неба упадет, его еще надо раскидать по экзекютерам, а дальше возможно будет такая-же портянка
источник