Size: a a a

2021 August 06

ПФ

Паша Финкельштейн... in Moscow Spark
а я говорил что надо более жёсткого бота поставить
источник

K

KrivdaTheTriewe in Moscow Spark
Тут не оплачен
источник

ПФ

Паша Финкельштейн... in Moscow Spark
так есть бесплатный же
источник

ПФ

Паша Финкельштейн... in Moscow Spark
с капчей
источник
2021 August 08

SI

Sergey Ivanychev in Moscow Spark
Привет! А кто-то знает как конкретно df.repartition(10) шаффлит данные? Что конкретно используется для вычисления хеша и как его руками воспроизвести?
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Думаешь там есть оптимизации прям такие? Я бы просто добавил rownum на каждой ноде и по остатку от деления на число нод отсылал бы на соответствующие ноды
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Ага, ну нет, работает оно не так
источник

ПФ

Паша Финкельштейн... in Moscow Spark
partition = key.hashCode () % numPartitions
источник

SI

Sergey Ivanychev in Moscow Spark
В случае датафрейма ключ это что?
источник

ПФ

Паша Финкельштейн... in Moscow Spark
ищу )
источник

ПФ

Паша Финкельштейн... in Moscow Spark
индексирование проекта спарка — боль
источник

t

tenKe in Moscow Spark
если мне память не изменяет, там рандом % новому числу партиций
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Короче походу правда посередине.
источник

ИК

Иван Калининский... in Moscow Spark
Рандом должен быть нерандомный))
Для стабильного разделения сидируется номером партиции RDD, ограничен количеством партиций.
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Формально там берётся хэшкод у объекта
  def getPartition(key: Any): Int = key match {
   case null => 0
   case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
 }
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Но ключём выступает Row, видимо, у которого хэшкод не определён
источник

ПФ

Паша Финкельштейн... in Moscow Spark
А потом начинаем читать код и видим что всё сложнее, и, например, в UnsafeShuffleWriter ключём является значение из первой колонки
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Но нас скорее всего интересует всё-таки

    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
     case RoundRobinPartitioning(numPartitions) =>
       // Distributes elements evenly across output partitions, starting from a random partition.
       var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
       (row: InternalRow) => {
         // The HashPartitioner will handle the `mod` by the number of partitions
         position += 1
         position
       }
     case h: HashPartitioning =>
       val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
       row => projection(row).getInt(0)
     case RangePartitioning(sortingExpressions, _) =>
       val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
       row => projection(row)
     case SinglePartition => identity
     case _ => sys.error(s"Exchange not implemented for $newPartitioning")
   }
источник

ПФ

Паша Финкельштейн... in Moscow Spark
источник

ПФ

Паша Финкельштейн... in Moscow Spark
Если раундробин — то в точности как сказал Андрей
источник