Size: a a a

2020 April 22

DY

Dmitriy Yampolskiy in Moscow Spark
Andrey Siunov
@yampolson а какие схемы у этих dataframe? IIRC, union сопоставляет столбцы не по имени, а в том порядке, в котором они даны в схеме.
Спасибо, тогда все понятно
источник
2020 April 24

VA

Vladislav Akatov in Moscow Spark
У кого есть опыт перехода от других аналических систем к использованию Spark? Поделитесь опытом, например, как долго проходила миграция, какие основные трудности, в какой момент поняли, что так больше нельзя и надо переходить на distributed system; как принимали решение между платформами? Что читали, есть ли какой-то гайд известный/книга о том, как строить это дело с нуля?
источник
2020 April 26

DC

Dany Chepenko in Moscow Spark
На scala пишу просетнькую модель для xgboost
Почему то отдельно категориальные и численные фичи подаю все работает, как только пытаюсь объединить их в пайплайн, все падает.
private def pipeline: Pipeline = {
 val cat_features = getFeaturesByType(this.taskConfig, "categorical")
 val num_features = getFeaturesByType(this.taskConfig, "numeric")
 val catIndexer = cat_features.map {
   feature => new StringIndexer().setInputCol(feature).setOutputCol(feature + "_index")
 }
 val cat_features_index = cat_features.map {
   (feature: String) => feature + "_index"
 }
 this.modelFeatures = cat_features_index ++ num_features
 val catFeatureAssembler = new VectorAssembler()
   .setInputCols(cat_features_index)
   .setOutputCol("cat_features")
 val numFeatureAssembler = new VectorAssembler()
   .setInputCols(num_features)
   .setOutputCol("num_features")
 val featureAssembler = new VectorAssembler()
   .setInputCols(Array("cat_features", "num_features"))
   .setOutputCol("features")
 val pipelineStages = catIndexer ++
   Array(catFeatureAssembler, numFeatureAssembler, featureAssembler)
 TaskLogger.INFO(pipelineStages.toList.toString())
 new Pipeline().setStages(pipelineStages)
}
private def estimator(paramMap: Map[String, Any]): XGBoostEstimator = {
 new XGBoostEstimator(paramMap)
   .setFeaturesCol("features")
   .setLabelCol("indexed_label")
}

Падает при вызове этой функции
val preprocessed = this.pipelineModelTransform(dataset)
this.estimatorModel = Some(xgbEstimator.fit(preprocessed))

с ошибкой SparkContext has been shutdown

куда можно смотреть?
источник

R

Renarde in Moscow Spark
Dany Chepenko
На scala пишу просетнькую модель для xgboost
Почему то отдельно категориальные и численные фичи подаю все работает, как только пытаюсь объединить их в пайплайн, все падает.
private def pipeline: Pipeline = {
 val cat_features = getFeaturesByType(this.taskConfig, "categorical")
 val num_features = getFeaturesByType(this.taskConfig, "numeric")
 val catIndexer = cat_features.map {
   feature => new StringIndexer().setInputCol(feature).setOutputCol(feature + "_index")
 }
 val cat_features_index = cat_features.map {
   (feature: String) => feature + "_index"
 }
 this.modelFeatures = cat_features_index ++ num_features
 val catFeatureAssembler = new VectorAssembler()
   .setInputCols(cat_features_index)
   .setOutputCol("cat_features")
 val numFeatureAssembler = new VectorAssembler()
   .setInputCols(num_features)
   .setOutputCol("num_features")
 val featureAssembler = new VectorAssembler()
   .setInputCols(Array("cat_features", "num_features"))
   .setOutputCol("features")
 val pipelineStages = catIndexer ++
   Array(catFeatureAssembler, numFeatureAssembler, featureAssembler)
 TaskLogger.INFO(pipelineStages.toList.toString())
 new Pipeline().setStages(pipelineStages)
}
private def estimator(paramMap: Map[String, Any]): XGBoostEstimator = {
 new XGBoostEstimator(paramMap)
   .setFeaturesCol("features")
   .setLabelCol("indexed_label")
}

Падает при вызове этой функции
val preprocessed = this.pipelineModelTransform(dataset)
this.estimatorModel = Some(xgbEstimator.fit(preprocessed))

с ошибкой SparkContext has been shutdown

куда можно смотреть?
Шатдаун это конечная ошибка, приложение падает где-то раньше
источник

R

Renarde in Moscow Spark
И вообще питоновая версия хгбуста принимала только числовые фичи, насколько я помню
источник

DC

Dany Chepenko in Moscow Spark
Exception in thread "main" java.lang.IllegalStateException: SparkContext has been shutdown

   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2021)

   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)

   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)

   at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)

   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

   at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

   at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1368)

   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

   at org.apache.spark.rdd.RDD.first(RDD.scala:1367)

   at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:393)

   at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:356)

   at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:337)

   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

   at scala.collection.immutable.List.foreach(List.scala:381)

   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

   at scala.collection.immutable.List.map(List.scala:285)

   at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:336)

   at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)

   at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)

   at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)

   at jp.co.yahoo.k2.i2i.optimizer.model.XGBoostRanker.fit(XGBoostRanker.scala:67)

   at jp.co.yahoo.k2.i2i.optimizer.Optimizer$.train(Optimizer.scala:151)

   at jp.co.yahoo.k2.i2i.optimizer.Optimizer$.main(Optimizer.scala:78)

   at jp.co.yahoo.k2.i2i.optimizer.Optimizer.main(Optimizer.scala)

   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

   at java.lang.reflect.Method.invoke(Method.java:498)

   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)

   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)

   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)

   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)

   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
источник

DC

Dany Chepenko in Moscow Spark
Renarde
Шатдаун это конечная ошибка, приложение падает где-то раньше
полный трейсбек.

да, в скале тоже, я для этого делаю путаюсь их заэнкодить.
val catIndexer = cat_features.map {
   feature => new StringIndexer().setInputCol(feature).setOutputCol(feature + "_index")
 }


когда я в пайплайн не включаю numFeatureAssembler  то работает без ошибок
источник

DC

Dany Chepenko in Moscow Spark
то же верно я для только numeric  фич
источник
2020 April 30

AA

Anton Alekseev in Moscow Spark
Anton Alekseev
Всем привет. Ребята я уже приходил в чатик с проблемой переполнения буфера pyarrow. (https://issues.apache.org/jira/browse/ARROW-4890
), но опять возникла такая проблема. В общем дело в том что после группировки получаются слишком большие массивы данных чтобы передать это в pudf, проиходит переполнение буфера. И вроде в ишью выше есть ссылка на фикс, который уже залили в новые версии pyarrow. Удалось завести новый pyarrow (0.17.0) (и заодно пофиксить ошибку обратной совместимости версий), но вылазит новая обшибка pyarrow. OSError: Invalid IPC message: negative bodyLength - выглядит как тоже самое переполнение, кто-то сталкивался, удалось пофиксить? гугл тут уже не помогает.
В общем, отбор колонок не помог, данных все равно много. Кастомные сборки arrow с увеличенным буфером до 64бит, не допилены. Вот решил посмотреть в сторону сэмплирования, чтобы pudf не падал. Вопрос, а как вы делаете стратифицированное семплирование по нескольким колонкам? Пока только через rdd решение нашёл https://stackoverflow.com/questions/43878019/pyspark-sampleby-using-multiple-columns
источник
2020 May 04

K

KrivdaTheTriewe in Moscow Spark
источник

А

Алексей in Moscow Spark
Еще вот так можно, но, возможно, только в датабриксе:
https://kb.databricks.com/data/skew-hints-in-join.html
df.join(
   skewDF.hint("SKEW", "skewKey", List("0", "9999") ), Seq("keyCol"), "inner")
)
источник

А

Алексей in Moscow Spark
подскажите, как внутри spark реализован pivot? Думаю,  лучше сделать через перечисление нескольких sum(case when) или один pivot
источник

LK

Leonid Krylov in Moscow Spark
Алексей
подскажите, как внутри spark реализован pivot? Думаю,  лучше сделать через перечисление нескольких sum(case when) или один pivot
не подскажу, но вчера был выбор: 146 кейсов или один пивот 🤣
источник

А

Алексей in Moscow Spark
Leonid Krylov
не подскажу, но вчера был выбор: 146 кейсов или один пивот 🤣
ну мне не проблема сгенерить sparksql запрос. В моем запросе будет 9454 кейсов)
источник

PK

Pavel Klemenkov in Moscow Spark
Алексей
ну мне не проблема сгенерить sparksql запрос. В моем запросе будет 9454 кейсов)
Возможно от такого длинного запроса лопнет анализатор )
источник

M

Mi in Moscow Spark
трансформации в цикле это ну такое
источник

А

Алексей in Moscow Spark
ну я попробовал уже в датабриксе, запрос запускается, но оочень медленно идет, до конца не стал ждать, чтобы деньги не тратить
источник

А

Алексей in Moscow Spark
Mi
трансформации в цикле это ну такое
почему в цикле ? это 1 запрос с кучей кейсов
источник

А

Алексей in Moscow Spark
почему то экзекутеры начитют спилить промежуточные данные на диск, причем memory spill пусто
источник

R

Renarde in Moscow Spark
Алексей
ну мне не проблема сгенерить sparksql запрос. В моем запросе будет 9454 кейсов)
я такое делал в спарке, 15к case запросов - нужно побольше памяти на драйвер насыпать и норм.
источник