Перейти к основному содержимому

ConsumeKafka_2_6

Описание

Потребляет сообщения от Apache Kafka, специально созданные с использованием API Consumer Kafka 2.6. Дополнительный процессор NiFi для отправки сообщений — это PublishKafka_2_6.

Теги

Kafka, Get, Ingest, Ingress, Topic, PubSub, Consume, 2.6

Свойства

НазваниеОписание
Kafka BrokersComma-separated list of Kafka Brokers in the format host:port Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
Topic Name(s)Название темы (тем) Kafka для получения данных. Если они разделены запятыми, можно указать несколько. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
Topic Name FormatУказывает, являются ли указанные темы списком названий, разделенных запятыми, или одним регулярным выражением.
Group IDИдентификатор группы используется для идентификации потребителей, входящих в одну группу потребителей. Соответствует свойству Kafka 'group.id". Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
Commit OffsetsУказывает, должен ли этот процессор фиксировать смещения в Kafka после получения сообщений. Как правило, мы хотим, чтобы это значение было равно true, чтобы получаемые сообщения не дублировались. Однако в некоторых сценариях мы можем захотеть избежать фиксации смещений, чтобы данные можно было обработать и позже подтвердить с помощью PublishKafkaRecord, чтобы обеспечить семантику «точно один раз». Дополнительную информацию см. в разделе «Использование процессора / Дополнительные сведения».
Max Uncommitted TimeУказывает максимальное время, которое может пройти до фиксации смещений. Это значение влияет на то, как часто будут фиксироваться смещения. Фиксация смещений реже увеличивает пропускную способность, но также увеличивает время потенциального дублирования данных в случае перебалансировки или перезапуска виртуальной машины между фиксациями. Это значение также связано с максимальным количеством записей опроса и использованием средства разграничения сообщений. При использовании демаркатора сообщений мы можем получать гораздо больше незафиксированных сообщений, чем когда их нет, поскольку нам гораздо меньше нужно отслеживать в памяти.This Property is only considered if the [Commit Offsets] Property has a value of "true".
Honor TransactionsУказывает, должен ли NiFi соблюдать гарантии транзакций при взаимодействии с Kafka. Если значение равно false, процессор будет использовать «уровень изоляции» read_uncomitted. Это означает, что сообщения будут получены, как только они будут записаны в Kafka, но будут извлечены, даже если производитель отменит транзакции. Если это значение равно true, NiFi не будет получать никаких сообщений, для которых транзакция производителя была отменена, но это может привести к некоторой задержке, поскольку потребитель должен ждать, пока производитель завершит всю транзакцию, вместо того чтобы извлекать сообщения по мере их поступления.
Message DemarcatorПоскольку KafkaConsumer получает сообщения пакетами, у вас есть возможность выводить файлы FlowFiles, которые содержат все сообщения Kafka в одном пакете для заданной темы и раздела. Это свойство позволяет указать строку (интерпретируемую как UTF-8) для разделения нескольких сообщений Kafka. Это необязательное свойство, и если оно не задано, то каждое полученное сообщение Kafka приведёт к созданию одного файла FlowFile, который будет запускаться при каждом получении сообщения. Чтобы ввести специальный символ, например «новую строку», используйте CTRL+Enter или Shift+Enter в зависимости от операционной системы. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
Separate By KeyЕсли установлено значение true и задано свойство <разделитель сообщений>, то два сообщения будут добавлены в один и тот же файл FlowFile только в том случае, если оба сообщения Kafka имеют одинаковые ключи.
Security ProtocolSecurity protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property
SASL MechanismSASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property
Kerberos User ServiceService supporting user authentication with Kerberos
Kerberos Service NameThe service name that matches the primary name of the Kafka server configured in the broker JAAS configuration Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)
UsernameUsername provided with configured password when using PLAIN or SCRAM SASL Mechanisms Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256]
PasswordPassword provided with configured username when using PLAIN or SCRAM SASL Mechanisms Чувствительный параметр: true Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256]
Token AuthenticationEnables or disables Token authentication when using SCRAM SASL MechanismsThis Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [SCRAM-SHA-512], [SCRAM-SHA-256]
AWS Profile NameThe Amazon Web Services Profile to select when multiple profiles are available. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property
SSL Context ServiceService supporting SSL communication with Kafka brokers
Key Attribute EncodingВыводимые файлы потока имеют атрибут с именем '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. Это свойство определяет, как должно быть закодировано значение атрибута.
Offset ResetПозволяет управлять состоянием, когда в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены). Соответствует свойству 'auto.offset.reset' в Kafka.
Message Header EncodingЛюбой заголовок сообщения, найденный в сообщении Kafka, будет добавлен в исходящий FlowFile в качестве атрибута. Это свойство указывает кодировку символов, используемую для десериализации заголовков.
Headers to Add as Attributes (Regex)A Regular Expression that is matched against all message headers. Any message header whose name matches the regex will be added to the FlowFile as an Attribute. If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like ".*" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling the messages together efficiently.
Max Poll RecordsУказывает максимальное количество записей, которые Kafka должна возвращать за один опрос.
Communications TimeoutУказывает время ожидания, которое потребитель должен использовать при взаимодействии с брокером Kafka.

Взаимосвязи

  • success: Файлы потока, полученные из Kafka. В зависимости от стратегии разграничения это может быть файл потока для каждого сообщения или набор сообщений, сгруппированных по теме и разделу.