Size: a a a

2022 January 14

N

Nick in Moscow Spark
Есть таблица(более 1млрд+ пополняется) с данными товаров, есть классификатор "категорий"(грубо хорошо продается, плохо и т.д.) который крутиться на другой машинке и доступен через рест. Надо добавить категории к существующей таблице по товарам, сама таблица строится через спарк. * со временем категории могут обновляться(классификатор дообучивают раз в неделю на выходных) и надо также учесть, что в будущем данные придется апдейтить, если поменяются категории (это видно из статистики по категориям, которую потом собирают, до этого момента понятно мы не знаем, что у кого-то товара поменялась категория)
источник

N

Nick in Moscow Spark
Коллеги, поделились своими соображениями на сколько они меня поняли. Им за это спасибо, буду дальше думать пока есть время. Мне то больше и не надо, хорошо иметь несколько способов решения задачи
источник

k

kvadratura in Moscow Spark
я так понял, это, в общем, dimension, пусть и сушественных размеров. если его еще надо апдейтить, я бы подумал над такой архитектурой:

- берем что есть, один раз прогоняем через api, кладем в KV какой-нибудь. cassandra, scilla, hbase, mongo.. при апдейте посылаем event и обновляем запись в этом KV
источник

k

kvadratura in Moscow Spark
я думаю, это лучше, чем раз в N часов делать фулл скан. может же так быть, что за сутки только 1 запись из 1 млрд поменяется, я верно понимаю?
источник

k

kvadratura in Moscow Spark
а обновлять output классификатора можно не через рест, а при деплое новой модели. offilne то бишь. и тогда не трогаем endpoint вообще, просто запускаем обычный жойн с датасетом, который коллеги, кто модель обучил, положат куда-то
источник

k

kvadratura in Moscow Spark
тех лид(ы) у вас есть? кто-то, кто знает существующий стек
источник

N

Nick in Moscow Spark
Оверхед для нас, уже есть хайв там как раз и лежит таблица, реал тайм не нужен, достаточно будет сделать 200 штук равномерных партиций (тут надо смотреть на кол-во и размер данных) и перезаписывать крохи будет проще, чем ставить и админить дополнительную бд для одной задачи
источник

N

Nick in Moscow Spark
Spark, hive, hdfs. Единичных записей нет , если это не что-то новенькое. Например мука, есть не одно название и не один магазин который ее продает. Но у этого товара будет одна категория. Поэтому при смене категории в 90% затронет больше одной записи
источник
2022 January 16

BM

Boris Malaichik in Moscow Spark
S3a commiters можно использовать
Реализация имитирует атомарный коммит в s3 посредством API

https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html

У себя использую magic commiter
Он делает multipart upload и потом на комит шлёт апи запрос делая файл «видимым»

Там есть другая реализация, описано хорошо сравнение и кейсы использования
источник

BM

Boris Malaichik in Moscow Spark
По дефолту - нет. Коммит в s3 это будет клонирование директории + удаление старой, не атомарно
источник

v

v in Moscow Spark
Всем привет. Подскажите, как решить проблему.
Помогаю дебажить скрипт на Pyspark, который отрабатывает на Google Colab. Да, может не лучший вариант, но какой есть.
Если ограничить датасет с помощью limit - прорабатывает нормально.
При повышении размера данных - начинает отваливаться с ошибкой java.lang.OutOfMemoryError: Java heap space.
Раньше все работало, ошибка появилась после убирания одного из фильтров, который раньше отбрасывал часть значений.
Стектрейс - https://pastebin.com/HgkMR58x

Логика кода - делается практически кросс джойн двух таблиц, пары фильтруются по ряду признаков, затем оставшиеся пары для каждого ключа сортируются по ряду параметров и выбирается лучший.
План выполнения (с поставленными лимитами) - https://pastebin.com/3FguXCJ4

Сам код.
https://pastebin.com/v9bHbyik

Подскажите, что можно сделать чтобы скрипт отработал, не вываливаясь? Скорее всего нужно поиграться с количеством partition-ов, но я не очень хорошо разбираюсь в спарке и мне даже сложно понять в какую сторону. Идеальный вариант - спарк грузит кусок маленький датасета, делает джойны и агрегации с другими таблицами, обрабатывает, записывает результаты на диск, идет дальше. Поскольку это не кластер, излишне параллелить смысла не имеет.
источник

v

v in Moscow Spark
Возможно проблема в том, что я не выделял spark.driver.memory.
Спарк работает в local mode, значит надо работать с driver.memory. По умолчанию он всего 1гб, что ОЧЕНЬ мало.
Попробую через spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", '10g').getOrCreate()
источник

ЕГ

Евгений Глотов... in Moscow Spark
Сделать df.groupBy("assignment_id").count().orderBy(desc("count")).limit(10).toPandas()
У обоих датафреймов при джойне, глянуть насколько раздувается один ключ в результате
источник

GP

Grigory Pomadchin in Moscow Spark
в таком случае просто мало памяти
источник

ЕГ

Евгений Глотов... in Moscow Spark
И ещё там оконка есть по meeting_subject, тоже одна группа может слишком большая быть
источник

GP

Grigory Pomadchin in Moscow Spark
а вообще надо посмотреть что с памятью на ехекуторах прежде чем чето делать анверное (в данном случае ехекутор это драйвер)
в момент перед x
источник

ЕГ

Евгений Глотов... in Moscow Spark
Ну плюс да 512мб оперативки хватит не всем
источник

v

v in Moscow Spark
Вроде говорят надо настроить до запуска JVM, т.е не в конструкторе Sparksession, а в конфиге spark.conf
источник

ЕГ

Евгений Глотов... in Moscow Spark
Из питона можно в конструкторе настроить
источник

ЕГ

Евгений Глотов... in Moscow Spark
Из джавы/скалы нельзя, так как приложение и есть драйвер😆
источник