Вопросы с тегами [apache-kafka]

1

голосов
1

ответ
198

Просмотры

Что происходит, когда есть только один раздел в Кафки темы и несколько потребителей?

У меня есть Кафка тема только один раздел, и я не получаю то, что будет происходить в следующих случаях? Как сообщения будут доставлены потребителям? Если все потребители находятся в одной группе Если все потребители находятся в другой группе, я не уверен, что если потребители будут получать уникальные сообщения или дублировать из них.
hard coder
1

голосов
2

ответ
860

Просмотры

что лучше практик, чтобы получать сообщения из нескольких тематических разделов Кафки?

Мне нужно потребительские сообщения из различных тем Кафки, Должен ли я создать другой экземпляр потребителя за тему, а затем начать новую обработку резьбы в соответствии с количеством разделов. или я должен подписаться все темы от одного экземпляра потребителей и должны начать разные технологические потоки Благодарности и пожелания, MEGHA
Megha
1

голосов
4

ответ
817

Просмотры

Как проверить, если Кафка Потребитель готов

У меня есть Кафка совершить набор политик для последних и пропавших без вести первых нескольких сообщений. Если я даю спать в 20 секунд, прежде чем начать посылать сообщения на входной теме, все работает так, как хотелось бы. Я не уверен, что если проблема с потребителем принимать длительное время для раздела восстановления равновесия. Есть ли способ узнать, если потребитель готов, прежде чем начать опрашивать?
Nagireddy Hanisha
1

голосов
0

ответ
308

Просмотры

Надежный веб Оправа Streaming Кафки в Java

Мне нужно записать из ненадежного подключения веб-сокетов и потока в Кафка. Наш Кафка кластер является довольно надежным и мы можем сделать его высокую доступность. Каков наилучший подход, чтобы сделать подключение к веб-сокетов в качестве надежной, насколько это возможно? Я хотел бы, чтобы свести к минимуму потери данных. Одним из решений будут иметь несколько процессов или веб-сокеты клиентов прослушивания и потоковые к нескольким темам Кафок. Затем сделать фильтр с Кафкой потоков. Это работает только тогда, когда каждое сообщение, которое я получаю имеет уникальный идентификатор, который не всегда. Другим решением было бы контролировать подключение веб-сокетов и перезапустить или сбросить его. Но тогда я, возможно, потерю данных. Или полагаться на веб-сокетов сердцебиения? Или коды своих собственных обработчиков ошибок? Какие каркасы / библиотеки из Java в этом пространстве, чтобы сделать лучшую работу? В настоящее время я использую клиент веб-сокеты от org.java-WebSocket.
Daniel
1

голосов
0

ответ
179

Просмотры

Спарк потокового возвращается Кафка результат на местном, но не на ПРЯЖАХ

Я использую Cloudera В. М. CDH 5,12, свечи v1.6, Кафка (устанавливается ням) V0.10 и питона 2.66. Я по этой ссылке параметры свечи: Ниже приведен простой искровой приложение, которое я бегу. Он принимает события от Кафки и печатает его после того, как карта уменьшить. от __future__ импорта print_function импорта SYS из pyspark импорта SparkContext из pyspark.streaming импорта StreamingContext из pyspark.streaming.kafka импорта KafkaUtils если __name__ == "__main__": если Len (sys.argv) = 3: печать ( "Использование: kafka_wordcount. ру», файл = sys.stderr) выход (-1) = SparkContext подкожно (APPNAME = "PythonStreamingKafkaWordCount") ССК = StreamingContext (п, 1) zkQuorum, тема = sys.argv [1:] КВС = KafkaUtils.createStream (ССК , zkQuorum, "искра-потоковая-потребитель", {темы: 1}) линия = kvs.map (лямбда х: 1515766709025 окончательный статус: UNDEFINED отслеживания URL: HTTP: //quickstart.cloudera: 8088 / прокси / application_1515761416282_0010 / пользователь: Cloudera 40 INFO YarnClientSchedulerBackend: application_1515761416282_0010 Применение начал работать. 40 INFO Utils: Успешно начал службу 'org.apache.spark.network.netty.NettyBlockTransferService' на порт 53694. 40 INFO NettyBlockTransferService: Сервер создан на 53694 53 INFO YarnClientSchedulerBackend: SchedulerBackend готова для планирования начинающегося после ожидания maxRegisteredResourcesWaitingTime: 30000 (мс) 54 ИНФО BlockManagerMasterEndpoint: Регистрация блок-менеджер quickstart.cloudera: 56220 с 534,5 MB RAM, BlockManagerId (1, quickstart.cloudera, 56220) 07 ИНФО ReceiverTracker: с 1 приемников 07 ИНФО ReceiverTracker: ReceiverTracker начала 07 INFO PythonTransformedDStream: PythonTransformedDStream @ de77734 10 INFO MemoryStore: Блок broadcast_2 хранятся в виде значений в памяти (приблизительный размер 5,8 Кб, свободный 534,5 MB) 10 INFO MemoryStore: Блок broadcast_2_piece0 хранится в виде байтов в памяти (приблизительный размер 3.5 KB, свободный 534,5 МБ) 20 INFO JobScheduler: Добавлено рабочие места для времени 1515766760000 мс 30 INFO JobScheduler: Добавлены рабочие места для времени 1515766770000 мс 40 INFO JobScheduler: Добавлены задания на время 1515766780000 мс после этого, работа только начинает повторять следующие строки (после некоторой задержки, заданной контексте потока) и не суммируется и не распечатке Кафки потока, в то время как работа на хозяина местного с точно такой же код делает. Интересно, что печатает следующую строку каждый раз, когда происходит-событие Кафки (картинка увеличенных настроек искры памяти) Обратите внимание, что: данные в Кафке, и я могу видеть, что в потребительской консоли я также попытался увеличением исполнителя»
Samhash
1

голосов
0

ответ
293

Просмотры

Bridging Кафка и Akka Streams

Кафка Streams поддерживается только на платформе JVM. Поэтому я искал способ использовать AKKA потоки в C # для создания потребителя Кафки с выгодой от Akka Streams. Сливной C # Кафка потребитель основан на опрос и обработку событий, например, с использованием (вар потребитель = новый Потребитель (consumerConfig, пустой, новый StringDeserializer (Encoding.UTF8))) {// Подписка на события OnMessage consumer.OnMessage + = (OBJ , MSG) => {// дескриптор сообщения}; consumer.Subscribe (новый список () {kafkaTopic}); в то время как (отменен) {consumer.Poll (); }} И источник для Akka потоков, кажется, основано на IEnumerable. Так как опрос и IEnumerable аналогичен по своей природе она должна быть как-то можно соединить эти две технологии. Я новичок в Akka Streams и не быть в состоянии понять, как это сделать.
carstenj
1

голосов
1

ответ
544

Просмотры

Как установить MSSQL JDBC драйвер для сливных / Кафки

баночка / USR / Библиотека / JVM / Java-8-OpenJDK-amd64 / JRE / Библиотека / внутр документация создает впечатление, что все, что я сделать, это добавить / путь / к / баночку на пути к классам и начать сливающиеся. Что мне не хватает. Кроме того, это мой JSON для создания соединителя. { "Имя": "JDBC-источник", "конфигурация": { "connector.class": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "tasks.max": "1", "connection.url": "JDBC: SQLServer: // SQL: 1443; User = кто-то; Password = пароль; Database = некоторая-дебютантка", "режим": "приращение", "incrementing.column.name": "идентификатор", «topic.prefix «: "электронная почта", "topic.whitelist": "EventLog", "имя": "JDBC-источник"}} Кроме того, это мой JSON для создания соединителя. { "Имя": "JDBC-источник", "конфигурация": { "connector.class": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "tasks.max": "1", "connection.url": "JDBC: SQLServer: // SQL: 1443; User = кто-то; Password = пароль; Database = некоторая-дебютантка", "режим": "приращение", "incrementing.column.name": "идентификатор", «topic.prefix «: "электронная почта", "topic.whitelist": "EventLog", "имя": "JDBC-источник"}} Кроме того, это мой JSON для создания соединителя. { "Имя": "JDBC-источник", "конфигурация": { "connector.class": "com.microsoft.sqlserver.jdbc.SQLServerDriver", "tasks.max": "1", "connection.url": "JDBC: SQLServer: // SQL: 1443; User = кто-то; Password = пароль; Database = некоторая-дебютантка", "режим": "приращение", "incrementing.column.name": "идентификатор", «topic.prefix «: "электронная почта", "topic.whitelist": "EventLog", "имя": "JDBC-источник"}}
matthewdaniel
1

голосов
1

ответ
901

Просмотры

Как слушать правое сообщение ACK от Кафки

Я делаю ДОУ с Spring ботинком и Кафка для транзакционного проекта и у меня есть следующее сомнение: Сценарий: Один microservices MSPUB1 получает запросы от клиента. Это просит опубликовать сообщение на тему TRANSACTION_TOPIC1 на Кафку, но что Microservice может принимать множество запросов параллельно. Microservice слушает топика TRANSACTION_RESULT1, чтобы проверить, что сделка завершена. В другой стороне платформы Streaming, другой Microservice MSSUB1 слушает топику TRANSACTION_TOPIC1 и обрабатывать все сообщения и опубликовать результаты по: TRANSACTION_RESULT1 Что такое лучший способ от MSPUB1 знать, если сообщение на тему TRANSACTION_RESULT1 совпадает с его исходным запросом? Microservice MSPUB1 может иметь идентификатор для любого сообщения, опубликованного на начальную тему TRANSACTION_TOPIC1 и быть перемещен в TRANSACTION_RESULT1 Вопрос: Когда вы читаете раздел, вы перемещаете указатель, но в параллелизме среды с несколькими запросами, как если сообщение на проверки тема TRANSACTION_RESULT1 это ожидается? Большое спасибо заранее Хуан Антонио
jabrena
1

голосов
1

ответ
640

Просмотры

Невозможно удалить тему Кафки в ОС Windows

Я установить свойства Zookeeper из «delete.topic.enable» истина. Но я до сих пор не могу удалить эту тему. Когда я МВН установить или тест МВН, я получил следующие проблемы: WARN обработки kafka.log Ошибка: тип = LogManager, имя = LogDirectoryOffline, logDirectory = C: \ Users \ extznq \ AppData \ Local \ Temp \ EH4Test7267133751803693562 (com.yammer. metrics.reporting.JmxReporter: 397) javax.management.MalformedObjectNameException: недопустимый символ ':' в стоимость части имущества ERROR Ошибка при удалении healthchecktopic1516638375589-0 в дир C: \ Users \ extznq \ AppData \ Local \ Temp \ EH4Test9083449671042580730. (Kafka.server.LogDirFailureChannel: 107) java.io.IOException: Невозможно переименовать каталог журнала из C: \ Users \ {My-тема-путь} в C: \ Users \ {My-тема-путь} -0.0a40ae7410c2401aba0816891789c334- удалить в kafka.log.LogManager. collection.IterableLike $ class.foreach (IterableLike.scala: 72) при scala.collection.AbstractIterable.foreach (Iterable.scala: 54) в kafka.server.ReplicaManager.stopReplicas (ReplicaManager.scala: 371) в kafka.server.KafkaApis .handleStopReplicaRequest (KafkaApis.scala: 190) в kafka.server.KafkaApis.handle (KafkaApis.scala: 104) в kafka.server.KafkaRequestHandler.run (KafkaRequestHandler.scala: 65) в java.lang.Thread.run (Thread. Java: 745) ОШИБКА [Broker ID = 0] Игнорирование остановки реплики (удалить = TRUE) для разбиения healthchecktopic1516728986980-0 из-за исключением хранения (state.change.logger: 107) org.apache.kafka.common.errors.KafkaStorageException: Ошибка во время удаления healthchecktopic1516728986980-0 в дир C: \ Users \ AppData \ Local \ Temp \ EH4Test7267133751803693562. Вызванный: java.io.IOException: Не удалось переименовать каталог журнала из C: \ Users \ AppData \ Local \ Temp \ EH4Test7267133751803693562 \ healthchecktopic1516728986980-0 в C: \ Users \ AppData \ Local \ Temp \ EH4Test7267133751803693562 \ healthchecktopic1516728986980-0.65fc6c32c44940e58c1a45bd2972523a-удаление в kafka.log.LogManager.asyncDelete (LogManager.scala: 671) Я думаю, что предупреждение все в порядке. Но я не знаю, почему я получаю ошибки, хотя я бегу Eclipse в качестве администратора. Тем более, KafkaStorageException, я до сих пор 50GB в моем компьютере. Окружающая среда: Windows 10 Zookeeper 3.5.3-бета Кафка 1.0.0 asyncDelete (LogManager.scala: 671) Я думаю, что предупреждение все в порядке. Но я не знаю, почему я получаю ошибки, хотя я бегу Eclipse в качестве администратора. Тем более, KafkaStorageException, я до сих пор 50GB в моем компьютере. Окружающая среда: Windows 10 Zookeeper 3.5.3-бета Кафка 1.0.0 asyncDelete (LogManager.scala: 671) Я думаю, что предупреждение все в порядке. Но я не знаю, почему я получаю ошибки, хотя я бегу Eclipse в качестве администратора. Тем более, KafkaStorageException, я до сих пор 50GB в моем компьютере. Окружающая среда: Windows 10 Zookeeper 3.5.3-бета Кафка 1.0.0
Spider
1

голосов
0

ответ
310

Просмотры

How to read kafka statestore in processor which was updated from a different processor?

Я создал 2 Кафка statestores, соответствующую 2 различных темы, как показано ниже. StateStoreSupplier tempStore1 = Stores.create ( "tempStore1") withKeys (Serdes.String ()) withValues ​​(valueSerde) .persistent () построить ()...; StateStoreSupplier tempStore2 = Stores.create ( "tempStore2") withKeys (Serdes.String ()) withValues ​​(valueSerde) .persistent () построить ()...; streamsBuilder.addSource ( "Источник", "tempTopic1", "tempTopic2") .addProcessor ( "Процесс", () -> новый процессор (), "Источник") .connectProcessorAndStateStores ( "Процесс", "tempStore1", "tempStore2" ) .addStateStore (tempStore1, "Процесс") .addStateStore (tempStore2, "Процесс"); В классе процессора, Я могу читать и добавлять записи в StatesStores, когда есть сообщение в соответствующих темах. Но я не могу прочитать магазин tempStore1, когда есть сообщение, приходящее из tempTopic2 и вице verca. Я должен сравнить сообщения от одного statestore к другому, а это значит, мне нужно прочитать как государственные магазины. Вот пример кода Snipper от способа обработки. Я считаю, что ProcessorContext (переменная контекста) отличается для соответствующих тем и, следовательно, другой магазин не быть доступным. tempKeyValueStore1 = (KeyValueStore) context.getStateStore ( "tempStore1"); tempKeyValueStore2 = (KeyValueStore) context.getStateStore ( "tempStore2"); если (context.topic (). равными ( "tempTopic1")) {tempKeyValueStore1.put (value.getHeader (). getCorrelationId (), значение); } Иначе, если (context.topic () равно ( "tempTopic2").) {TempKeyValueStore2.put (value.getHeader () getCorrelationId (), значение.); System.out.println ( "Размер:" + tempKeyValueStore1.approximateNumEntries ()); // возвращение как 0, хотя есть записи в том, что statestore} Спасибо заранее.
Nar123
1

голосов
0

ответ
775

Просмотры

Спарк структурированных потокового с обеспеченным Кафка получает заморозил сообщение журнала [INFO StreamExecution: Запуск нового потоковый запроса.]

Для того, чтобы использовать структурированные потоковый в моем проекте, я тестирую искру 2.2.0 и Кафка 0.10.1 интеграцию с Kerberos на моем Hortonworks 2.6.3 среды, я бег ниже пример кода для проверки интеграции. Я могу запустить программу ниже на IntelliJ на свече локальном режиме без каких-либо проблем, но и та же программа при перемещении в кластер пряжи / режим клиента на Hadoop кластера он застрянет на ниже сообщения журнала 18/01/25 14:19:59 INFO StreamExecution: Запуск нового потокового запроса. Для проверки вменяемости на обеспеченном кластере Hadoop, я побежал искру 2.2.0 ПИ примера, это работает отлично, и я также способен производить и потреблять сообщения от Кафки на Hadoop кластере с использованием kafka-console-producer.sh и kafka- console-consumer.sh. Извините за этот долгий пост, поскольку это является очень широким вопросом, мне нужно ваши материалы для того, чтобы сделать эту работу. Пример программы: APPNAME ( "KAFKA_SPARK_TEST_APP") .getOrCreate ()}} искры kafka_jaas.conf: KafkaClient {com.sun.security.auth.module.Krb5LoginModule требуется useTicketCache = истина renewTicket = истина ServiceName = "{{kafka_bare_jaas_principal}}"; }; Клиент {com.sun.security.auth.module.Krb5LoginModule требуется useKeyTab = истина Keytab = "{{kafka_keytab_path}}" storeKey = TRUE useTicketCache = FALSE ServiceName = "Zookeeper" основные "= {{kafka_jaas_principal}}"; }; Спарк подать команду: ./spark-submit \ --name TEST_JOB_SPARK_STREAM_WITH_KAFKA \ --verbose \ --master пряжа \ --deploy режим клиента \ --num-исполнители 1 \ --executor-памяти 1g \ --executor- ядра 1 \ --repositories http://repo.hortonworks.com/content/repositories/releases/ \ --packages org.apache.spark: искровой SQL-Кафка-0-10_2.11: 2.2.0.2.6.3. 0-235 \ --conf spark.yarn.maxAppAttempts = 1 \ --files /home/user/sparktest/kafka_jaas.conf#kafka_jaas.conf,/home/user/sparktest/user.headless.keytab#user.headless. Keytab \ --conf "spark.driver.extraJavaOptions = -XX: + UseG1GC -Djava.security.auth.login.config = / дом / пользователь / sparktest / kafka_jaas.conf" \ --conf «spark.executor.extraJavaOptions = -XX: + UseG1GC -Djava.security.auth.login.config = / дом / пользователь / sparktest / kafka_jaas.conf»\ --class com.brokerfmc.demo.DemoSparkStreamingWithKafkaJob /home/user/sparktest/demo.jar kafka01. broker.com:6667,kafka02.broker.com:6667 src_topic dest_topic SASL_PLAINTEXT Спарк Консоль вывода: 18/01/25 14:19:59 INFO AbstractLogin: Успешно вошли в 18/01/25 14:19:59 INFO KerberosLogin.: TGT обновить поток начал. 18/01/25 14:19:59 INFO KerberosLogin: TGT действителен начиная с: Чт 25 января 13:07:
nilesh1212
1

голосов
1

ответ
776

Просмотры

Apache Спарк Структурированные потоковый (DataStreamWriter) запись в улье таблицы

Я ищу использовать искру Structured потокового чтения данных с Кафкой и обработать его и записать в таблицу Hive. Вал искра = SparkSession .builder .appName ( "Кафка Тест") .config ( "spark.sql.streaming.metricsEnabled", правда) .config ( "spark.streaming.backpressure.enabled", "истина") .enableHiveSupport () .getOrCreate () знач события = искра .readStream .format ( "Кафка") .option ( "kafka.bootstrap.servers", "ХХХХХХХ") .option ( "startingOffsets", "последний") .option ( "подписываются", "YYYYYY") .load VAL данные = events.select (..... некоторые столбцы ...) data.writeStream .format ( "паркетным") .option ( "сжатие", "мгновенным") .outputMode ( "добавить ") .partitionBy (" DS ") .option ("
Anant Furia
1

голосов
0

ответ
63

Просмотры

CDH искровых пропариваниями потребительского Кафка Керберос

Есть ли какой-либо один пытался использовать искру-пропаривания (pyspark) как потребитель для керберос Кафка в CDH? Я ищу CDH и просто найти пример о Scala. Разве это означает, CDH не поддерживает это? Любой человек может помочь в этом ???
znever
1

голосов
0

ответ
348

Просмотры

Какой искровой SQL-Кафка использовать для Apche Кафки 1.0.0.?

Какие искровым SQL-Кафка .jar как зависимость при работе с искровым отправить следует использовать для Apache Кафки 1.0.0.? Я использую Scale 2.11 и Спарк 2.2.1. Я хотел бы запустить Спарк Структурированного приложения Streaming.
Erhan
1

голосов
0

ответ
244

Просмотры

Kafka consumer message consumption delayed for one VM in cluster

У меня есть KAFKA 10.2.1 кластер под управлением, в котором сообщение Кафка отправляется с использованием абонентов kafka-console-producer.sh и 2/3 получают сообщение немедленно, пока другой получает сообщение значительно позже (4 минуты). Я предполагаю, что это проблема с абонентом, поэтому ниже мой абонент конфигурация защищен ConsumerFactory createConsumerFactory (String kafkaBrokers) {Карта реквизит = новый HashMap (); props.put (ConsumerConfig.GROUP_ID_CONFIG, InetAddress.getLocalHost () getHostAddress ().); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "ранний"); props.put (ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); вернуть новый DefaultKafkaConsumerFactory (реквизит); } Защищенный KafkaListenerContainerFactory createKafkaListenerContainerFactory (ConsumerFactory consumerFactory) {ConcurrentKafkaListenerContainerFactory завод = новый ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory); factory.setConcurrency (1); factory.setAutoStartup (истина); factory.setMessageConverter (новый StringJsonMessageConverter ()); factory.getContainerProperties () setPollTimeout (3000). вернуться завод; } Абоненты работают в Java Spring JVM, но производитель простой питон скрипт, который вызывает kafka-console-producer.sh отправить сообщение. Производитель конфигурации по умолчанию. Что может вызвать сообщение, которое будет прибывать так поздно? Не уверен, если это уместно - Когда я Ls -l на сегменты журнала свалка папку Кафка (Kafka / myTopic-0 /), я вижу, что 000000 ... 000. журнал уже писал в конце VM в то время я ожидаю, что приводит меня верить, что сообщение было доставлено вовремя. Тем не менее, Java абонент набирает сообщение позже. Что может быть причиной?
cjavier70
1

голосов
0

ответ
81

Просмотры

Проблема с filebeat KAFKA сжатия выходного LZ4

Я пытаюсь настройки filebeat (6.2) с выходом Кафка с использованием LZ4 сжатия, но всегда получаю ошибку, как ERR Неверной Кафка конфигурации: Кафка: неверная конфигурация (сжатие LZ4 требует версия> = V0_10_0_0) Моя конфигурация выхода: output.kafka: хосты: [ "kafka1: 10092", "kafka2: 10092", "kafka3: 10092"] тема: '% {[fields.log_topic]}' partition.round_robin: reachable_only: ложные required_acks: 1 сжатия: LZ4 max_message_bytes: 1000000 версия: 0.11 .0.0 Любое представление о том, что могло бы быть неправильным было бы очень полезно Спасибо.
Prabin Meitei
1

голосов
0

ответ
147

Просмотры

Apache Flink Стохастический Outlier Выбор по потоку данных

Я пытаюсь использовать модель StochasticOutlierSelection пакета Apache Flink ML. Я не могу работать, как использовать его с Кафкой в ​​качестве источника данных, я понимаю, что это нуждается в DataSet, а не DataStream, но я, кажется, не быть в состоянии окна моего Кафка DataStream стать DataSet. Есть ли способ, я могу относиться к моему потоку в виде серии небольших DataSets. Например, есть способ сказать, каждые 10 элементов в потоке, соответствующий шаблон (скользящее окно элементов уникального идентификатор) рассматривать их в качестве DataSet фиксированного размера и выявлением каких-либо выбросов в течение этого фиксированного набора данных размера? Сценарий я ищу, чтобы создать это: источник данных -> Кафка Тема 1 -> Flink предварительная обработка -> Кафка Тема 2 -> FLiNK Группы по ID -> Обнаружение Outlier на группах у меня уже есть рабочая реализация до предварительно обработка,
Joe
1

голосов
1

ответ
67

Просмотры

Порядок между зависимыми объектами с Кафки Streams

Читаем данные из RestfulAPI, которые представляют собой зависимые объекты. например, из / студентов я получаю студенческие объекты и из / учителей я получаю объект учителя. Студент подключен к объекту Преподаватель (студент, преподаватель Id). Проблема заключается в том, что я произвожу от / студентов к Кафке в студент темы и с / учителями учителей по теме, но когда я пытаюсь соединить между ними Кафками Streams, иногда событие студента приходит до его событие учителя пришло, таким образом, я не получаю объединенная запись студента и преподавателя (из-за рано прибыли студентов). Для того, чтобы использовать окно не является оптимальным, потому что я хотел бы получить студент обновляет все время. Мой вопрос - как синхронизировать события, так что я буду в состоянии разрешить в зависимости объектов. В настоящее время я
Itai
1

голосов
0

ответ
543

Просмотры

Тестирование производительности Кафки Streaming Application

Я написал приложение потокового Кафка в Java, который считывает с одной темы. Есть ли какое-то лицо без преобразования на отдельных сообщениях и производит его в другую тему, используя Кафка Streams Java API. Может кто-нибудь помочь мне, как сделать тестирование производительности для данного приложения. Я хочу, чтобы найти какую скорость мы потребляем и трансформацию и производство?
Vamshi
2

голосов
0

ответ
13

Просмотры

Robinhood Faust: How to process results from windowed table?

Я пытаюсь создать перемешивающуюся оконную таблицу в RobinHood Фауста в результате чего события, потребляемое от темы является оконной в дискретные временных интервалы 10 секунд и истекает через 90 минут. events_topic = app.topic ( 'события', value_type = Событие) EVENT_TABLE = app.Table ( 'EVENT_TABLE', по умолчанию = Событие,) .tumbling (размер = timedelta (секунды = 10), истекает = timedelta (минуты = 90), key_index = True) .relative_to_field (Event.event_time) @ app.agent (events_topic) асинхронная Защита process_event (события): асинхронный для события в events.group_by (event.partition_id): EVENT_TABLE [event.partition_id] = событие вышеперечисленных разделов реализации события относительно их ID раздела и вставляет событие в оконном event_table. Для того, чтобы получить доступ к элементам, хранящимся в таблице, можно обратиться к документации здесь список (kline_table [ «example_feature_id»]. пункты ()) Это должно быть тривиальным применять операции над данными, таких, что для каждого второго окна 10 должен существовать единый агрегированный значение, определяющее среднее «событие» в этом окне т.е. с учетом следующих событий: [{VAL: 2}, {вал: 2}] совокупное событие должно быть {вал: 2}. Однако это становится несколько сложным, когда вы осознаете, что данные не были дискретно сгруппированы по времени / partition_id. Я не уверен относительно того, что каноническая реализация агрегированных барабанных окон в RobinHood Фауста, как бы один наиболее эффективно реализовать логику Тэрин? Спасибо пункты ()) Это должно быть тривиальным применять операции над данными, что для каждого второго окна 10 должен существовать единый агрегированный значение, определяющее среднее «событие» в пределах этого окна, т.е. учитывая следующие события: [{VAL: 2}, {вал: 2}] совокупное событие должно быть {вал: 2}. Однако это становится несколько сложным, когда вы осознаете, что данные не были дискретно сгруппированы по времени / partition_id. Я не уверен относительно того, что каноническая реализация агрегированных барабанных окон в RobinHood Фауста, как бы один наиболее эффективно реализовать логику Тэрин? Спасибо пункты ()) Это должно быть тривиальным применять операции над данными, что для каждого второго окна 10 должен существовать единый агрегированный значение, определяющее среднее «событие» в пределах этого окна, т.е. учитывая следующие события: [{VAL: 2}, {вал: 2}] совокупное событие должно быть {вал: 2}. Однако это становится несколько сложным, когда вы осознаете, что данные не были дискретно сгруппированы по времени / partition_id. Я не уверен относительно того, что каноническая реализация агрегированных барабанных окон в RobinHood Фауста, как бы один наиболее эффективно реализовать логику Тэрин? Спасибо Однако это становится несколько сложным, когда вы осознаете, что данные не были дискретно сгруппированы по времени / partition_id. Я не уверен относительно того, что каноническая реализация агрегированных барабанных окон в RobinHood Фауста, как бы один наиболее эффективно реализовать логику Тэрин? Спасибо Однако это становится несколько сложным, когда вы осознаете, что данные не были дискретно сгруппированы по времени / partition_id. Я не уверен относительно того, что каноническая реализация агрегированных барабанных окон в RobinHood Фауста, как бы один наиболее эффективно реализовать логику Тэрин? Спасибо
Brad
1

голосов
1

ответ
327

Просмотры

Spring Кафка - Как установить Commit Async недвижимость

Я пытаюсь установить свойство для разрешения commitAsync () будет вызываться из KafkaMessageListenerContainer: если (this.containerProperties.isSyncCommits ()) {this.consumer.commitSync (совершает); } Еще {this.consumer.commitAsync (совершает, is.commitCallback); } Есть ли способ, чтобы установить это в моем файле application.yml? Я был в состоянии установить подтверждени режим ПАРТИИ, но я не могу найти способ, чтобы иметь свойства контейнера имеют syncCommits значение ЛОЖЬ.
shaktech786
1

голосов
1

ответ
225

Просмотры

Невозможно подключиться к серверу Kakfa от моего локального хоста

У меня есть Кафка сервер работает на другой системе. Я пытаюсь запустить клиент из моей локальной машины, давая брокер URL компьютера, на котором сервер Кафки работает. Но, к сожалению, я не могу подключиться к серверу Кафки. server.properties файлы имеет атрибуты ниже: group.initial.rebalance.delay.ms = 0 слушателей = SASL_PLAINTEXT: // локальный: 9093 advertised.listeners = SASL_PLAINTEXT: // локальный: 9093 # advertised.listeners = SASL_PLAINTEXT: //10.97 .123.52: 9093 security.inter.broker.protocol = SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol = НЕАРМИРОВАННОЕ sasl.enabled.mechanisms = ОБЫЧНЫЙ во время работы моего клиента из моей локальной машины, я передаю брокерский URL машины сервера , но не может подключиться :(. Может кто-нибудь помочь в этой проблеме?
ashish chauhan
1

голосов
1

ответ
236

Просмотры

kafka confluent error java.lang.IllegalArgumentException: /tmp/confluent.PVghAKRg/zookeeper/data/myid file is missing

Я бегу Кафку через платформу Confluent на 3 узлов, но когда я разбег сливающийся получаю эту ошибку: [2018-04-09 10: 54: 25,995] INFO Считывание конфигурации из: /tmp/confluent.SVNfiLFU/zookeeper/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2018-04-09 10: 54: 26011] INFO Решенный имя хоста: 0.0.0.0 по адресу: /0.0.0.0 (org.apache.zookeeper.server.quorum. QuorumPeer) [2018-04-09 10: 54: 26011] INFO Решенный имя хоста: 192.168.0.36 к адресу: /192.168.0.36 (org.apache.zookeeper.server.quorum.QuorumPeer) [2018-04-09 10:54 : 26011] INFO Решенного имя хост: 192.168.0.22 по адресу: /192.168.0.22 (org.apache.zookeeper.server.quorum.QuorumPeer) [2018-04-09 10: 54: 26011] INFO недобросовестного большинства кворумам (Org. apache.zookeeper.server.quorum.QuorumPeerConfig) [2018-04-09 10: 54: 26012] ОШИБКА Недопустимые конфигурации,выход ненормально (org.apache.zookeeper.server.quorum.QuorumPeerMain) org.apache.zookeeper.server.quorum.QuorumPeerConfig $ ConfigException: Ошибка обработки /tmp/confluent.SVNfiLFU/zookeeper/zookeeper.properties в org.apache.zookeeper. server.quorum.QuorumPeerConfig.parse (QuorumPeerConfig.java:154) при org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun (QuorumPeerMain.java:101) при org.apache.zookeeper.server.quorum.QuorumPeerMain.main ( QuorumPeerMain.java:78) Вызванный: java.lang.IllegalArgumentException: /tmp/confluent.SVNfiLFU/zookeeper/data/myid файл отсутствует в org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties (QuorumPeerConfig.java:406 ) в org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse (QuorumPeerConfig.java:150) ... более 2 это Zookeeper.Свойства: DATADIR = / вар / Библиотека / Zookeeper / ClientPort = 2181 initLimit = 5 syncLimit = 2 TickTime = 2000 server.1 = 192.168.0.21: 2888: 3888 server.2 = 192.168.0.22: 2888: 3888 = 192,168 server.3 .0.36: 2888: 3888 также я создал MyId файл, который содержит целочисленный идентификатор в / вар / Библиотека / Zookeeper / каталог
1

голосов
1

ответ
35

Просмотры

Нужно настроить нагрузки Ба-улан для настройки 5 узлов кластера Кафка на Linux RHEL7.5 виртуальной машине

Am настройка 5 узлов кластера Кафка на Linux RHEL7.5 виртуальной машине, нужен ли я настроить нагрузки Ба-улан? Если да, на котором узел мне нужно настроить и на какой порт?
Priyanka Marihal
1

голосов
2

ответ
876

Просмотры

Может ли потребитель Кафки совершить сдвиг в отдельном потоке?

Есть ли Кафка разрешение один поток или процесс, чтобы использовать данные из раздела, в то время как другой поток или процесс берет на себя ответственность вручную совершать смещение когда данные были полностью обработаны?
LittleSiberia
1

голосов
0

ответ
1.1k

Просмотры

Кафка сервер TimeoutException: Освобождающиеся 1 запись (ы) для

У меня есть экземпляры Кафки, работающие в 2-х различных виртуальных машинах. Я могу отправлять сообщения Кафки работает в VM-1 с помощью пружинного Кафка-шаблон, но при отправке сообщений Кафки работает в VM-2, я получаю ниже исключение: 2018-04-19 15:12: 57 [Кафка-производитель-сеть-нить | производитель-1] ОШИБКА osksLoggingProducerListener - исключение выбрасывается при отправке сообщения с ключом = 'хххх' и полезной нагрузки = '{79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, -28, 51, 123, 34, 116, 12 ...»на тему XXXXXX-v1: org.apache.kafka.common.errors.TimeoutException: Освобождающиеся 1 запись (ы) для XXXXXX -v1-3: 60043 мс прошло с момента создания пакетного плюс задерживаться время конфигурирования Производитель config.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, хххх: порт); config.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); config.put (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); config.put (ProducerConfig.BATCH_SIZE_CONFIG, 1048576); config.put (ProducerConfig.LINGER_MS_CONFIG, 100); config.put (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); config.put (ProducerConfig.ACKS_CONFIG, "все"); Я могу телнет как для виртуальных машин, а также пинг их.
Ajey kumar HB
1

голосов
2

ответ
2.3k

Просмотры

Невозможно создать Avro родовую запись из объекта

Я пытаюсь отправить Avro записей на тему Кафки с помощью продюсера Кафка. У меня есть класс User, и я посылаю объект этого класса. Ниже код работает отлично, если я использую avroRecord.put (); для установки каждого атрибута. Но то, что я хочу, чтобы создать общую запись от объекта без использования avroRecord.put (); для каждого атрибута. Класс пользователя открытого класса пользователь {INT идентификатор; Имя строки; Public User (интермедиат идентификатор, имя String) {супер (); this.id = ID; this.name = имя; } Общественного ИНТ GetId () {возвращение идентификатор; } Общественного недействительными SETID (интермедиат ID) {this.id = ID; } Строка GetName () {имя общественного возврата; } Общественной недействительная SetName (имя String) {this.name = имя; }} Sender импорт класс org.apache.avro.Schema; импорт org.apache.avro.generic.GenericData; импорт org.apache.avro.generic.GenericDatumWriter; импорт org.apache.avro.generic.GenericRecord; импорт org.apache.avro.io.DatumWriter; импорт org.apache.avro.io.Encoder; импорт org.apache.avro.io.EncoderFactory; импорт org.apache.avro.reflect.ReflectData; импорт org.apache.kafka.clients.producer.KafkaProducer; импорт org.apache.kafka.clients.producer.ProducerConfig; импорт org.apache.kafka.clients.producer.ProducerRecord; импорт java.io.ByteArrayOutputStream; импорт java.io.IOException; импорт java.util.Properties; импорт vo.User; общественного класса Отправитель {государственной статической силы основных (String [] арг) {Пользователь пользователя = новый пользователь (10, «АСМАП»); Схема схемы = ReflectData.get () GetSchema (user.getClass ()). GenericRecord avroRecord = новый GenericData.Record (схема); // работает отлично /*avroRecord.put("id», user.getId ()); avroRecord.put ( "имя", user.getName ()); * / // не работает DatumWriter datumWriter = новый GenericDatumWriter (схема); ByteArrayOutputStream OutputStream = новый ByteArrayOutputStream (); . Кодер Кодер = EncoderFactory.get () binaryEncoder (OutputStream, NULL); попробуйте {datumWriter.write (пользователь, энкодер); encoder.flush (); } Поймать (IOException е1) {e1.printStackTrace (); } ProducerRecord запись = новый ProducerRecord ( "avrotesttopic1", avroRecord); Свойства реквизита = новые свойства (); props.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put (ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); props.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class); props.put ( "schema.registry.url", "http://127.0.0.1:8081"); производитель KafkaProducer = новые KafkaProducer (реквизит); попробуйте {producer.send (запись); producer.flush (); } Задвижка (Исключение е) {e.printStackTrace (); } Producer.close (); }} Как я могу опубликовать этот объект в мой Кафке тему, как Avro? Я упомянул ниже ссылки https://github.com/akmalmuqeeth/confluent-kafka-spring-demo/blob/master/src/main/java/ConfluentProducerApp.java https://findusages.com/search/org.apache .avro.io.DatumWriter / запись $ 2 смещение = 23 HTTPS: // WWW.
blasteralfred Ψ
1

голосов
1

ответ
572

Просмотры

весна Кафка производительность производитель шаблон

Я использую шаблон Spring Кафка для получения сообщений. И скорость, с которой он производит сообщения слишком медленно. Займет около 8 минут для получения 15000 сообщений. Ниже Как я создал шаблон Кафка: @Bean общественного ProducerFactory highSpeedAvroProducerFactory (@Qualifier ( "highSpeedProducerProperties") KafkaProperties свойства) {окончательная карта kafkaPropertiesMap = properties.getKafkaPropertiesMap (); System.out.println (kafkaPropertiesMap); kafkaPropertiesMap.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); kafkaPropertiesMap.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroGenericSerializer.class); вернуть новый DefaultKafkaProducerFactory (kafkaPropertiesMap); } @Bean общественного KafkaTemplate highSpeedAvroKafkaTemplate (@Qualifier (»
Prabhakar D
1

голосов
1

ответ
430

Просмотры

Как найти причину, почему Кафка брокер получил вниз?

Мы используем Кафка версию 2.11-0.11.0.2 и Zookeeper версию 3.3.6. И они работают на Ubuntu 16.0.4. Мы производим данные со скоростью 3k сообщений в секунду и размер каждого сообщения составляет около 150 байт. мы имеем 7 брокеров и 1 зоопарк, а иногда некоторые из брокеров спускайтесь. Так как мы можем выяснить причину этого, почему конкретный брокер завелся вниз? И где журналы брокера остановки сохраняются, откуда мы можем проверить журналы и найти, почему он был остановлен?
Rajendra Jangir
1

голосов
1

ответ
141

Просмотры

Кафка высота выборки размера чистилище

Я мониторинг наших брокеров Кафку, и я увидел наш размер выборки чистилище 6K - 10К. что эффект Hight выборка чистилища размера на потребителях?
Osham
1

голосов
0

ответ
191

Просмотры

Запланированный Скрипт для расчета общего потребительского лаг Кафки

Я хочу написать скрипт, который вычисляет общее отставание каждого потребителя / раздела. Я знаю общую команду. Bin / Кафка потребителя-групп -bootstrap-сервер: 9092,: 9092,: 9092 --describe --group Что лучший способ для анализа данных и распечатать его для входа? Наряду с названием темы и отставанием потребительского Добавлю TIMESTAMP каждый раз, когда эта команда выполняется. так что я могу отправить данные в упругий поиск и сделать метрику общего отставания от времени на потребитель. Мы не хотим использовать менеджер инструменты Кафки и рыть Кафка версию 1.1.0 Спасибо,
oraclept
1

голосов
0

ответ
229

Просмотры

Кафка разъем JDBC раковина не правильно конвертировать метку времени

INSERT INTO "тест" ( "время", "TIME2") VALUES ( '2018-2-2 20: 14: 45,507000 +0: 0: 0', 1517602485507) это странно, потому что дата и код такой же. Почему первое поле даты «время» конвертирует хорошо, но во втором у меня есть проблемы?
Mikhail
1

голосов
1

ответ
485

Просмотры

Тайм-аут соединения с Кафка работает с Windows, Subsystem для Linux

Я установил Кафку 1.1.0 с Zookeeper 3.4.12 под Windows, Subsystem для Linux на моем Windows 10 ноутбука. Я могу procude и получать сообщения в то время как я остаюсь в убунту, но когда я хочу, чтобы произвести сообщение из окна (с помощью программы Java или с помощью инструмента Кафка-консоль-producer.bat) У меня есть следующее сообщение об ошибке: [2018- 05-11 15: 31: 01449] ERROR Ошибка при отправке сообщений на тему тест с ключом: нулевое, значение: 15 байт с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka. common.errors.TimeoutException: Освобождающиеся 1 запись (ы) для теста-0: 1534 мс прошло с момента создания пакетного плюс задерживаться время любая идея?
Pit
1

голосов
1

ответ
52

Просмотры

Как добавить в тему confiuration в Кафка Admin API?

Список newKafkaTopicsList = новый список; NewTopic newTopic = новый NewTopic ( "topicName", getPartitionCount (), getReplicationFactor ()); newKafkaTopicsList.add (newTopic) Ниже adminClient апи, чтобы создать тему, которая принимает список, который предоставляется Кафка adminClient, который имеет конструктор (имя java.lang.String, внутр numPartitions, короткие replicationFactor) NewTopic и метод конфиги конфиги (java.util. Карта конфига) Может кто-нибудь объяснить, как передать карту методе конфиги? CreateTopicsResult createTopicsResult = adminClient.createTopics (newKafkaTopicsList);
Bharat
1

голосов
1

ответ
206

Просмотры

Можно ли запустить встроенную Кафка брокер как ActiveMQ?

Когда я говорю, внедренное это то, что я имею в виду, пожалуйста, обратитесь к следующей теме - https://stackoverflow.com/a/28858630/5375223 Позволь мне дать некоторый контекст, в нашем проекте, сейчас мы используем ActiveMQ в качестве брокера сообщений, и теперь мы планируем мигрировать к Кафке. Пожалуйста, смотрите прилагаемую фотографию. Как и в схеме выше, производитель посылает данные встроенного брокера и, затем встроен брокер отправляет данные внешнего брокера, которые соединены с помощью конфигурации сети URL-адреса, как показано ниже. Таким образом, если внешний брокер не доступен все сообщения не будут сидеть во встроенном брокере, пока его наличие. Таким образом, есть в любом случае делать, как это с Кафкой? вместо прямого подключения к брокеру работает вне приложения
krishna_5c3
1

голосов
0

ответ
458

Просмотры

Отправка массива байт в Кафка .Net

В настоящее время я столкнулся с проблемой с моим кодом, ниже которого я написал для Кафки. Код достаточно прост, производитель производит массив байт и потребитель хочет потреблять массив. Проблема с ним в том, что она ни отображает какую-либо ошибку, ни я в состоянии получить требуемую мощность. Я уверен, что ByteArray, (Размер: 4.06MB) вход получает правильные значения, которые будут переданы в трубопровод Кафки. Оба метода потребительских не дает требуемой мощности. В способе 1 код не проходит состояние ПЧ «Consumer.Consume ()» и в методе 2, «MSG» переменная приходит нуль. Пожалуйста, посоветуйте, как к тому, что я делаю неправильно. Кафка Производитель: общественный класс BookingProducer: IBookingProducer {общественного недействительными продукты (байт [] ByteArray) {вар конфигурации = новый словарь {{ "bootstrap.servers", "локальный": 9092}, { "message.max.bytes", "5242880"}}; используя (производитель вар = новый Производитель (конфигурации, нулевой, новый ByteArraySerializer)) () {результат переменная = producer.ProduceAsync ( "timemanagement_booking", нулевой, ByteArray) .GetAwaiter () GetResult (). //Console.WriteLine($"Value: {result.Value} "); producer.Flush (10000); }}} Kafka Потребитель Метод 1: класс BookingConsumer общественности {общественного недействительными Listen () {вар конфиг = новый словарь {{ "group.id", "booking_consumer"}, { "bootstrap.servers", "локальный: 9092"}, { "enable.auto.commit" , "ложь"}, { "fetch.message.max.bytes", "5242880"}}; используя (вар потребитель = новый Потребитель (конфигурации, нулевой новый ByteArrayDeserializer ())) {consumer.Subscribe ( "timemanagement_booking"); в то время как (истинно) {сбщ сообщение; consumer.Poll (10000); если (потребитель. Потребление (из MSG, TimeSpan.FromSeconds (1))) {Console.WriteLine (msg.Value.ToArray ()); }}}}} Метод Кафка Потребитель 2: BookingConsumer общественного класса {общественного недействительными Listen () {вар конфиг = новый словарь {{ "group.id", "booking_consumer"}, { "bootstrap.servers", "локальный: 9092" }, { "enable.auto.commit", "ложь"}, { "fetch.message.max.bytes", "5242880"}}; используя (вар потребитель = новый Потребитель (конфигурации, нулевой новый ByteArrayDeserializer ())) {consumer.Subscribe (»
Kenil44
1

голосов
0

ответ
433

Просмотры

How to consume data in HDFS using Hive with the Kafka-Python client?

Извините, если это глупый вопрос. Я новичок всего этого трубопровода вещи :) Я использовал Кафка-питон клиента создать производителя, который посылает CSV (один CSV строка = один Кафка сообщение). Обратите внимание, что я сериализовать его в строку с помощью JSON, и кодируется, что в байты как UTF-8. Затем я создал потребителя, который декодирует сообщения (одна строка CSV теперь строка) и выводит их на терминал. Теперь мне нужно, чтобы сохранить эти данные в HDFS с помощью улья. Я хочу, чтобы вставить каждое сообщение в улую таблицу, а затем я хочу сделать огромный выбор, чтобы извлечь все данные в одном файле. Что такое лучший способ сделать это с помощью Python? Вот что я сделал: Во-первых, я запустить сервер Zookeeper: bin / zookeeper-server-start.sh конфигурации / zookeeper.properties Затем я запустить сервер Кафка: бен / kafka-server-start.sh конфигурации / server.properties Тогда я начинаю свой потребитель и производитель: мой продюсер:
Niklas Donges
1

голосов
1

ответ
164

Просмотры

HDFS написать от Кафки: createBlockOutputStream Exception

Я использую Hadoop из Докер роя с 1 NameNode и 3 DataNodes (на 3-х физических машин). Я также использую Кафка и Кафка подключение + разъем HDFS писать сообщения в HDFS в формате паркетного. Я могу записать данные в HDFS с помощью клиентов HDFS (HDFS ставить). Но когда Кафка пишут сообщения, он работает в самом начале, а затем, если не удается с этой ошибкой: org.apache.hadoop.net.ConnectTimeoutException: 60000 Миллиса тайм-аут во время ожидания для канала, чтобы быть готовыми к подключению. ч: java.nio.channels.SocketChannel [соединение рассматриваемой дистанционного = / 10.0.0.8: 50010] в org.apache.hadoop.net.NetUtils.connect (NetUtils.java:534) при org.apache.hadoop.hdfs. DFSOutputStream.createSocketForPipeline (DFSOutputStream.java:1533) в org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.createBlockOutputStream (DFSOutputStream.java:1309) в орг. apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.nextBlockOutputStream (DFSOutputStream.java:1262) при org.apache.hadoop.hdfs.DFSOutputStream $ DataStreamer.run (DFSOutputStream.java:448) [2018-05-23 10: 30: 10125 ] ИНФОРМАЦИЯ Отказ ВР-468254989-172.17.0.2-1527063205150: blk_1073741825_1001 (org.apache.hadoop.hdfs.DFSClient: 1265) [2018-05-23 10: 30: 10148] INFO Исключая DataNode DatanodeInfoWithStorage [10.0.0.8:50010, DS-cd1c0b17-Бебб-4379-a5e8-5de7ff7a7064, DISK] (org.apache.hadoop.hdfs.DFSClient 1269) [2018-05-23 10: 31: 10203] ИНФОРМАЦИЯ Исключение в createBlockOutputStream (org.apache.hadoop. hdfs.DFSClient: 1368) org.apache.hadoop.net.ConnectTimeoutException: 60000 Миллиса тайм-аут во время ожидания для канала, чтобы быть готовыми к подключению. ч: java.nio.channels.SocketChannel [соединение рассматриваемой дистанционного = / 10.0.0.9: 50010] в org.apache.hadoop.net.NetUtils. Есть 3 DataNode (ы) работает и 3 узла (ов) исключены в этой операции. на org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock (BlockManager.java:1733) в org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock (FSDirWriteFileOp.java:265) в org.apache .hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock (FSNamesystem.java:2496) в org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock (NameNodeRpcServer.java:828) Но если я смотрю в Hadoop сети консоли администратора, все узлы, кажется, и OK. Я проверил HDFS-сайт и настройка «dfs.client.use.datanode.hostname» установлена ​​истина как на NameNode и DataNodes. Все изобр в файлах конфигурации Hadoop определяются с помощью 0.0.0.0 адреса. Я' пробовал форматировать NameNode тоже, но ошибка повторилась. Может проблема в том, что Кафка пишет слишком быстро в HDFS, так что переполняет его? Было бы странно, как я попробовал ту же конфигурацию, в меньшем кластере, и она работала хорошо, даже с большими сообщениями throughputof Кафки. Есть ли у вас какие-либо другие идеи о происхождении этой проблемы? Спасибо
Loic
1

голосов
0

ответ
1.6k

Просмотры

Кафка группы потребителей балансировку и группа Координатор мертв

Я играл вокруг с Кафкой (1.0.0) в течение нескольких месяцев, и пытаюсь понять, как работает группа потребителей. У меня есть один брокер Кафка и я использую Кафка-Connect-Кассандру, чтобы получать сообщения от темы к таблицам базы данных. У меня есть 10 тем, все они имеют только один раздел, и у меня есть один группа потребителей с 10 потребительских экземплярами (по одному для каждой темы). Во время работы этой установки я иногда вижу следующие журналы в Кафка-подключении консоли: 1: [Рабочий ClientId = соединить-1, идентификатор_группы = подключения кластеров] Маркировка координатор Qa-сервер: 9092 (ID: 2147483647 стойки: Null) мертвой ( org.apache.kafka.clients.consumer.internals.AbstractCoordinator: 341) [рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] Обнаруженный координатор группы QA-сервера: 9092 (ID: 2147483647 стойка: нуль) (org.apache .kafka.clients.consumer.internals.AbstractCoordinator: ] (Org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: 341) После этого он начинает потребляющие сообщения и записывает в Cassandra таблицы. Это часто случается с нерегулярными интервалами. Тем не менее, иногда разъем останавливается и выключается. Затем он начинает и снова потребляет сообщения. Это журнал: INFO [Рабочий ClientId = соединить-1, идентификатор_группы = соединить-кластер] Пометка координатора Qa-сервера: 9092 (ID: 2147483647 стойку: нуль) мертвые (org.apache.kafka.clients.consumer.internals. AbstractCoordinator: 341) ИНФОРМАЦИЯ [рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] Обнаруженный координатор группы QA-сервера: 9092 (ID: 2147483647 стойка: нулевая) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator: 341) ИНФОРМАЦИЯ [рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] Маркировка координатор Qa-сервер: 9092 (ID: 2147483647 стойку: нуль) мертвые (org.apache.kafka. clients.consumer.internals.AbstractCoordinator: 341) ИНФОРМАЦИЯ [Рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] Открыто группа координатор-сервер QA: 9092 (ID: 2147483647 стойка: нуль) (org.apache.kafka.clients. consumer.internals.AbstractCoordinator: 341) ИНФОРМАЦИЯ WorkerSinkTask {ID = Cassandra-мойка-CASB-0} совершение смещения асинхронно, используя порядковый номер 42: {topic1-0 = OffsetAndMetadata {смещение = 1074, метаданные = ''}, topic2-0 = OffsetAndMetadata {смещение = 112, метаданные = ''}, ...}} (org.apache.kafka.connect.runtime.WorkerSinkTask: 311) ИНФОРМАЦИЯ балансировку начала (org.apache.kafka.connect.runtime.distributed.DistributedHerder: 1214) ИНФОРМАЦИЯ Остановка разъем Cassandra-мойка-CASB (org.apache.kafka.connect.runtime.Worker: 304) ИНФОРМАЦИЯ Остановка задачи Cassandra-мойка-CASB-0 (org.apache.kafka.connect.runtime.Worker: 464) INFO Остановка Cassandra тонуть. (Ком. datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask: 79) INFO Выключение сессии драйвера Кассандры и кластера. (Com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter: 253) ИНФОРМАЦИЯ Stopped разъем Cassandra-мойка-CASB (org.apache.kafka.connect.runtime.Worker: 320) ИНФОРМАЦИЯ Готовой остановки задач при подготовке к ребалансу (орг .apache.kafka.connect.runtime.distributed.DistributedHerder: 1244) ИНФОРМАЦИЯ [рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] (пере-) присоединение группы (org.apache.kafka.clients.consumer.internals.AbstractCoordinator : 336) ИНФОРМАЦИЯ [рабочий ClientID = соединить-1, идентификатор_группы = соединить-кластер] Успешно присоединился группа с 7 поколения (org.apache.kafka.clients.consumer.internals.AbstractCoordinator: 341) INFO присоединился к группе и получил задание: назначение { ошибка = 0, лидер =» Connect-1-1dc56cda-ed54-4181-a5f9-d11022d8e8c3' , leaderUrl = 'HTTP: //127.0.1.1: 8083 /', смещение = 8, connectorIds = [Кассандры раковина CASB], taskIds = [Кассандры мойку -casb-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder: 1192) INFO Запуск соединителей и задачи с использованием конфигурации смещения 8 (org.apache.kafka.connect.runtime.distributed.DistributedHerder: 837) INFO Начиная разъем Кассандра раковины-CASB (org.apache.kafka.connect.runtime.distributed.DistributedHerder: 890) 2: org.apache.kafka.clients.consumer.CommitFailedException: Commit не может быть завершен, поскольку группа уже балансировки и назначена перегородки к другому члену. Это означает, что время между последующими вызовами для опроса () больше, чем сконфигурированный max.poll.interval.ms, которые, как правило, означает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему либо путем увеличения тайм-аута сеанса или за счет уменьшения максимального размера пакетов, возвращаемых в опросе () с max.poll.records. на org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest (ConsumerCoordinator.java:722) в org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:600) в org.apache .kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1250) при org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:299) при org.apache.kafka.connect.runtime.WorkerSinkTask .doCommit (WorkerSinkTask.java:327) при org.apache.kafka.connect.runtime.WorkerSinkTask. идентификатор_группы = соединить-Cassandra-раковина-CASB] Успешно присоединился к группе с поколением 343 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator: 341) ИНФОРМАЦИЯ [Потребитель ClientID = потребитель-5, идентификатор_группы = соединить-Cassandra-sink- CASB] Установка вновь назначенных разделов [topic1-0, topic2-0, ...] (org.apache.kafka.cl ients.consumer.internals.ConsumerCoordinator: 341) INFO WorkerSinkTask {ID = Кассандры раковина CASB-0} совершение смещения асинхронно, используя порядковый номер 155: {topic1-0 = OffsetAndMetadata {смещение = 836, метаданные = ''}, topic2-0 = OffsetAndMetadata {смещение = 86, метаданные = ''}, ...}} (org.apache .kafka.connect.runtime.WorkerSinkTask: 311) Опять же иногда Кафка-Connect начинает потреблять сообщения после ребаланса, а иногда он выключается. У меня есть следующие вопросы: 1) Почему координатор группы (Кафка брокер) умирает? Я ищу в нескольких Кафки-конфиги для решения этих проблем, как connections.max.idle.ms, max.poll.records, session.timeout.ms, group.min.session.timeout.ms и group.max.session. тайм-аут. Я не уверен, что лучшие конфиги бы для вещей, чтобы работать бесперебойно. 2) Почему восстановить равновесие происходит? Я знаю, что группа ребаланс может произойти при добавлении новой задачи, изменение задачи и т.д. Но я ничего не изменилось. Иногда рамки Кафка Connect, кажется, обработать ошибку немного слишком агрессивный и убивает CONNECT задачи вместо выполнения на работе. 2) Почему восстановить равновесие происходит? Я знаю, что группа ребаланс может произойти при добавлении новой задачи, изменение задачи и т.д. Но я ничего не изменилось. Иногда рамки Кафка Connect, кажется, обработать ошибку немного слишком агрессивный и убивает CONNECT задачи вместо выполнения на работе. 2) Почему восстановить равновесие происходит? Я знаю, что группа ребаланс может произойти при добавлении новой задачи, изменение задачи и т.д. Но я ничего не изменилось. Иногда рамки Кафка Connect, кажется, обработать ошибку немного слишком агрессивный и убивает CONNECT задачи вместо выполнения на работе.
el323
1

голосов
0

ответ
192

Просмотры

Как я могу использовать свечу-SQL, чтобы прочитать и обработать событие JSON от Кафки?

Разработал: глядя реализовать код SCALA с использованием структурированных искровой потоковой DataFrame прочитать событие JSON от Кафки, а также использовать свечи-SQL для манипулирования данных / столбцов и записать его в улей? Использование 2.11 Scala / искровые 2.2 Я понимаю создания соединения прямо вперед: вал ДФ = искра .readStream .format ( "Кафка") .option ( "kafka.bootstrap.servers", "host1: port1, host2: port2"). вариант ( "подписаться", "topic1") .load () df.selectExpr ( "CAST (ключ AS STRING)", "CAST (значение AS STRING)") .as [(String, String)] Как справиться с JSON событие? Предполагая, что все события имеют одинаковую схему, я должен предоставить схему, и если это так, как это делается, а также, если есть способ сделать вывод схемы, как это делается? Если я правильно понимаю, что я тогда создать tempView, как я бегу SQL-подобные запросы на эту точку зрения? Edit требует вашей системы: система автоматической помечена этот пост повторов, это не так. В связанном вопросе OP попросил решить проблему с его существующим кодом, и один (действительно) ответ был рассмотрен вопросы, с десериализацией JSON. Мой вопрос / s отличаются, как указано выше. Если мой вопрос не ясно, пожалуйста, спросите конкретно, и я попытаюсь прояснить дальше. Спасибо.
DigitalFailure

Просмотр дополнительных вопросов

Связанные вопросы