ConsumeKafka
Описание
Потребляет сообщения от API Consumer Apache Kafka. Дополнительный процессор NiFi для отправки сообщений — это PublishKafka. Процессор поддерживает потребление сообщений Kafka, которые могут быть, по выбору, интерпретированы как записи NiFi. Обратите внимание, что на данный момент (в режиме чтения записей) процессор предполагает, что все записи, полученные из данного раздела, имеют одинаковую схему. Для этого режима, если какие-либо сообщения Kafka получены, но не могут быть разобраны или записаны с использованием настроенного Record Reader или Record Writer, содержимое сообщения будет записано в отдельный файл потока, и этот файл потока будет передан в отношение «parse.failure». В противном случае каждый файл потока отправляется в отношение «success» и может содержать множество отдельных сообщений в одном файле потока. Добавляется атрибут «record.count», указывающий, сколько сообщений содержится в файле потока. Ни одно из сообщений Kafka не будет помещено в один файл потока, если они имеют разные схемы или если они имеют разные значения для заголовка сообщения, включенного в свойство <Заголовки для добавления в качестве атрибутов>.
Теги
Kafka, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume
Свойства
Название | Описание |
---|---|
Kafka Connection Service | Предоставляет подключения к брокеру Kafka для публикации записей Kafka |
Group ID | Идентификатор группы потребителей Kafka, соответствующий свойству Kafka group.id |
Topic Format | Указывает, являются ли указанные темы списком имён, разделённых запятыми, или одним регулярным выражением |
Topics | Имя или шаблон тем Kafka, из которых процессор потребляет записи Kafka. Если темы разделены запятыми, можно указать более одной. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Auto Offset Reset | Автоматическая настройка смещения, применяемая, если не найдено предыдущее смещение потребителя, соответствующее свойству Kafka auto.offset.reset |
Commit Offsets | Указывает, должен ли этот процессор фиксировать смещения в Kafka после получения сообщений. Как правило, для этого значения следует установить значение true, чтобы получаемые сообщения не дублировались. Однако в некоторых сценариях мы можем захотеть не фиксировать смещения, чтобы данные можно было обработать и позже подтвердить с помощью PublishKafka, чтобы обеспечить семантику «точно один раз». |
Max Uncommitted Time | Указывает максимальное время, которое может пройти до фиксации смещений. Это значение влияет на то, как часто будут фиксироваться смещения. Чем реже фиксируются смещения, тем выше пропускная способность, но тем больше вероятность дублирования данных в случае перезагрузки JVM или балансировки нагрузки между фиксациями. Это значение также связано с максимальным количеством записей опроса и использованием разделителя сообщений. При использовании разделителя сообщений у нас может быть гораздо больше незафиксированных сообщений, чем без него, поскольку нам нужно отслеживать гораздо меньше сообщений в памяти.This Property is only considered if the [Commit Offsets] Property has a value of "true". |
Header Name Pattern | Шаблон регулярного выражения, применяемый к именам заголовков записей Kafka для выбора значений заголовков, которые будут записаны в качестве атрибутов FlowFile |
Header Encoding | No description provided. |
Processing Strategy | Стратегия обработки записей Kafka и записи сериализованного вывода в FlowFiles |
Record Reader | Средство чтения записей, используемое для входящих сообщений KafkaThis Property is only considered if the [Processing Strategy] Property has a value of "RECORD". |
Record Writer | Средство записи, используемое для сериализации исходящих FlowFilesThis Property is only considered if the [Processing Strategy] Property has a value of "RECORD". |
Output Strategy | Формат, используемый для вывода записи Kafka в запись FlowFile.This Property is only considered if the [Processing Strategy] Property has a value of "RECORD". |
Key Attribute Encoding | Кодировка значения настроенного атрибута FlowFile, содержащего ключ записи Kafka.This Property is only considered if the [Output Strategy] Property has a value of "Use Content as Value". |
Key Format | Указывает, как представлять ключ записи Kafka в выходном FlowFileThis Property is only considered if the [Output Strategy] Property has a value of "Use Wrapper". |
Key Record Reader | Средство чтения записей, используемое для преобразования ключа записи Kafka в записьThis Property is only considered if the [Key Format] Property has a value of "Record". |
Message Demarcator | Поскольку KafkaConsumer получает сообщения пакетами, этот процессор может выводить файлы Flow, содержащие все сообщения Kafka в одном пакете для заданной темы и раздела. Это свойство позволяет указать строку (интерпретируемую как UTF-8) для разделения нескольких сообщений Kafka. Это необязательное свойство, и если оно не указано, каждое полученное сообщение Kafka будет приводить к созданию одного файла Flow. Чтобы ввести специальный символ, например «новую строку», нажмите CTRL+Enter или Shift+Enter в зависимости от операционной системы.This Property is only considered if the [Processing Strategy] Property has a value of "DEMARCATOR". |
Separate By Key | Если это свойство включено, два сообщения будут добавлены в один и тот же FlowFile только в том случае, если оба сообщения Kafka имеют одинаковые ключи.This Property is only considered if the [Message Demarcator] Property has a value specified. |
Взаимосвязи
- success: Файлы потока, содержащие одну или несколько сериализованных записей Kafka