Size: a a a

2021 August 01

ПФ

Паша Финкельштейн... in Moscow Spark
И чтобы понять в каком значении ты сейчас, надо попарсить не только следующие строчки, но и прошлые
источник
2021 August 02

ЕГ

Евгений Глотов... in Moscow Spark
У спарка есть quotemode, он ищет знаки кавычек тоже
источник

ПФ

Паша Финкельштейн... in Moscow Spark
да, но сколько искать-то? )
источник

ПФ

Паша Финкельштейн... in Moscow Spark
вот ты оказался на строке
aaa,bbb
И даже вот ты знаешь что у тебя всего две колонки в csv
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Но эта строка, как ни странно, может быть часть одного значения!
aaa,"nxstart
aaa,bbb
nxend"
источник

А

Алексей in Moscow Spark
Multiline поумолчанию = false. Т.е. возможно такой файл не будет параллелиться с включенной опцией многострочности
источник

☭S

☭ ⤳ λ Shinkevich... in Moscow Spark
Нет, похоже я не прав..... вернее так - у меня партиции большие были... оно не сплитит файлы, а наоборот, упаковывает.... Т.е. если ты читаешь 10k мелких файлов. но не хочешь, чтобы у тебя было 10k мелких партиций, то ты можешь задать этот параметр, прикинув, с каким размером партиции тебе будет комфортно... При этом еще учитывается spark.sql.files.openCostInBytes - оно плюсуется к размеру файла
источник

ИК

Иван Калининский... in Moscow Spark
csv с переводами строк или сжатый НЕ сплитится
источник

ИК

Иван Калининский... in Moscow Spark
другое дело, что хранить много несжатых csv не очень-то профессионально
источник

ПФ

Паша Финкельштейн... in Moscow Spark
А вот в это я верю 🙂
источник

AV

Alexei Vasilev in Moscow Spark
такой вопрос в связи с последним обсуждением, правильно ли я понимаю, что если локально запускать спарк, то большой csv файл будет обрабатываться меджленней, чем например orc или паркет?
источник

ММ

Максим Мартынов... in Moscow Spark
не локально наверное тоже
источник

AV

Alexei Vasilev in Moscow Spark
нелокально понятно, интересует если локально
источник

NN

No Name in Moscow Spark
Орки и паркеты гораздо умнее, в любом случае читаться спарком будет лучше, чем csv, даже не смотря на сжатие.
Ну и, допустим, локально читаем - это же не означает, что нет параллелизма, можно vcore наплодить.
источник

М

Михаил in Moscow Spark
а у вас spark-livy в контейнере? или на хосте отдельном?
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Мне кажется что правильный ответ, как обычно – "it depends". У локального запуска спарка такие накладные расходы, что чтение файла само по себе будет незаметно. С другой стороны ещё многое зависит опций - выводите ли вы схему, разрешаете ли малтилайн строки и так далее.
источник

AV

Alexei Vasilev in Moscow Spark
будем считать, что настройки дефолтные
источник

ИК

Иван Калининский... in Moscow Spark
Схема и сплит будут выведены в начальном чтении, это не lazy операции. Схема и сплит нужны для валидации плана и DAG выполнения, в том числе в части кодогенерации. Кодогенерация по умолчанию включена не более, чем для ста полей.

Сплит, если кодек сжатия разделяемый (или файл не сжат) и CSV не допускает мультилайны, формируется исходя из размера файлов, количества доступных vcores и параметров «spark.sql.files.maxPartitionBytes», «spark.sql.files.openCostInBytes». Это тут недавно обсуждали. Паркет и ORC обрабатываются точно так же, но они практически всегда разделяемые, ведь сжимаются их отдельные блоки (row group)

До начала чтения спарк не имеет понятия, какие части сплита будут возвращать данные, какие нет, он просто создаёт некоторое количество партиций RDD, разделяя файлы, или наоборот, объединяя их. Если ещё точнее, сначала фильтруются явно указанные партиции HDFS/Hive, файлы разделяются на равные части плюс один остаток (последняя часть файла), сортируются по размеру по убыванию, потом объединяются так, чтобы сумма набора фрагментов и добавляемый фрагмент не превысили допустимого размера. Если размер превышен, то набор фрагментов  «закрывается» и будет прочитан в одну партицию RDD, добавляемый фрагмент становится первым в новом наборе. Очевидно, что сортировка в этом алгоритме делается только по размеру фрагмента, поэтому данные могут быть перемешаны, объединены из разных партиций HDFS, мелкие остатки файлов объединены в одной партиции RDD. Любое упорядочивание, заложенное при сохранении, будет утрачено, информация о партициях Hive/HDFS становится значениями полей RDD (датафрейма). Это будет сделано без учёта схемы, количества полей и даже без учёта размера блока HDFS для любого формата.

По дефолту все поля CSV называются _c{i}, тип данных StringType. Если опция inferSchema=true (default false), то будет выбран семпл всего исходного набора данных опцией samplingRatio (default 1.0, то есть все данные), и сразу произойдёт скан. Колоночные форматы определят схему по «Lucky file», какому-то одному файлу из набора, схема будет прочитана из метаданных формата. В общем, без InferSchema разница во времени выполнения будет несущественной

При вызове Action произойдёт операция с данными. При этом все оптимизации колоночных форматов (векторное чтение, column pruning, фильтры данных при чтении и всё такое) будут доступны, а CSV, насколько я знаю, будет прочитан полностью (все поля) в любом случае, опций фильтра при чтении я в исходниках CSV не увидел. Это не значит, что CSV будет обработан медленнее, потому что если нужны все данные и нужны они, например, для вывода в консоль или строковых преобразований, то ничто не мешает CSV быть обработанным быстрее, чем колоночный формат.

И, может быть важным, текстовый файл очень легко дополнить (не средствами spark, а библиотеками для работы с конкретным форматом, или простыми операциями с файлами), а значит, для append-only операций такой формат может быть лучше. Конечно, в этом случае лучше взять Avro, ведь данные крайне редко должны быть человекочитаемыми. Они могут быть человекоизменяемыми, и даже в этом случае лучше хранить их не в тексте ))

Но если работа производится на кластере, файлов много, к ним осуществляются разные запросы, а количество пользователей тоже немаленькое, то у CSV нет ни одного преимущества перед другими форматами. Как подметил Фёдор Лаврентьев, - «CSV - это бейзлайн по ужасности». Я с ним полностью согласен
источник

CO

Chern Oleksander in Moscow Spark
привет, а не подскажите, как можно отфильтровать по дате в df
например у меня есть list дат
var_dates = ['2021-01-22', '2021-01-04', '2021-01-02']

а хочу получить что-то вроде
df_s3_fin = df_s3.where(df_s3['dates'].isin(var_periods))

но выводит пусто, там данные есть 100000%
источник

AS

Andrey Smirnov in Moscow Spark
А типы совпадают, и там и там строки?
источник