#15 kafka : producer - patition producer

This commit is contained in:
haerong22
2022-08-02 01:34:44 +09:00
parent 741e5fe240
commit b9a344b361
4 changed files with 78 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("Need message key");
}
if (((String)key).equals("custom"))
return 0;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}

View File

@@ -15,4 +15,13 @@ public class KafkaConfig {
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(configs);
}
protected static KafkaProducer<String, String> customPartitionerSetup() {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
return new KafkaProducer<>(configs);
}
}

View File

@@ -0,0 +1,20 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerExactPartition {
public static void main(String[] args) {
KafkaProducer<String, String> producer = KafkaConfig.setup();
int partitionNo = 0;
ProducerRecord<String, String> record =
new ProducerRecord<>(KafkaConfig.TOPIC_NAME, partitionNo, "partition", "partition");
producer.send(record);
producer.flush();
producer.close();
}
}

View File

@@ -0,0 +1,16 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerWithCustomPartitioner {
public static void main(String[] args) {
KafkaProducer<String, String> producer = KafkaConfig.customPartitionerSetup();
ProducerRecord<String, String> record =
new ProducerRecord<>(KafkaConfig.TOPIC_NAME, "custom-pt", "custom-pt");
producer.send(record);
producer.flush();
producer.close();
}
}