Size: a a a

2021 January 22

A

Alex in Data Engineers
ну и да, в рассылке люди жаловались на такую же ошибку ещё в августе
источник

RK

Ruslan Ksalov in Data Engineers
я перепробовал все, сбилдить получилось два из них (AvroDeserializationSchema.forGeneric(...) и ConfluentRegistryAvroDeserializationSchema.forGeneric(...) и оба падают с одинаковой ошибкой при получении сообщений)
источник

GP

Grigory Pomadchin in Data Engineers
Ruslan Ksalov
тут описаны несколько способов
а ты смотри что тут ключевое что в конструктор передается
источник

GP

Grigory Pomadchin in Data Engineers
типа ты new делаешь в констуркторе консумера
источник

A

Alex in Data Engineers
I figured it out for those interested; I actually had an embedded report in my avro schema, so my loop was incorrectly building a single dimension map with a GenericRecord value, which was throwing off the map’s serialization. After recursing the embedded GenericRecords to build the fully realized multi-d map, kryo stopped choking.
источник

RK

Ruslan Ksalov in Data Engineers
Grigory Pomadchin
типа ты new делаешь в констуркторе консумера
так я тоже пробовал
источник

RK

Ruslan Ksalov in Data Engineers
не билдится, сейчас ошибку гляну
источник

A

Alex in Data Engineers
чем человек закончил, возможно пересечение багов крио и авро
источник

RK

Ruslan Ksalov in Data Engineers
Alex
I figured it out for those interested; I actually had an embedded report in my avro schema, so my loop was incorrectly building a single dimension map with a GenericRecord value, which was throwing off the map’s serialization. After recursing the embedded GenericRecords to build the fully realized multi-d map, kryo stopped choking.
да, читал, я весь интернет перерыл уже за пять дней)
источник

RK

Ruslan Ksalov in Data Engineers
val stream = env.addSource(new FlinkKafkaConsumer[URLResponse](
 kafkaTopic, new AvroDeserializationSchema[URLResponse](
classOf[URLResponse]), properties))
источник

RK

Ruslan Ksalov in Data Engineers
вот так пробовал
источник

RK

Ruslan Ksalov in Data Engineers
не билдится, ошибка такая:
источник

RK

Ruslan Ksalov in Data Engineers
constructor AvroDeserializationSchema in class AvroDeserializationSchema cannot be accessed in object Extractor
     kafkaTopic, new AvroDeserializationSchema[URLResponse](classOf[URLResponse]), properties))
источник

RK

Ruslan Ksalov in Data Engineers
Насчёт Kryo, я читаю книгу Потоковая обработка данных с Apache Flink (Фабиан Уэске, Василики Калаври)
источник

RK

Ruslan Ksalov in Data Engineers
Там написано следующее
источник

RK

Ruslan Ksalov in Data Engineers
“В большинстве случаев Flink может автоматически определять типы и гене- рировать правильную информацию о типе. Средство извлечения типов Flink использует отражение и анализирует сигнатуры функций и информацию о подклассах, чтобы получить правильный тип вывода для определяемой пользователем функции. Однако иногда необходимая информация не может быть извлечена (например, из-за того, что Java стирает информацию общего типа). Более того, в некоторых случаях Flink может не выбирать TypeInformation, который генерирует наиболее эффективные сериализаторы и десериа- лизаторы. Следовательно, вам может потребоваться явное предоставление объектов TypeInformation для Flink для некоторых типов данных, используе- мых в вашем приложении."
источник

RK

Ruslan Ksalov in Data Engineers
Но и где и как правльно это сделать я не очень понимаю в контексте моего приложения
источник

A

Alex in Data Engineers
источник

RK

Ruslan Ksalov in Data Engineers
там приводится такой пример
источник

RK

Ruslan Ksalov in Data Engineers
class Tuple2ToPersonMapper extends MapFunction[(String, Int), Person] with ResultTypeQueryable[Person] {
override def map(v: (String, Int)): Person = Person(v._1, v._2)
// Предоставляет TypeInformation для типа выходных данных.
override def getProducedType: TypeInformation[Person] = Types.CASE_CLASS[Person] }
источник