Size: a a a

2019 September 27

EN

Eldar Nezametdinov in Data Engineers
Окей
источник

EN

Eldar Nezametdinov in Data Engineers
Спасибо 👌
источник
2019 September 28

神風 in Data Engineers
tenKe
не, триггер всего лишь запускает функцию getBatch, когда getOffset возвращает не None
Не, я вот про этот df.writeStream .format("console") .trigger(Trigger.ProcessingTime("2 seconds"))
источник

神風 in Data Engineers
Этот конкретно пишет в консоль каждые 2 секунды
источник

神風 in Data Engineers
Да и на hdfs, когда я вычитал весь топик стал писать мелкие паркеты раз в минуту, как оговорено. Но вот до этого много миллионов записей он 30 минут жевал джобом #0. Зато теперь, когда лага почти нет в спарковском гуе я вижу каждую минуту +1 джоб и соответственно файлики на hdfs.
источник

神風 in Data Engineers
Stanislav
аккуратно с этим
при уменьшении количества свободного места до 10% ярна встанет
а для хдфса вроде и норм быть настолько заполненным
Поймали такое буквально вчера, но на другом кластере. Спасибо за предупреждение)
источник

t

tenKe in Data Engineers
神風
Не, я вот про этот df.writeStream .format("console") .trigger(Trigger.ProcessingTime("2 seconds"))
и я про этот)
источник

神風 in Data Engineers
tenKe
и я про этот)
Тогда нифига не понял. В какой момент происходит запись на hdfs батча?
источник

t

tenKe in Data Engineers
神風
Тогда нифига не понял. В какой момент происходит запись на hdfs батча?
источник

A

Alex in Data Engineers
神風 все же просто:

Да, тригерит каждые 2 минуты, вычитать всё что набежало

На холодном старте у вас в топике уже набралось много и он пытается прожевать за один раз (от последнего комита до now, в итоге процессинг может занять много времени и выйти за интервал).

Не вызывает вопросов если знать как это работает:

1) Драйвер раз в ваш указанный интервал запускается и смотрит последний процешшенный id и текущий, есть что-то есть забрасывает таск в котором указано Start offset и end offset.

2) на ui вы увидете размер этого батча и в какой сколько уйдёт

3) если какой-то начинает долго процессится, то появляется очередь и вы начинаете батч писать в прошлое, так как процессинг уже давно запланирован был, вы не успеваете это делать

4) если в момент отставания вы сделаете рестарт то вся очередь задач теряется и в первый же батч может очень много

5) рулится параметрами тротлинга на кафку, сколько максимум в одну партицию за раз можно скедулить (например "не более 1000 message/s", это не значит что вы будете непрерывно читать с этой скоростью, а лишь что в 2 мин интервал вам влетить 120*1000 сообщений с партиции)

6) но все равно помнится был баг именно на холодный старт и большой объем, вроде уже фиксили
источник

A

Alex in Data Engineers
То есть считайте что тригер по процессинг тайму это такой тригер который расставляет флажки говорящие "работай от предыдуще флажка до вот этого последнего"

Это не значит: процессим непрерывно, пришло 2 минуты, флашнули что есть начали следующий интервал

Скедулер нарезающий задачи и воркеры независимы
источник

神風 in Data Engineers
Спасибо за развёрнутый ответ!
источник
2019 September 29

СХ

Старый Хрыч in Data Engineers
кстати тут же кто-то хвастался что спарк воркеры в кубере гоняет
источник

ТС

Тимофей Смирнов in Data Engineers
Старый Хрыч
кстати тут же кто-то хвастался что спарк воркеры в кубере гоняет
мы гоняем, даже pyspark
источник

СХ

Старый Хрыч in Data Engineers
Тимофей Смирнов
мы гоняем, даже pyspark
и как?
источник

СХ

Старый Хрыч in Data Engineers
там чего не коснись всё эксперементал
источник

ТС

Тимофей Смирнов in Data Engineers
вроде стабильно работает
источник

СХ

Старый Хрыч in Data Engineers
Тимофей Смирнов
вроде стабильно работает
а бывало, что спарк воркеру надо больше данных сожрать чем оперативы выделенной воркеру?
источник

СХ

Старый Хрыч in Data Engineers
точнее контейнеру с воркером
источник

ТС

Тимофей Смирнов in Data Engineers
не совсем понял, но если указать spark.executor.memory=27g то по факту сожрется где-то 40
источник