KafkaBindingProperties has no documentation (#802)
* KafkaBindingProperties has no documentation Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/739 No popup help in IDE due to missing javadocs in KafkaBindingProperties, KafkaConsumerProperties and KafkaProducerProperties. Some IDEs currently only show javadocs on getter methods from POJOs used insdide a map. Therefore, some javadocs are duplicated between fields and getter methods. * Addressing PR review comments * Addressing PR review comments
This commit is contained in:
committed by
Gary Russell
parent
637ec2e55d
commit
78ff4f1a70
@@ -26,10 +26,20 @@ import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
*/
|
||||
public class KafkaBindingProperties implements BinderSpecificPropertiesProvider {
|
||||
|
||||
/**
|
||||
* Consumer specific binding properties. @see {@link KafkaConsumerProperties}.
|
||||
*/
|
||||
private KafkaConsumerProperties consumer = new KafkaConsumerProperties();
|
||||
|
||||
/**
|
||||
* Producer specific binding properties. @see {@link KafkaProducerProperties}.
|
||||
*/
|
||||
private KafkaProducerProperties producer = new KafkaProducerProperties();
|
||||
|
||||
/**
|
||||
* @return {@link KafkaConsumerProperties}
|
||||
* Consumer specific binding properties. @see {@link KafkaConsumerProperties}.
|
||||
*/
|
||||
public KafkaConsumerProperties getConsumer() {
|
||||
return this.consumer;
|
||||
}
|
||||
@@ -38,6 +48,10 @@ public class KafkaBindingProperties implements BinderSpecificPropertiesProvider
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link KafkaProducerProperties}
|
||||
* Producer specific binding properties. @see {@link KafkaProducerProperties}.
|
||||
*/
|
||||
public KafkaProducerProperties getProducer() {
|
||||
return this.producer;
|
||||
}
|
||||
|
||||
@@ -84,40 +84,109 @@ public class KafkaConsumerProperties {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* When true the offset is committed after each record, otherwise the offsets for the complete set of records
|
||||
* received from the poll() are committed after all records have been processed.
|
||||
*/
|
||||
private boolean ackEachRecord;
|
||||
|
||||
/**
|
||||
* When true, topic partitions is automatically rebalanced between the members of a consumer group.
|
||||
* When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex.
|
||||
*/
|
||||
private boolean autoRebalanceEnabled = true;
|
||||
|
||||
/**
|
||||
* Whether to autocommit offsets when a message has been processed.
|
||||
* If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header
|
||||
* is present in the inbound message. Applications may use this header for acknowledging messages.
|
||||
*/
|
||||
private boolean autoCommitOffset = true;
|
||||
|
||||
/**
|
||||
* Effective only if autoCommitOffset is set to true.
|
||||
* If set to false, it suppresses auto-commits for messages that result in errors and commits only for successful messages.
|
||||
* It allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
|
||||
* If set to true, it always auto-commits (if auto-commit is enabled).
|
||||
* If not set (the default), it effectively has the same value as enableDlq,
|
||||
* auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.
|
||||
*/
|
||||
private Boolean autoCommitOnError;
|
||||
|
||||
/**
|
||||
* The starting offset for new groups. Allowed values: earliest and latest.
|
||||
*/
|
||||
private StartOffset startOffset;
|
||||
|
||||
/**
|
||||
* Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
* Must be false if a KafkaRebalanceListener is provided.
|
||||
*/
|
||||
private boolean resetOffsets;
|
||||
|
||||
/**
|
||||
* When set to true, it enables DLQ behavior for the consumer.
|
||||
* By default, messages that result in errors are forwarded to a topic named error.name-of-destination.name-of-group.
|
||||
* The DLQ topic name can be configurable by setting the dlqName property.
|
||||
*/
|
||||
private boolean enableDlq;
|
||||
|
||||
/**
|
||||
* The name of the DLQ topic to receive the error messages.
|
||||
*/
|
||||
private String dlqName;
|
||||
|
||||
/**
|
||||
* Number of partitions to use on the DLQ.
|
||||
*/
|
||||
private Integer dlqPartitions;
|
||||
|
||||
/**
|
||||
* Using this, DLQ-specific producer properties can be set.
|
||||
* All the properties available through kafka producer properties can be set through this property.
|
||||
*/
|
||||
private KafkaProducerProperties dlqProducerProperties = new KafkaProducerProperties();
|
||||
|
||||
/**
|
||||
* @deprecated No longer used by the binder.
|
||||
*/
|
||||
private int recoveryInterval = 5000;
|
||||
|
||||
/**
|
||||
* List of trusted packages to provide the header mapper.
|
||||
*/
|
||||
private String[] trustedPackages;
|
||||
|
||||
/**
|
||||
* Indicates which standard headers are populated by the inbound channel adapter.
|
||||
* Allowed values: none, id, timestamp, or both.
|
||||
*/
|
||||
private StandardHeaders standardHeaders = StandardHeaders.none;
|
||||
|
||||
/**
|
||||
* The name of a bean that implements RecordMessageConverter.
|
||||
*/
|
||||
private String converterBeanName;
|
||||
|
||||
/**
|
||||
* The interval, in milliseconds, between events indicating that no messages have recently been received.
|
||||
*/
|
||||
private long idleEventInterval = 30_000;
|
||||
|
||||
/**
|
||||
* When true, the destination is treated as a regular expression Pattern used to match topic names by the broker.
|
||||
*/
|
||||
private boolean destinationIsPattern;
|
||||
|
||||
/**
|
||||
* Map with a key/value pair containing generic Kafka consumer properties.
|
||||
* In addition to having Kafka consumer properties, other configuration properties can be passed here.
|
||||
*/
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Various topic level properties. @see {@link KafkaTopicProperties} for more details.
|
||||
*/
|
||||
private KafkaTopicProperties topic = new KafkaTopicProperties();
|
||||
|
||||
/**
|
||||
@@ -125,6 +194,12 @@ public class KafkaConsumerProperties {
|
||||
*/
|
||||
private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;
|
||||
|
||||
/**
|
||||
* @return if each record needs to be acknowledged.
|
||||
*
|
||||
* When true the offset is committed after each record, otherwise the offsets for the complete set of records
|
||||
* received from the poll() are committed after all records have been processed.
|
||||
*/
|
||||
public boolean isAckEachRecord() {
|
||||
return this.ackEachRecord;
|
||||
}
|
||||
@@ -133,6 +208,13 @@ public class KafkaConsumerProperties {
|
||||
this.ackEachRecord = ackEachRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return is autocommit offset enabled
|
||||
*
|
||||
* Whether to autocommit offsets when a message has been processed.
|
||||
* If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment header
|
||||
* is present in the inbound message. Applications may use this header for acknowledging messages.
|
||||
*/
|
||||
public boolean isAutoCommitOffset() {
|
||||
return this.autoCommitOffset;
|
||||
}
|
||||
@@ -141,6 +223,11 @@ public class KafkaConsumerProperties {
|
||||
this.autoCommitOffset = autoCommitOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return start offset
|
||||
*
|
||||
* The starting offset for new groups. Allowed values: earliest and latest.
|
||||
*/
|
||||
public StartOffset getStartOffset() {
|
||||
return this.startOffset;
|
||||
}
|
||||
@@ -149,6 +236,12 @@ public class KafkaConsumerProperties {
|
||||
this.startOffset = startOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return if resetting offset is enabled
|
||||
*
|
||||
* Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
* Must be false if a KafkaRebalanceListener is provided.
|
||||
*/
|
||||
public boolean isResetOffsets() {
|
||||
return this.resetOffsets;
|
||||
}
|
||||
@@ -157,6 +250,13 @@ public class KafkaConsumerProperties {
|
||||
this.resetOffsets = resetOffsets;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return is DLQ enabled.
|
||||
*
|
||||
* When set to true, it enables DLQ behavior for the consumer.
|
||||
* By default, messages that result in errors are forwarded to a topic named error.name-of-destination.name-of-group.
|
||||
* The DLQ topic name can be configurable by setting the dlqName property.
|
||||
*/
|
||||
public boolean isEnableDlq() {
|
||||
return this.enableDlq;
|
||||
}
|
||||
@@ -165,6 +265,16 @@ public class KafkaConsumerProperties {
|
||||
this.enableDlq = enableDlq;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return is autocommit on error
|
||||
*
|
||||
* Effective only if autoCommitOffset is set to true.
|
||||
* If set to false, it suppresses auto-commits for messages that result in errors and commits only for successful messages.
|
||||
* It allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
|
||||
* If set to true, it always auto-commits (if auto-commit is enabled).
|
||||
* If not set (the default), it effectively has the same value as enableDlq,
|
||||
* auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.
|
||||
*/
|
||||
public Boolean getAutoCommitOnError() {
|
||||
return this.autoCommitOnError;
|
||||
}
|
||||
@@ -193,6 +303,12 @@ public class KafkaConsumerProperties {
|
||||
this.recoveryInterval = recoveryInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return is auto rebalance enabled
|
||||
*
|
||||
* When true, topic partitions is automatically rebalanced between the members of a consumer group.
|
||||
* When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex.
|
||||
*/
|
||||
public boolean isAutoRebalanceEnabled() {
|
||||
return this.autoRebalanceEnabled;
|
||||
}
|
||||
@@ -201,6 +317,12 @@ public class KafkaConsumerProperties {
|
||||
this.autoRebalanceEnabled = autoRebalanceEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a map of configuration
|
||||
*
|
||||
* Map with a key/value pair containing generic Kafka consumer properties.
|
||||
* In addition to having Kafka consumer properties, other configuration properties can be passed here.
|
||||
*/
|
||||
public Map<String, String> getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
@@ -209,6 +331,11 @@ public class KafkaConsumerProperties {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return dlq name
|
||||
*
|
||||
* The name of the DLQ topic to receive the error messages.
|
||||
*/
|
||||
public String getDlqName() {
|
||||
return this.dlqName;
|
||||
}
|
||||
@@ -217,6 +344,11 @@ public class KafkaConsumerProperties {
|
||||
this.dlqName = dlqName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of partitions on the DLQ topic
|
||||
*
|
||||
* Number of partitions to use on the DLQ.
|
||||
*/
|
||||
public Integer getDlqPartitions() {
|
||||
return this.dlqPartitions;
|
||||
}
|
||||
@@ -225,6 +357,11 @@ public class KafkaConsumerProperties {
|
||||
this.dlqPartitions = dlqPartitions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return trusted packages
|
||||
*
|
||||
* List of trusted packages to provide the header mapper.
|
||||
*/
|
||||
public String[] getTrustedPackages() {
|
||||
return this.trustedPackages;
|
||||
}
|
||||
@@ -233,6 +370,12 @@ public class KafkaConsumerProperties {
|
||||
this.trustedPackages = trustedPackages;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return dlq producer properties
|
||||
*
|
||||
* Using this, DLQ-specific producer properties can be set.
|
||||
* All the properties available through kafka producer properties can be set through this property.
|
||||
*/
|
||||
public KafkaProducerProperties getDlqProducerProperties() {
|
||||
return this.dlqProducerProperties;
|
||||
}
|
||||
@@ -241,6 +384,12 @@ public class KafkaConsumerProperties {
|
||||
this.dlqProducerProperties = dlqProducerProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return standard headers
|
||||
*
|
||||
* Indicates which standard headers are populated by the inbound channel adapter.
|
||||
* Allowed values: none, id, timestamp, or both.
|
||||
*/
|
||||
public StandardHeaders getStandardHeaders() {
|
||||
return this.standardHeaders;
|
||||
}
|
||||
@@ -249,6 +398,11 @@ public class KafkaConsumerProperties {
|
||||
this.standardHeaders = standardHeaders;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return converter bean name
|
||||
*
|
||||
* The name of a bean that implements RecordMessageConverter.
|
||||
*/
|
||||
public String getConverterBeanName() {
|
||||
return this.converterBeanName;
|
||||
}
|
||||
@@ -257,6 +411,11 @@ public class KafkaConsumerProperties {
|
||||
this.converterBeanName = converterBeanName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return idle event interval
|
||||
*
|
||||
* The interval, in milliseconds, between events indicating that no messages have recently been received.
|
||||
*/
|
||||
public long getIdleEventInterval() {
|
||||
return this.idleEventInterval;
|
||||
}
|
||||
@@ -265,6 +424,11 @@ public class KafkaConsumerProperties {
|
||||
this.idleEventInterval = idleEventInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return is destination given through a pattern
|
||||
*
|
||||
* When true, the destination is treated as a regular expression Pattern used to match topic names by the broker.
|
||||
*/
|
||||
public boolean isDestinationIsPattern() {
|
||||
return this.destinationIsPattern;
|
||||
}
|
||||
@@ -273,6 +437,11 @@ public class KafkaConsumerProperties {
|
||||
this.destinationIsPattern = destinationIsPattern;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return topic properties
|
||||
*
|
||||
* Various topic level properties. @see {@link KafkaTopicProperties} for more details.
|
||||
*/
|
||||
public KafkaTopicProperties getTopic() {
|
||||
return this.topic;
|
||||
}
|
||||
@@ -281,6 +450,11 @@ public class KafkaConsumerProperties {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return timeout in pollable consumers
|
||||
*
|
||||
* Timeout used for polling in pollable consumers.
|
||||
*/
|
||||
public long getPollTimeout() {
|
||||
return this.pollTimeout;
|
||||
}
|
||||
|
||||
@@ -33,28 +33,72 @@ import org.springframework.expression.Expression;
|
||||
*/
|
||||
public class KafkaProducerProperties {
|
||||
|
||||
/**
|
||||
* Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
|
||||
*/
|
||||
private int bufferSize = 16384;
|
||||
|
||||
/**
|
||||
* Set the compression.type producer property. Supported values are none, gzip, snappy and lz4.
|
||||
* See {@link CompressionType} for more details.
|
||||
*/
|
||||
private CompressionType compressionType = CompressionType.none;
|
||||
|
||||
/**
|
||||
* Whether the producer is synchronous.
|
||||
*/
|
||||
private boolean sync;
|
||||
|
||||
/**
|
||||
* A SpEL expression evaluated against the outgoing message used to evaluate the time to wait
|
||||
* for ack when synchronous publish is enabled.
|
||||
*/
|
||||
private Expression sendTimeoutExpression;
|
||||
|
||||
/**
|
||||
* How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
|
||||
*/
|
||||
private int batchTimeout;
|
||||
|
||||
/**
|
||||
* A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
||||
*/
|
||||
private Expression messageKeyExpression;
|
||||
|
||||
/**
|
||||
* A comma-delimited list of simple patterns to match Spring messaging headers
|
||||
* to be mapped to the Kafka Headers in the ProducerRecord.
|
||||
*/
|
||||
private String[] headerPatterns;
|
||||
|
||||
/**
|
||||
* Map with a key/value pair containing generic Kafka producer properties.
|
||||
*/
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Various topic level properties. @see {@link KafkaTopicProperties} for more details.
|
||||
*/
|
||||
private KafkaTopicProperties topic = new KafkaTopicProperties();
|
||||
|
||||
/**
|
||||
* Set to true to override the default binding destination (topic name) with the value of the
|
||||
* KafkaHeaders.TOPIC message header in the outbound message. If the header is not present,
|
||||
* the default binding destination is used.
|
||||
*/
|
||||
private boolean useTopicHeader;
|
||||
|
||||
/**
|
||||
* The bean name of a MessageChannel to which successful send results should be sent;
|
||||
* the bean must exist in the application context.
|
||||
*/
|
||||
private String recordMetadataChannel;
|
||||
|
||||
/**
|
||||
* @return buffer size
|
||||
*
|
||||
* Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
|
||||
*/
|
||||
public int getBufferSize() {
|
||||
return this.bufferSize;
|
||||
}
|
||||
@@ -63,6 +107,12 @@ public class KafkaProducerProperties {
|
||||
this.bufferSize = bufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return compression type {@link CompressionType}
|
||||
*
|
||||
* Set the compression.type producer property. Supported values are none, gzip, snappy and lz4.
|
||||
* See {@link CompressionType} for more details.
|
||||
*/
|
||||
@NotNull
|
||||
public CompressionType getCompressionType() {
|
||||
return this.compressionType;
|
||||
@@ -72,6 +122,11 @@ public class KafkaProducerProperties {
|
||||
this.compressionType = compressionType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return if synchronous sending is enabled
|
||||
*
|
||||
* Whether the producer is synchronous.
|
||||
*/
|
||||
public boolean isSync() {
|
||||
return this.sync;
|
||||
}
|
||||
@@ -80,6 +135,12 @@ public class KafkaProducerProperties {
|
||||
this.sync = sync;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return timeout expression for send
|
||||
*
|
||||
* A SpEL expression evaluated against the outgoing message used to evaluate the time to wait
|
||||
* for ack when synchronous publish is enabled.
|
||||
*/
|
||||
public Expression getSendTimeoutExpression() {
|
||||
return this.sendTimeoutExpression;
|
||||
}
|
||||
@@ -88,6 +149,11 @@ public class KafkaProducerProperties {
|
||||
this.sendTimeoutExpression = sendTimeoutExpression;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return batch timeout
|
||||
*
|
||||
* How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
|
||||
*/
|
||||
public int getBatchTimeout() {
|
||||
return this.batchTimeout;
|
||||
}
|
||||
@@ -96,6 +162,11 @@ public class KafkaProducerProperties {
|
||||
this.batchTimeout = batchTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return message key expression
|
||||
*
|
||||
* A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
||||
*/
|
||||
public Expression getMessageKeyExpression() {
|
||||
return this.messageKeyExpression;
|
||||
}
|
||||
@@ -104,6 +175,12 @@ public class KafkaProducerProperties {
|
||||
this.messageKeyExpression = messageKeyExpression;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return header patterns
|
||||
*
|
||||
* A comma-delimited list of simple patterns to match Spring messaging headers
|
||||
* to be mapped to the Kafka Headers in the ProducerRecord.
|
||||
*/
|
||||
public String[] getHeaderPatterns() {
|
||||
return this.headerPatterns;
|
||||
}
|
||||
@@ -112,6 +189,11 @@ public class KafkaProducerProperties {
|
||||
this.headerPatterns = headerPatterns;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return map of configuration
|
||||
*
|
||||
* Map with a key/value pair containing generic Kafka producer properties.
|
||||
*/
|
||||
public Map<String, String> getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
@@ -120,6 +202,11 @@ public class KafkaProducerProperties {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return topic properties
|
||||
*
|
||||
* Various topic level properties. @see {@link KafkaTopicProperties} for more details.
|
||||
*/
|
||||
public KafkaTopicProperties getTopic() {
|
||||
return this.topic;
|
||||
}
|
||||
@@ -128,6 +215,13 @@ public class KafkaProducerProperties {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return if using topic header
|
||||
*
|
||||
* Set to true to override the default binding destination (topic name) with the value of the
|
||||
* KafkaHeaders.TOPIC message header in the outbound message. If the header is not present,
|
||||
* the default binding destination is used.
|
||||
*/
|
||||
public boolean isUseTopicHeader() {
|
||||
return this.useTopicHeader;
|
||||
}
|
||||
@@ -136,6 +230,12 @@ public class KafkaProducerProperties {
|
||||
this.useTopicHeader = useTopicHeader;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return record metadata channel
|
||||
*
|
||||
* The bean name of a MessageChannel to which successful send results should be sent;
|
||||
* the bean must exist in the application context.
|
||||
*/
|
||||
public String getRecordMetadataChannel() {
|
||||
return this.recordMetadataChannel;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user