ConsumeKafkaRecord_2_6
Описание
Потребляет сообщения от Apache Kafka, специально созданные с использованием API Consumer Kafka 2.6. Дополнительный процессор NiFi для отправки сообщений — это PublishKafkaRecord_2_6. Обратите внимание, что на данный момент процессор предполагает, что все записи, полученные из данного раздела, имеют одинаковую схему. Если какие-либо сообщения Kafka получены, но не могут быть разобраны или записаны с использованием настроенного Record Reader или Record Writer, содержимое сообщения будет записано в отдельный файл потока, и этот файл потока будет передан в отношение «parse.failure». В противном случае каждый файл потока отправляется в отношение «success» и может содержать множество отдельных сообщений в одном файле потока. Добавляется атрибут «record.count», указывающий, сколько сообщений содержится в файле потока. Ни одно из сообщений Kafka не будет помещено в один файл потока, если они имеют разные схемы или если они имеют разные значения для заголовка сообщения, включенного в свойство <Заголовки для добавления в качестве атрибутов>.
Теги
Kafka, Get, Record, csv, avro, json, Ingest, Ingress, Topic, PubSub, Consume, 2.6
Свойства
Название | Описание |
---|---|
Kafka Brokers | Comma-separated list of Kafka Brokers in the format host:port Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Topic Name(s) | The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Topic Name Format | Указывает, являются ли указанные темы списком названий, разделенных запятыми, или одним регулярным выражением. |
Value Record Reader | Средство чтения записей, используемое для входящих потоковых файлов. |
Record Value Writer | Запись, используемая для сериализации данных перед отправкой в Kafka |
Group ID | Идентификатор группы используется для идентификации потребителей, входящих в одну группу. Соответствует свойству 'group.id' в Kafka. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Output Strategy | Формат, используемый для вывода записи Kafka в запись FlowFile. |
Headers to Add as Attributes (Regex) | A Regular Expression that is matched against all message headers. Any message header whose name matches the regex will be added to the FlowFile as an Attribute. If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like ".*" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling the messages together efficiently.This Property is only considered if the [Output Strategy] Property has a value of "Use Content as Value". |
Key Attribute Encoding | Если для свойства <Разделять по ключу> установлено значение true, в отправляемых файлах FlowFiles есть атрибут с именем Это свойство определяет, как следует кодировать значение атрибута.This Property is only considered if the [Output Strategy] Property has a value of "Use Content as Value". |
Key Format | Указывает, как представлять ключ записи Kafka в выводеThis Property is only considered if the [Output Strategy] Property has a value of "Use Wrapper". |
Key Record Reader | Средство чтения записей, используемое для преобразования ключа записи Kafka в записьThis Property is only considered if the [Key Format] Property has a value of "Record". |
Commit Offsets | Указывает, должен ли этот процессор фиксировать смещения в Kafka после получения сообщений. Это значение должно быть равно false, если ожидается, что процессор PublishKafkaRecord зафиксирует смещения, используя семантику Exactly Once, и должно быть зарезервировано для потоков данных, которые предназначены для работы в NiFi без сохранения состояния. Дополнительную информацию смотрите в разделе Использование процессора / Дополнительные сведения. Обратите внимание, что установка для этого значения значения false может привести к значительному дублированию данных или, возможно, даже к потере данных, если поток данных настроен неправильно. |
Max Uncommitted Time | Указывает максимальное время, которое может пройти до фиксации смещений. Это значение влияет на то, как часто будут фиксироваться смещения. Чем реже фиксируются смещения, тем выше пропускная способность, но тем больше вероятность дублирования данных в случае перебалансировки или перезапуска JVM между фиксациями. Это значение также связано с максимальным количеством записей опроса и использованием разделителя сообщений. При использовании разделителя сообщений у нас может быть гораздо больше незафиксированных сообщений, чем без него, так как нам нужно отслеживать гораздо меньше данных в памяти.This Property is only considered if the [Commit Offsets] Property has a value of "true". |
Honor Transactions | Указывает, должен ли NiFi соблюдать гарантии транзакций при взаимодействии с Kafka. Если значение равно false, процессор будет использовать "уровень изоляции" read_uncomitted. Это означает, что сообщения будут получены, как только они будут записаны в Kafka, но будут извлечены, даже если производитель отменит транзакции. Если это значение равно true, NiFi не будет получать никаких сообщений, для которых транзакция производителя была отменена, но это может привести к некоторой задержке, поскольку потребитель должен ждать, пока производитель завершит всю транзакцию, вместо того чтобы извлекать сообщения по мере их поступления. |
Security Protocol | Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property |
SASL Mechanism | SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property |
Kerberos User Service | Service supporting user authentication with Kerberos |
Kerberos Service Name | The service name that matches the primary name of the Kafka server configured in the broker JAAS configuration Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды) |
Username | Username provided with configured password when using PLAIN or SCRAM SASL Mechanisms Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256] |
Password | Password provided with configured username when using PLAIN or SCRAM SASL Mechanisms Чувствительный параметр: true Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [PLAIN], [SCRAM-SHA-512], [SCRAM-SHA-256] |
Token Authentication | Enables or disables Token authentication when using SCRAM SASL MechanismsThis Property is only considered if the [SASL Mechanism] Property is set to one of the following values: [SCRAM-SHA-512], [SCRAM-SHA-256] |
AWS Profile Name | The Amazon Web Services Profile to select when multiple profiles are available. Поддерживает язык выражений: true (будет оцениваться только с использованием переменных среды)This Property is only considered if the [SASL Mechanism] Property |
SSL Context Service | Service supporting SSL communication with Kafka brokers |
Separate By Key | Если установлено значение true, две записи будут добавлены в один и тот же FlowFile только в том случае, если оба сообщения Kafka имеют одинаковые ключи. |
Offset Reset | Позволяет управлять состоянием, когда в Kafka нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены). Соответствует свойству Kafka 'auto.offset.reset'. |
Message Header Encoding | Любой заголовок сообщения, обнаруженный в сообщении Kafka, будет добавлен в исходящий FlowFile в качестве атрибута. Это свойство указывает кодировку символов, используемую для десериализации заголовков. |
Max Poll Records | Указывает максимальное количество записей, которые Kafka должна возвращать за один опрос. |
Communications Timeout | Указывает время ожидания, которое потребитель должен использовать при взаимодействии с брокером Kafka. |
Взаимосвязи
- parse.failure: Если сообщение из Kafka не может быть проанализировано с помощью настроенного считывателя записей, содержимое сообщения будет направлено в это отношение в виде отдельного файла потока.
- success: FlowFiles, полученные из Kafka. В зависимости от стратегии разграничения это может быть файл потока для каждого сообщения или набор сообщений, сгруппированных по темам и разделам.