diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml index 397f06399f..9d0d91b2c0 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/pom.xml @@ -36,7 +36,7 @@ com.amazonaws amazon-kinesis-client - 1.11.2 + 1.14.9 org.springframework.cloud @@ -46,9 +46,9 @@ - 1.11.632 - 2.0.2.RELEASE - 2.2.1.RELEASE + 1.12.380 + 2.2.0 + 4.0.0 \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java index 38ad634086..1fedec83ad 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ConsumerBinder.java @@ -1,16 +1,16 @@ package com.baeldung.binder; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.stereotype.Component; +import java.util.function.Consumer; -@Component -@EnableBinding(Sink.class) +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration public class ConsumerBinder { - - @StreamListener(Sink.INPUT) - public void consume(String ip) { - System.out.println(ip); - } + @Bean + Consumer input() { + return str -> { + System.out.println(str); + }; + } } \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java index e4f6916ed9..1cf31f9928 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/KinesisBinderApplication.java @@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KinesisBinderApplication { - public static void main(String[] args) { - SpringApplication.run(KinesisBinderApplication.class, args); - } + public static void main(String[] args) { + SpringApplication.run(KinesisBinderApplication.class, args); + } } \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java index 468f2886de..1486459a1e 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/binder/ProducerBinder.java @@ -1,24 +1,20 @@ package com.baeldung.binder; +import java.util.function.Supplier; import java.util.stream.IntStream; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.messaging.support.MessageBuilder; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -@Component -@EnableBinding(Source.class) -public class ProducerBinder { +@Configuration +class ProducerBinder { - @Autowired - private Source source; - - @Scheduled(fixedDelay = 3000L) - private void produce() { - IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) - .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); + @Bean + public Supplier output() { + return () -> IntStream.range(1, 200) + .mapToObj(ipSuffix -> "192.168.0." + ipSuffix) + .map(entry -> MessageBuilder.withPayload(entry) + .build()); } } \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java index 01c5af596d..1ab205d3aa 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKCLApplication.java @@ -1,11 +1,14 @@ package com.baeldung.kclkpl; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.*; + import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Regions; @@ -17,32 +20,30 @@ public class KinesisKCLApplication implements ApplicationRunner { @Value("${aws.access.key}") private String accessKey; - + @Value("${aws.secret.key}") private String secretKey; - + @Value("${ips.stream}") private String IPS_STREAM; - + public static void main(String[] args) { SpringApplication.run(KinesisKCLApplication.class, args); } - @Override - public void run(ApplicationArguments args) throws Exception { - BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration( - "KinesisKCLConsumer", - IPS_STREAM, - new AWSStaticCredentialsProvider(awsCredentials), - "KinesisKCLConsumer") - .withRegionName(Regions.EU_CENTRAL_1.getName()); + @Override + public void run(ApplicationArguments args) throws Exception { + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - new Worker.Builder() - .recordProcessorFactory(new IpProcessorFactory()) - .config(consumerConfig) - .build() - .run(); - } - + KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration("KinesisKCLConsumer", IPS_STREAM, "", "", DEFAULT_INITIAL_POSITION_IN_STREAM, new AWSStaticCredentialsProvider(awsCredentials), + new AWSStaticCredentialsProvider(awsCredentials), new AWSStaticCredentialsProvider(awsCredentials), DEFAULT_FAILOVER_TIME_MILLIS, "KinesisKCLConsumer", DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, + DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, new ClientConfiguration(), new ClientConfiguration(), + new ClientConfiguration(), DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, Regions.EU_CENTRAL_1.getName(), DEFAULT_SHUTDOWN_GRACE_MILLIS, + DEFAULT_DDB_BILLING_MODE, null, 0, 0, 0); + + new Worker.Builder().recordProcessorFactory(new IpProcessorFactory()) + .config(consumerConfig) + .build() + .run(); + } } \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java index 4ff7cf8087..1b2d9bed3f 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/java/com/baeldung/kclkpl/KinesisKPLApplication.java @@ -16,23 +16,22 @@ public class KinesisKPLApplication { @Value("${aws.access.key}") private String accessKey; - + @Value("${aws.secret.key}") private String secretKey; - + public static void main(String[] args) { SpringApplication.run(KinesisKPLApplication.class, args); } @Bean public KinesisProducer kinesisProducer() { - BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration() - .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) - .setVerifyCertificate(false) - .setRegion(Regions.EU_CENTRAL_1.getName()); + BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration().setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) + .setVerifyCertificate(false) + .setRegion(Regions.EU_CENTRAL_1.getName()); - return new KinesisProducer(producerConfig); + return new KinesisProducer(producerConfig); } - + } \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties index 777abef1cc..fddea95d45 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/main/resources/application.properties @@ -12,9 +12,11 @@ cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 cloud.aws.stack.auto=false -spring.cloud.stream.bindings.input.destination=live-ips -spring.cloud.stream.bindings.input.group=live-ips-group -spring.cloud.stream.bindings.input.content-type=text/plain +spring.cloud.stream.bindings.input-in-0.destination=live-ips +spring.cloud.stream.bindings.input-in-0.group=live-ips-group +spring.cloud.stream.bindings.input-in-0.content-type=text/plain +spring.cloud.stream.function.definition = input -spring.cloud.stream.bindings.output.destination=myStream -spring.cloud.stream.bindings.output.content-type=text/plain \ No newline at end of file +spring.cloud.stream.bindings.output-out-0.destination=myStream +spring.cloud.stream.bindings.output-out-0.content-type=text/plain +spring.cloud.stream.poller.fixed-delay = 3000 \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties index 777abef1cc..fddea95d45 100644 --- a/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties +++ b/spring-cloud-modules/spring-cloud-stream/spring-cloud-stream-kinesis/src/test/resources/application.properties @@ -12,9 +12,11 @@ cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 cloud.aws.stack.auto=false -spring.cloud.stream.bindings.input.destination=live-ips -spring.cloud.stream.bindings.input.group=live-ips-group -spring.cloud.stream.bindings.input.content-type=text/plain +spring.cloud.stream.bindings.input-in-0.destination=live-ips +spring.cloud.stream.bindings.input-in-0.group=live-ips-group +spring.cloud.stream.bindings.input-in-0.content-type=text/plain +spring.cloud.stream.function.definition = input -spring.cloud.stream.bindings.output.destination=myStream -spring.cloud.stream.bindings.output.content-type=text/plain \ No newline at end of file +spring.cloud.stream.bindings.output-out-0.destination=myStream +spring.cloud.stream.bindings.output-out-0.content-type=text/plain +spring.cloud.stream.poller.fixed-delay = 3000 \ No newline at end of file