Size: a a a

2020 December 23

ИК

Иван Калининский... in Data Engineers
Артур Семенов
Я на питоне, например я хочу добавить некий метод qwe, который бы внутри себя делал spark.sql(describe formatted таблица)+ вытащить поле location, где spark - это уже созданный контекст и вернуть строку с полным hdfs путем таблицы
для реализации на scala - легко, самому захотелось такое сделать, у меня по старинке обёрнуто в функцию, на пайтон я не умею в этот паттерн
источник

АС

Артур Семенов... in Data Engineers
Иван Калининский
для реализации на scala - легко, самому захотелось такое сделать, у меня по старинке обёрнуто в функцию, на пайтон я не умею в этот паттерн
Есть пример кода на скале?
источник

ИК

Иван Калининский... in Data Engineers
Артур Семенов
Есть пример кода на скале?
попробую накидать
источник

ИК

Иван Калининский... in Data Engineers
Артур Семенов
Есть пример кода на скале?
источник

ИК

Иван Калининский... in Data Engineers
у меня в тестах заработал сразу
источник

ИК

Иван Калининский... in Data Engineers
тут я никаких зависимостей не добавлял, работать не будет, просто чтобы код сюда не кидать

Может опытные товарищи помогут дальше
источник

АС

Артур Семенов... in Data Engineers
Ну тут все понятно по коду
источник

АС

Артур Семенов... in Data Engineers
Теперь бы на питон переписать
источник

T

T in Data Engineers
Привет, как получить в спарке пути которые были сгенерены присохранении датафрейма?
источник

ИК

Иван Калининский... in Data Engineers
T
Привет, как получить в спарке пути которые были сгенерены присохранении датафрейма?
можно прочитать датафрейм из сохраненного пути, выполнить .inputFiles - будет получен список файлов по контракту bestEffort (возможно, не все, но спарк постарается) и работать с этим списком
Лучше зайти через файловую систему (объектное хранилище, что у вас есть?), получить fileSystem.listFiles(new Path(dir)) и работать с этим правильным и полным списком
источник

T

T in Data Engineers
Иван Калининский
можно прочитать датафрейм из сохраненного пути, выполнить .inputFiles - будет получен список файлов по контракту bestEffort (возможно, не все, но спарк постарается) и работать с этим списком
Лучше зайти через файловую систему (объектное хранилище, что у вас есть?), получить fileSystem.listFiles(new Path(dir)) и работать с этим правильным и полным списком
Это к сожелению не подходит так как у меня пишется в s3 c "partitionOverwriteMode", "dynamic
источник

ИК

Иван Калининский... in Data Engineers
T
Это к сожелению не подходит так как у меня пишется в s3 c "partitionOverwriteMode", "dynamic
Не могу помочь(
источник

T

T in Data Engineers
Самое просто это коненчо сделать что то типа
df.select(partitionColumns.head, partitionColumns.drop(1):_*).distinct.collect()

Но в этом способе не нравится что получается что по дата фраему над будет пробегаться 2 раза: 1 при сохранении, 2 чтобы вытащить колонки.

Есть идея прокидывать аккумулятор но это же выглядит не красиво
источник

b

barracuda in Data Engineers
Курсы посоветуйте...закончил бесплатный Яндекс практикум по питону,но отзывы по платному негативные
источник

ИК

Иван Калининский... in Data Engineers
Артур Семенов
Теперь бы на питон переписать
в принципе, можно заэкстендить SparkSession и в проекте пользоваться этим потомком
источник

АС

Артур Семенов... in Data Engineers
Иван Калининский
в принципе, можно заэкстендить SparkSession и в проекте пользоваться этим потомком
Вот сейчас пробую
источник

DM

Dave Manukian in Data Engineers
Привет, столкнулся с такой проблемой, может кто подскажет. У меня есть Spark Structured Streaming джоба у которой source=kafka,
также есть кастомная udf которую я создаю вот так:  
spark.udf.register("deserialize", (e: Array[Byte]) => {.....})

Джоба просто считывает с кафки и кладет в дальнейшем в реляционку.
Проблема: Локально все очень хорошо работает, но на spark-k8s постоянно выпадает ошибка
"Failed to execute user defined function(UDFRegistration$$Lambda$1434/1497411918: (binary) => ....... java.lang.IllegalArgumentException: The value (()) of the type (scala.runtime.BoxedUnit) cannot be converted to struct ....."

Топик, ивент, брокер один и тот во время запуска что на локалке, что в кубере. На кубере запускаю spark mode client.
источник

e

er@essbase.ru in Data Engineers
Уязвимость в Apache Airflow, допускающая использование одного сеанса на  разных серверах https://opennet.ru/54298/
источник

GT

Gennady Timofeev in Data Engineers
Dave Manukian
Привет, столкнулся с такой проблемой, может кто подскажет. У меня есть Spark Structured Streaming джоба у которой source=kafka,
также есть кастомная udf которую я создаю вот так:  
spark.udf.register("deserialize", (e: Array[Byte]) => {.....})

Джоба просто считывает с кафки и кладет в дальнейшем в реляционку.
Проблема: Локально все очень хорошо работает, но на spark-k8s постоянно выпадает ошибка
"Failed to execute user defined function(UDFRegistration$$Lambda$1434/1497411918: (binary) => ....... java.lang.IllegalArgumentException: The value (()) of the type (scala.runtime.BoxedUnit) cannot be converted to struct ....."

Топик, ивент, брокер один и тот во время запуска что на локалке, что в кубере. На кубере запускаю spark mode client.
А локально запускаете local или тоже сабмитите в свой кластер?
источник

A

Alex in Data Engineers
ну бывает
источник