Skip to main content

ConsumeKafka

Описание

Потребляет сообщения от API Consumer Apache Kafka. Дополнительный процессор NiFi для отправки сообщений — это PublishKafka. Процессор поддерживает потребление сообщений Kafka, которые могут быть, по выбору, интерпретированы как записи NiFi. Обратите внимание, что на данный момент (в режиме чтения записей) процессор предполагает, что все записи, полученные из данного раздела, имеют одинаковую схему. Для этого режима, если какие-либо сообщения Kafka получены, но не могут быть разобраны или записаны с использованием настроенного Record Reader или Record Writer, содержимое сообщения будет записано в отдельный файл потока, и этот файл потока будет передан в отношение «parse.failure». В противном случае каждый файл потока отправляется в отношение «success» и может содержать множество отдельных сообщений в одном файле потока. Добавляется атрибут «record.count», указывающий, сколько сообщений содержится в файле потока. Ни одно из сообщений Kafka не будет помещено в один файл потока, если они имеют разные схемы или если они имеют разные значения для заголовка сообщения, включенного в свойство <Заголовки для добавления в качестве атрибутов>.

Теги

Kafka, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume

Свойства

НазваниеОписание
Kafka Connection ServiceПредоставляет подключения к брокеру Kafka для публикации записей Kafka
Group IDИдентификатор группы потребителей Kafka, соответствующий свойству Kafka group.id
Topic FormatУказывает, являются ли указанные темы списком имён, разделённых запятыми, или одним регулярным выражением
TopicsИмя или шаблон тем Kafka, из которых процессор потребляет записи Kafka. Если темы разделены запятыми, можно указать более одной. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
Auto Offset ResetАвтоматическая настройка смещения, применяемая, если не найдено предыдущее смещение потребителя, соответствующее свойству Kafka auto.offset.reset
Commit OffsetsУказывает, должен ли этот процессор фиксировать смещения в Kafka после получения сообщений. Как правило, для этого значения следует установить значение true, чтобы получаемые сообщения не дублировались. Однако в некоторых сценариях мы можем захотеть не фиксировать смещения, чтобы данные можно было обработать и позже подтвердить с помощью PublishKafka, чтобы обеспечить семантику «точно один раз».
Max Uncommitted TimeУказывает максимальное время, которое может пройти до фиксации смещений. Это значение влияет на то, как часто будут фиксироваться смещения. Чем реже фиксируются смещения, тем выше пропускная способность, но тем больше вероятность дублирования данных в случае перезагрузки JVM или балансировки нагрузки между фиксациями. Это значение также связано с максимальным количеством записей опроса и использованием разделителя сообщений. При использовании разделителя сообщений у нас может быть гораздо больше незафиксированных сообщений, чем без него, поскольку нам нужно отслеживать гораздо меньше сообщений в памяти.This Property is only considered if the [Commit Offsets] Property has a value of "true".
Header Name PatternШаблон регулярного выражения, применяемый к именам заголовков записей Kafka для выбора значений заголовков, которые будут записаны в качестве атрибутов FlowFile
Header EncodingNo description provided.
Processing StrategyСтратегия обработки записей Kafka и записи сериализованного вывода в FlowFiles
Record ReaderСредство чтения записей, используемое для входящих сообщений KafkaThis Property is only considered if the [Processing Strategy] Property has a value of "RECORD".
Record WriterСредство записи, используемое для сериализации исходящих FlowFilesThis Property is only considered if the [Processing Strategy] Property has a value of "RECORD".
Output StrategyФормат, используемый для вывода записи Kafka в запись FlowFile.This Property is only considered if the [Processing Strategy] Property has a value of "RECORD".
Key Attribute EncodingКодировка значения настроенного атрибута FlowFile, содержащего ключ записи Kafka.This Property is only considered if the [Output Strategy] Property has a value of "Use Content as Value".
Key FormatУказывает, как представлять ключ записи Kafka в выходном FlowFileThis Property is only considered if the [Output Strategy] Property has a value of "Use Wrapper".
Key Record ReaderСредство чтения записей, используемое для преобразования ключа записи Kafka в записьThis Property is only considered if the [Key Format] Property has a value of "Record".
Message DemarcatorПоскольку KafkaConsumer получает сообщения пакетами, этот процессор может выводить файлы Flow, содержащие все сообщения Kafka в одном пакете для заданной темы и раздела. Это свойство позволяет указать строку (интерпретируемую как UTF-8) для разделения нескольких сообщений Kafka. Это необязательное свойство, и если оно не указано, каждое полученное сообщение Kafka будет приводить к созданию одного файла Flow. Чтобы ввести специальный символ, например «новую строку», нажмите CTRL+Enter или Shift+Enter в зависимости от операционной системы.This Property is only considered if the [Processing Strategy] Property has a value of "DEMARCATOR".
Separate By KeyЕсли это свойство включено, два сообщения будут добавлены в один и тот же FlowFile только в том случае, если оба сообщения Kafka имеют одинаковые ключи.This Property is only considered if the [Message Demarcator] Property has a value specified.

Взаимосвязи

  • success: Файлы потока, содержащие одну или несколько сериализованных записей Kafka