Size: a a a

2021 January 22

GP

Grigory Pomadchin in Data Engineers
т.е. все работает так?
источник

A

Alex in Data Engineers
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
источник

RK

Ruslan Ksalov in Data Engineers
блин, ну я пытался найти примере в интернете
источник

A

Alex in Data Engineers
вот это отваливается
источник

RK

Ruslan Ksalov in Data Engineers
и делал как в примерах
источник

RK

Ruslan Ksalov in Data Engineers
Alex
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
спасибо), и что делать?
источник

RK

Ruslan Ksalov in Data Engineers
куды бечь?)
источник

GP

Grigory Pomadchin in Data Engineers
Ruslan Ksalov
блин, ну я пытался найти примере в интернете
ну если работает когда у тебя авросехеме не валом, то проблема в сериализации
источник

GP

Grigory Pomadchin in Data Engineers
я немного удивлен что def schema: Schema = AvroSchema[URLResponse] не работает
источник

GP

Grigory Pomadchin in Data Engineers
а вообще зачем нужен AvroSchema?
там есть AvroSerializationSchema который у флинка есть
источник

GP

Grigory Pomadchin in Data Engineers
т.е. либа не нужна
источник

RK

Ruslan Ksalov in Data Engineers
Grigory Pomadchin
я немного удивлен что def schema: Schema = AvroSchema[URLResponse] не работает
пробовал, тот же эффект
источник

GP

Grigory Pomadchin in Data Engineers
может у нее лучше с сериализацией
источник

A

Alex in Data Engineers
@pomadchin

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L259-L275

там веселье что сериализованное сообщение потом не может десериализовать
источник

A

Alex in Data Engineers
executionConfig.disableForceKryo()

вы уверены что вам это нужно?
источник

GP

Grigory Pomadchin in Data Engineers
ну понятное дело
источник

GP

Grigory Pomadchin in Data Engineers
ну я так понял что вот это работает

val stream = env.addSource(new FlinkKafkaConsumer[GenericRecord](
     kafkaTopic, ConfluentRegistryAvroDeserializationSchema.forGeneric(AvroSchema[URLResponse], schemaRegistryURL), properties))
источник

GP

Grigory Pomadchin in Data Engineers
но может я криво прочитал)
источник

RK

Ruslan Ksalov in Data Engineers
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
источник

RK

Ruslan Ksalov in Data Engineers
тут описаны несколько способов
источник