Size: a a a

2021 July 27

С

Сергей in Moscow Spark
проблема java.lang.stackoverflowerror, как понимаю  надо оптимизировать процесс ?
источник

ИК

Иван Калининский... in Moscow Spark
Ну, с такой причиной точно нужно искать логическую ошибку в коде. Может это неправильно примененная функция, может еще какой неправильны mapPartitions
источник

С

Сергей in Moscow Spark
проблема в том что то работает то нет, то есть один раз запускаю отработало, второй раз нет . на одних и тех же данных
источник

S

Snoop Duck in Moscow Spark
Спасибо за наводку) В данных из кафки есть таймстемп событий. Я попробовал переписать джобу следующим образом: spark.read.parquet().repartitionByRange(28, $"ts").sortWithinPartitions($"ts").write.parquet() и получил размер аутпута меньше, чем размер инпута. Надо бы почитать про то, как работает сжатие)
Только теперь почему-то при запуске джобы появился ещё один стейдж, который повторно считывает инпут 🤔
источник

ИК

Иван Калининский... in Moscow Spark
Там есть предварительный проход по RDD для отбора сэмпла. Потом из этого сэмпла локально на драйвере получаются верхние границы для партиций RDD. Так что да, дополнительный стейдж, но он нужный и осознанный хД
источник

S

Snoop Duck in Moscow Spark
Уфф. А как можно узнать, почему какие-то операции (в данном случае repartitionByRange) работают именно так? Разбирать исходники?
источник

ИК

Иван Калининский... in Moscow Spark
Я разбирал исходники, в основном, ещё искал сайты, на которых есть подсказки и разборы. Здесь спрашивал обо всяком. И экспериментировал. Более лёгкого пути я не знаю.

В конечном итоге, это помогло разобраться, как что работает, и переписать под свои нужды. Впрочем, много тонкостей остаётся.
источник

S

Snoop Duck in Moscow Spark
Спасибо. Значит надо пробовать изучать код
источник
2021 July 28

А

Алексей in Moscow Spark
напомните плз, что значит * в спарк плане?
источник

t

tenKe in Moscow Spark
холестейджкодген группы
источник

А

Алексей in Moscow Spark
спс
источник

А

Алексей in Moscow Spark
получается, если так:
    +- *(1) Filter isnotnull(_col17#19)
      +- *(1) ColumnarToRow
         +- FileScan orc

то файл читался отдельной программой, а фильтровался и в строки преобразовывался другой?
источник

А

Алексей in Moscow Spark
а так вместе:
    +- *(1) Filter isnotnull(_col17#19)
      +- *(1) FileScan orc
источник

t

tenKe in Moscow Spark
ну  не программой, а блоком кодосгенерированного кода
источник

t

tenKe in Moscow Spark
но в случае с FileScan это не совсем верно
источник

ИК

Иван Калининский... in Moscow Spark
Больше похоже на то, что в первом случае читался VectorizedReader для орка, а во втором по отдельным записям, не вектором. Или я что-то не так понимаю?
источник

t

tenKe in Moscow Spark
тк сам по себе FileScan - это не кодогенерация, а DataSource API
источник

t

tenKe in Moscow Spark
технически этот оператор может быть объединен с другими в WSCG фазу, но фактически там будет отрабатывать Datasource API и только потом кодогенерация
источник

А

Алексей in Moscow Spark
Сравнивал работу датабриксов на 2,4 спарке с 3,0 и заметил существенную разницу в скорости чтения орков:
2,4
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
   +- *(1) Filter isnotnull(_col17#19)
      +- *(1) FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionCount: 61, PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct<_col0:string,_col11:string,_col17:string,_col19:string,_col27:decimal(17,2),_col28:decimal...



number of files read    244
filesystem read data size total (min, med, max)    6.0 GB (22.1 MB, 31.1 MB, 44.0 MB)
scan time total (min, med, max)    15.9 m (2.9 s, 4.8 s, 10.1 s)



3,0
+- *(1) Project [_col0#2 AS check_id#100, _col11#13 AS bic_client#111, _col17#19 AS material#117, _col19#21 AS rt_promo#119, _col27#29 AS rtsaexcust#127, _col28#30 AS rtsaexcusv#128]
   +- *(1) Filter isnotnull(_col17#19)
      +- *(1) ColumnarToRow
         +- FileScan orc [_col0#2,_col11#13,_col17#19,_col19#21,_col27#29,_col28#30,calday#50] Batched: true, DataFilters: [isnotnull(_col17#19)], Format: ORC, Location: InMemoryFileIndex[dbfs:/mnt/exfs/warehouse/rdw.db/pos_rec_itm], PartitionFilters: [isnotnull(calday#50), (calday#50 >= 20210301), (calday#50 <= 20210431)], PushedFilters: [IsNotNull(_col17)], ReadSchema: struct<_col0:string,_col11:string,_col17:string,_col19:string,_col27:decimal(17,2),_col28:decimal...                                      
number of files read    244
filesystem read data size total (min, med, max)    16.3 GiB (60.3 MiB, 85.0 MiB, 119.2 MiB)
scan time total (min, med, max)    33.9 m (6.6 s, 10.7 s, 15.2 s)
источник

А

Алексей in Moscow Spark
у 3,0 нет этой звездочки
источник