E
Size: a a a
т
т
т
т
WD
@KafkaListener(groupId = "mygroup", topics = "mytopic")конфиг консюмера
public void listenPartition0(ConsumerRecord<?, ?> record) {
HelpMethods.setLog("Received: " + record.value());
}
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "tasks");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
конфиг продюсера@ConfigurationПишу продюсером так
public class KafkaProducerConfig {
@Bean(name = "kafkaTransactionManager")
@ConditionalOnMissingBean(KafkaTransactionManager.class)
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
@Bean
public ProducerFactory<?, ?> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(configProps);
String transactionId = UUID.randomUUID().toString();
producerFactory.setTransactionIdPrefix(transactionId);
return producerFactory;
}
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
kafkaTemplate.send(taskTopic, new TaskInstanceKafka(instanceId, taskId, currentTime));
Есть какие-то очевидные ошибки в моем коде?т
т
E
т