Size: a a a

2020 January 14

Н

Никита in Data Engineers
сорян за такой поток мыслей))
источник

Н

Никита in Data Engineers
каждая строка большой html странички от краулера
{"text":"large html"}
Вот хочу отпроцессить (найти различные вхождения в html) его через спарк и записать в таблицу типа
domain     is_shop ... has_phone_number
google.ru  True          True
источник

R

Renarde in Data Engineers
Никита
привет, у меня flume стримит в папку на hdfs, хочу отпроцессить эти данные и затирать то, что прочитал и писать в hive красиво, чтобы не плодить маленькие файлы.
вопрос: как читать файлы в sparke а потом их затирать? Как лучше делать это джобом или спарк стримингом?
Если писать малеьникими rdd через спарк стриминг в хайв, он их потом замерджит или будут маленькьие файлы?
можешь делать следующее:
- читаешь файловый поток через StructuredStreaming
- После каждого батча берешь имена прочитанных файлов и удаляешь их в исходнике
- Данные пишешь в delta -формат, время от времени прогоняешь compaction по записанным данным
источник

Н

Никита in Data Engineers
а как узнать имена прочитаных файлов?
источник

R

Renarde in Data Engineers
в scala как минимум есть функция:

import org.apache.spark.sql.functions.input_file_name
источник

R

Renarde in Data Engineers
она вернет для каждой записи имя файла. можно после батча сделать что-то вроде (псевдокод):
def foreachBatch(some_args): Unit = {
 val df = processBatch(batch)
 val fnames = df.withColumn(“ifn”, input_file_name).select(“input_file_name”).distinct().collect()
 fnames.map(deleteFile)
}
источник

R

Renarde in Data Engineers
Но вообще обычно стараются не удалять исходники (так называемый raw data level) насовсем. Что будет, если твой процессинг радикально поменялся и тебе нужно сделать refeed и заново отчитать все исходные текстовые файлы?
источник

Н

Никита in Data Engineers
ну я могу еще раз прогнать, я просто не хочу процесить старые данные, а только новые поступающие
источник

Н

Никита in Data Engineers
Правильно ли я понимаю, если я перезапущу скрипт со structured streaming, то он начнет работать с самого начала
источник

R

Renarde in Data Engineers
Никита
ну я могу еще раз прогнать, я просто не хочу процесить старые данные, а только новые поступающие
так зачем файлы то удалять после прочтения тогда?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources
спарк будет автоматически помечать прочитанные файлы, но нужно будет включить checkpointing. Чекпоинт будет хранить информацию о последнем прочитанном файле
источник

Н

Никита in Data Engineers
о, хорошо тогда
источник

R

Renarde in Data Engineers
источник

Н

Никита in Data Engineers
спс
источник
2020 January 15

V

Vlad in Data Engineers
Привет, при сабмите через Livy нескольких spark апликейшенов( около 10), несколько из них падает с ошибкой http 409 конфликт при записи. Может кто-нибудь сталкивался с подобным?
источник

A

Alex in Data Engineers
а какая версия livy?
у меня правда ошибки такой не было и в коде livy про конфликт тоже ничего нету (по крайней мере в версии 0.5.х)
источник

V

Vlad in Data Engineers
Alex
а какая версия livy?
у меня правда ошибки такой не было и в коде livy про конфликт тоже ничего нету (по крайней мере в версии 0.5.х)
источник

EV

Eduard Vlasov in Data Engineers
Всем приветик,
есть задачка обстреливать HTTP API из Flink DataStream, как правильно это реализовать если нужна гарантия доставки at least once? У меня варианты это RichAsyncFunction или кастомный синк
Я пока не понял как это все поведет себя в случае отказа downstream API
источник

EP

Easycore Programming in Data Engineers
Коллеги, такой вопрос, как мне в  спарке выполнить единоразовую операцию на каждом экзекьюторе? Например зарегистрировать jdbc драйвер
источник

EV

Eduard Vlasov in Data Engineers
Easycore Programming
Коллеги, такой вопрос, как мне в  спарке выполнить единоразовую операцию на каждом экзекьюторе? Например зарегистрировать jdbc драйвер
mapPartitions
источник

EP

Easycore Programming in Data Engineers
Да хотел уточнить, что там стриминг и mapPartitions будет регистрировать драйвер на на каждом батче. Или просто использовать флаг, что драйвер уже зарегистрирован?
источник