0

голосов
0

ответ
4

Просмотры

подписка на Java graphQL для IBM MQ Темы

Я новичок в graphQL и пытаюсь создать подписку на тему MQ. Я хочу на основе событий обновления, всякий раз, когда я получаю сообщение graphQL API должен закачает и показать обновления. Все указатели будут полезны для понимания подписок с JMS. Спасибо
Priya Tanwar
1

голосов
0

ответ
5

Просмотры

@KafkaListener не потребляя сообщений - проблема с десериализации

Производитель сообщения с помощью Кафку привязок Spring облака потоков @Component общественности статического класса PageViewEventSource реализует ApplicationRunner {личной окончательное MessageChannel pageViewsOut; частный окончательный журнал Log = LogFactory.getLog (GetClass ()); общественного PageViewEventSource (связывание AnalyticsBinding) {this.pageViewsOut = binding.pageViewsOut (); } @Override общественный недействительный запуск (ApplicationArguments арг) бросает исключение {список имен = Arrays.asList ( "Priya", "dyser", "Луч", "Марк", "Oman", "Ларри"); Список страниц = Arrays.asList ( "блог", "Facebook", "Instagram", "новости", "YouTube", "о"); Runnable работоспособной = () -> {Строка rPage = pages.get (новый Random () nextInt (pages.size ()).); Строка RNAME = страница. получить (новый Random () nextInt (names.size ()).); PageViewEvent pageViewEvent = новый PageViewEvent (RNAME, rPage, Math.random ()> 0,5 10: 1000); Serializer сериализатору = новый JsonSerde (PageViewEvent.class) .serializer (); байт [] т = serializer.serialize (нуль, pageViewEvent); Сообщение Сообщение = MessageBuilder .withPayload (м) .build (); попробуйте {this.pageViewsOut.send (сообщение); log.info ( "отправлено" + сообщение); } Задвижка (Исключение е) {log.error (е); }}; Executors.newScheduledThreadPool (1) .scheduleAtFixedRate (Runnable, 1, 1, TimeUnit.SECONDS); } Это использование ниже сериализации spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde = org.apache.kafka.common.serialization.Serdes $ StringSerde spring.cloud.stream.kafka.streams. binder.configuration.default.value.serde = org.apache.kafka.common.serialization.Serdes $ BytesSerde Я стараюсь потреблять эти сообщения в отдельном Consumer Application через Spring Кафки - KafkaListener @Service общественного класса PriceEventConsumer {частный статический окончательного Logger LOG = LoggerFactory.getLogger (PriceEventConsumer.class); @KafkaListener (темы = "test1", идентификатор_группы = "JSON", containerFactory = "kafkaListenerContainerFactory") общественного недействительными получить (байты данных) {// общественного недействительными получить (данные @Payload PageViewEvent, @ заголовки Заголовки MessageHeaders) {log.info ( "Сообщение доставлено"); LOG.info ( "полученные данные = {}", данные); } Конфигурации Контейнер завод @Bean общественного ConsumerFactory consumerFactory () {Карта реквизита = новый HashMap (); props.put (ConsumerConfig. BOOTSTRAP_SERVERS_CONFIG, "локальный: 9092"); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put (ConsumerConfig.GROUP_ID_CONFIG, "JSON"); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "ранний"); вернуть новый DefaultKafkaConsumerFactory (реквизит); } @Bean общественного ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory завод = новый ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); вернуться завод; } С помощью этого потребителя конфигурации не подбирая сообщения (в байтах). Если я изменю Кафка слушателя принять строку, то это дает мне ниже исключения: @KafkaListener (темы = "test1", идентификатор_группы = "JSON", containerFactory = "kafkaListenerContainerFactory") общественного недействительными получить (данные типа String) {LOG.info ( "Сообщение получено"); LOG.info ( "полученные данные = {}", данные); } Вызванный: org.springframework.messaging.converter.MessageConversionException: Невозможно обработать сообщение; вложенное исключение org.springframework.messaging.converter.MessageConversionException: не удается преобразовать из [org.apache.kafka.common.utils.Bytes] до [java.lang.String] для GenericMessage [полезной нагрузки = { "USERID": "Facebook" , "страница": "о", "длительность": 10}, заголовки = {kafka_offset = 4213, [email protected], kafka_timestampType = create_time, kafka_receivedMessageKey = NULL,
Priya Tanwar