ConsumeKafka_2_6
Описание
Потребляет сообщения от Apache Kafka, специально созданные с использованием API Consumer Kafka 2.6. Дополнительный процессор NiFi для отправки сообщений — это PublishKafka_2_6.
Теги
Kafka, Get, Ingest, Ingress, Topic, PubSub, Consume, 2.6
Свойства
Название | Описание |
---|---|
Kafka Brokers | Comma-separated list of Kafka Brokers in the format host:port Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Topic Name(s) | Название темы (тем) Kafka для получения данных. Если они разделены запятыми, можно указать несколько. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Topic Name Format | Указывает, являются ли указанные темы списком названий, разделенных запятыми, или одним регулярным выражением. |
Group ID | Идентификатор группы используется для идентификации потребителей, входящих в одну группу потребителей. Соответствует свойству Kafka 'group.id". Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Commit Offsets | Указывает, должен ли этот процессор фиксировать смещения в Kafka после получения сообщений. Как правило, мы хотим, чтобы это значение было равно true, чтобы получаемые сообщения не дублировались. Однако в некоторых сценариях мы можем захотеть избежать фиксации смещений, чтобы данные можно было обработать и позже подтвердить с помощью PublishKafkaRecord, чтобы обеспечить семантику «точно один раз». Дополнительную информацию см. в разделе «Использование процессора / Дополнительные сведения». |
Max Uncommitted Time | Указывает максимальное время, которое может пройти до фиксации смещений. Это значение влияет на то, как часто будут фиксироваться смещения. Фиксация смещений реже увеличивает пропускную способность, но также увеличивает время потенциального дублирования данных в случае перебалансировки или перезапуска виртуальной машины между фиксациями. Это значение также связано с максимальным количеством записей опроса и использованием средства разграничения сообщений. При использовании демаркатора сообщений мы можем получать гораздо больше незафиксированных сообщений, чем когда их нет, поскольку нам гораздо меньше нужно отслеживать в памяти.This Property is only considered if the [Commit Offsets] Property has a value of "true". |
Honor Transactions | Указывает, должен ли NiFi соблюдать гарантии транзакций при взаимодействии с Kafka. Если значение равно false, процессор будет использовать «уровень изоляции» read_uncomitted. Это означает, что сообщения будут получены, как только они будут записаны в Kafka, но будут извлечены, даже если производитель отменит транзакции. Если это значение равно true, NiFi не будет получать никаких сообщений, для которых транзакция производителя была отменена, но это может привести к некоторой задержке, поскольку потребитель должен ждать, пока производитель завершит всю транзакцию, вместо того чтобы извлекать сообщения по мере их поступления. |
Message Demarcator | Поскольку KafkaConsumer получает сообщения пакетами, у вас есть возможность выводить файлы FlowFiles, которые содержат все сообщения Kafka в одном пакете для заданной темы и раздела. Это свойство позволяет указать строку (интерпретируемую как UTF-8) для разделения нескольких сообщений Kafka. Это необязательное свойство, и если оно не задано, то каждое полученное сообщение Kafka приведёт к созданию одного файла FlowFile, который будет запускаться при каждом получении сообщения. Чтобы ввести специальный символ, например «новую строку», используйте CTRL+Enter или Shift+Enter в зависимости от операционной системы. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Separate By Key | Если установлено значение true и задано свойство <разделитель сообщений>, то два сообщения будут добавлены в один и тот же файл FlowFile только в том случае, если оба сообщения Kafka имеют одинаковые ключи. |
Security Protocol | Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property |
SASL Mechanism | SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property |
Kerberos User Service | Service supporting user authentication with Kerberos |
Kerberos Service Name | The service name that matches the primary name of the Kafka server configured in the broker JAAS configuration Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Username | Username provided with configured password when using PLAIN or SCRAM SASL Mechanisms Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256] |
Password | Password provided with configured username when using PLAIN or SCRAM SASL Mechanisms Чувствительный параметр: true Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256] |
Token Authentication | Enables or disables Token authentication when using SCRAM SASL MechanismsThis Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [SCRAM-SHA-512], [SCRAM-SHA-256] |
AWS Profile Name | The Amazon Web Services Profile to select when multiple profiles are available. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property |
SSL Context Service | Service supporting SSL communication with Kafka brokers |
Key Attribute Encoding | Выводимые файлы потока имеют атрибут с именем '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. Это свойство определяет, как должно быть закодировано значение атрибута. |
Offset Reset | Позволяет управлять состоянием, когда в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены). Соответствует свойству 'auto.offset.reset' в Kafka. |
Message Header Encoding | Любой заголовок сообщения, найденный в сообщении Kafka, будет добавлен в исходящий FlowFile в качестве атрибута. Это свойство указывает кодировку символов, использу емую для десериализации заголовков. |
Headers to Add as Attributes (Regex) | A Regular Expression that is matched against all message headers. Any message header whose name matches the regex will be added to the FlowFile as an Attribute. If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like ".*" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling the messages together efficiently. |
Max Poll Records | Указывает максимальное количество записей, которые Kafka должна возвращать за один опрос. |
Communications Timeout | Указывает время ожидания, которое потребитель должен использовать при взаимодействии с брокером Kafka. |
Взаимосвязи
- success: Файлы потока, полученные из Kafka. В зависимости от стратегии разграничения это может быть файл потока для каждого сообщения или набор сообщений, сгруппированных по теме и разделу.