Я на питоне, например я хочу добавить некий метод qwe, который бы внутри себя делал spark.sql(describe formatted таблица)+ вытащить поле location, где spark - это уже созданный контекст и вернуть строку с полным hdfs путем таблицы
для реализации на scala - легко, самому захотелось такое сделать, у меня по старинке обёрнуто в функцию, на пайтон я не умею в этот паттерн
Привет, как получить в спарке пути которые были сгенерены присохранении датафрейма?
можно прочитать датафрейм из сохраненного пути, выполнить .inputFiles - будет получен список файлов по контракту bestEffort (возможно, не все, но спарк постарается) и работать с этим списком Лучше зайти через файловую систему (объектное хранилище, что у вас есть?), получить fileSystem.listFiles(new Path(dir)) и работать с этим правильным и полным списком
можно прочитать датафрейм из сохраненного пути, выполнить .inputFiles - будет получен список файлов по контракту bestEffort (возможно, не все, но спарк постарается) и работать с этим списком Лучше зайти через файловую систему (объектное хранилище, что у вас есть?), получить fileSystem.listFiles(new Path(dir)) и работать с этим правильным и полным списком
Это к сожелению не подходит так как у меня пишется в s3 c "partitionOverwriteMode", "dynamic”
Привет, столкнулся с такой проблемой, может кто подскажет. У меня есть Spark Structured Streaming джоба у которой source=kafka, также есть кастомная udf которую я создаю вот так: spark.udf.register("deserialize", (e: Array[Byte]) => {.....})
Джоба просто считывает с кафки и кладет в дальнейшем в реляционку. Проблема: Локально все очень хорошо работает, но на spark-k8s постоянно выпадает ошибка "Failed to execute user defined function(UDFRegistration$$Lambda$1434/1497411918: (binary) => ....... java.lang.IllegalArgumentException: The value (()) of the type (scala.runtime.BoxedUnit) cannot be converted to struct ....."
Топик, ивент, брокер один и тот во время запуска что на локалке, что в кубере. На кубере запускаю spark mode client.
Привет, столкнулся с такой проблемой, может кто подскажет. У меня есть Spark Structured Streaming джоба у которой source=kafka, также есть кастомная udf которую я создаю вот так: spark.udf.register("deserialize", (e: Array[Byte]) => {.....})
Джоба просто считывает с кафки и кладет в дальнейшем в реляционку. Проблема: Локально все очень хорошо работает, но на spark-k8s постоянно выпадает ошибка "Failed to execute user defined function(UDFRegistration$$Lambda$1434/1497411918: (binary) => ....... java.lang.IllegalArgumentException: The value (()) of the type (scala.runtime.BoxedUnit) cannot be converted to struct ....."
Топик, ивент, брокер один и тот во время запуска что на локалке, что в кубере. На кубере запускаю spark mode client.
А локально запускаете local или тоже сабмитите в свой кластер?