Java-1470 Split libraries module
This commit is contained in:
17
libraries-6/README.md
Normal file
17
libraries-6/README.md
Normal file
@@ -0,0 +1,17 @@
|
||||
## Libraries-6
|
||||
|
||||
This module contains articles about various Java libraries.
|
||||
These are small libraries that are relatively easy to use and do not require any separate module of their own.
|
||||
|
||||
The code examples related to different libraries are each in their own module.
|
||||
|
||||
Remember, for advanced libraries like [Jackson](/jackson) and [JUnit](/testing-modules) we already have separate modules. Please make sure to have a look at the existing modules in such cases.
|
||||
|
||||
### Relevant articles
|
||||
- [Introduction to JavaPoet](https://www.baeldung.com/java-poet)
|
||||
- [Guide to Resilience4j](https://www.baeldung.com/resilience4j)
|
||||
- [Implementing a FTP-Client in Java](https://www.baeldung.com/java-ftp-client)
|
||||
- [Introduction to Functional Java](https://www.baeldung.com/java-functional-library)
|
||||
- [A Guide to the Reflections Library](https://www.baeldung.com/reflections-library)
|
||||
- [Exactly Once Processing in Kafka](https://www.baeldung.com/kafka-exactly-once)
|
||||
- More articles [[<-- prev]](/libraries-5)
|
||||
@@ -11,5 +11,101 @@
|
||||
|
||||
<artifactId>libraries-6</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.functionaljava</groupId>
|
||||
<artifactId>functionaljava-java8</artifactId>
|
||||
<version>${functionaljava.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.codepoetics</groupId>
|
||||
<artifactId>protonpack</artifactId>
|
||||
<version>${protonpack.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-streams</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<classifier>test</classifier>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-circuitbreaker</artifactId>
|
||||
<version>${resilience4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-bulkhead</artifactId>
|
||||
<version>${resilience4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-retry</artifactId>
|
||||
<version>${resilience4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-timelimiter</artifactId>
|
||||
<version>${resilience4j.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup</groupId>
|
||||
<artifactId>javapoet</artifactId>
|
||||
<version>${javapoet.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockftpserver</groupId>
|
||||
<artifactId>MockFtpServer</artifactId>
|
||||
<version>${mockftpserver.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- Reflections -->
|
||||
<dependency>
|
||||
<groupId>org.reflections</groupId>
|
||||
<artifactId>reflections</artifactId>
|
||||
<version>${reflections.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-net</groupId>
|
||||
<artifactId>commons-net</artifactId>
|
||||
<version>${commons-net.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>${commonsio.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<kafka.version>2.0.0</kafka.version>
|
||||
<javapoet.version>1.10.0</javapoet.version>
|
||||
<reflections.version>0.9.11</reflections.version>
|
||||
<mockftpserver.version>2.7.1</mockftpserver.version>
|
||||
<functionaljava.version>4.8.1</functionaljava.version>
|
||||
<resilience4j.version>0.12.1</resilience4j.version>
|
||||
<protonpack.version>1.15</protonpack.version>
|
||||
<commons-net.version>3.6</commons-net.version>
|
||||
<assertj.version>3.6.2</assertj.version>
|
||||
<commonsio.version>2.6</commonsio.version>
|
||||
</properties>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.baeldung.fj;
|
||||
|
||||
import fj.F;
|
||||
import fj.F1Functions;
|
||||
import fj.Unit;
|
||||
import fj.data.IO;
|
||||
import fj.data.IOFunctions;
|
||||
|
||||
public class FunctionalJavaIOMain {
|
||||
|
||||
public static IO<Unit> printLetters(final String s) {
|
||||
return () -> {
|
||||
for (int i = 0; i < s.length(); i++) {
|
||||
System.out.println(s.charAt(i));
|
||||
}
|
||||
return Unit.unit();
|
||||
};
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
F<String, IO<Unit>> printLetters = i -> printLetters(i);
|
||||
|
||||
IO<Unit> lowerCase = IOFunctions.stdoutPrintln("What's your first Name ?");
|
||||
|
||||
IO<Unit> input = IOFunctions.stdoutPrint("First Name: ");
|
||||
|
||||
IO<Unit> userInput = IOFunctions.append(lowerCase, input);
|
||||
|
||||
IO<String> readInput = IOFunctions.stdinReadLine();
|
||||
|
||||
F<String, String> toUpperCase = i -> i.toUpperCase();
|
||||
|
||||
F<String, IO<Unit>> transformInput = F1Functions.<String, IO<Unit>, String> o(printLetters).f(toUpperCase);
|
||||
|
||||
IO<Unit> readAndPrintResult = IOFunctions.bind(readInput, transformInput);
|
||||
|
||||
IO<Unit> program = IOFunctions.bind(userInput, nothing -> readAndPrintResult);
|
||||
|
||||
IOFunctions.toSafe(program).run();
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.baeldung.fj;
|
||||
|
||||
import fj.F;
|
||||
import fj.Show;
|
||||
import fj.data.Array;
|
||||
import fj.data.List;
|
||||
import fj.data.Option;
|
||||
import fj.function.Characters;
|
||||
import fj.function.Integers;
|
||||
|
||||
public class FunctionalJavaMain {
|
||||
|
||||
public static final F<Integer, Boolean> isEven = i -> i % 2 == 0;
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
List<Integer> fList = List.list(3, 4, 5, 6);
|
||||
List<Boolean> evenList = fList.map(isEven);
|
||||
Show.listShow(Show.booleanShow).println(evenList);
|
||||
|
||||
fList = fList.map(i -> i + 1);
|
||||
Show.listShow(Show.intShow).println(fList);
|
||||
|
||||
Array<Integer> a = Array.array(17, 44, 67, 2, 22, 80, 1, 27);
|
||||
Array<Integer> b = a.filter(Integers.even);
|
||||
Show.arrayShow(Show.intShow).println(b);
|
||||
|
||||
Array<String> array = Array.array("Welcome", "To", "baeldung");
|
||||
Boolean isExist = array.exists(s -> List.fromString(s).forall(Characters.isLowerCase));
|
||||
System.out.println(isExist);
|
||||
|
||||
Array<Integer> intArray = Array.array(17, 44, 67, 2, 22, 80, 1, 27);
|
||||
int sum = intArray.foldLeft(Integers.add, 0);
|
||||
System.out.println(sum);
|
||||
|
||||
Option<Integer> n1 = Option.some(1);
|
||||
Option<Integer> n2 = Option.some(2);
|
||||
|
||||
F<Integer, Option<Integer>> f1 = i -> i % 2 == 0 ? Option.some(i + 100) : Option.none();
|
||||
|
||||
Option<Integer> result1 = n1.bind(f1);
|
||||
Option<Integer> result2 = n2.bind(f1);
|
||||
|
||||
Show.optionShow(Show.intShow).println(result1);
|
||||
Show.optionShow(Show.intShow).println(result2);
|
||||
}
|
||||
|
||||
}
|
||||
64
libraries-6/src/main/java/com/baeldung/ftp/FtpClient.java
Normal file
64
libraries-6/src/main/java/com/baeldung/ftp/FtpClient.java
Normal file
@@ -0,0 +1,64 @@
|
||||
package com.baeldung.ftp;
|
||||
|
||||
import org.apache.commons.net.PrintCommandListener;
|
||||
import org.apache.commons.net.ftp.FTPClient;
|
||||
import org.apache.commons.net.ftp.FTPFile;
|
||||
import org.apache.commons.net.ftp.FTPReply;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
class FtpClient {
|
||||
|
||||
private final String server;
|
||||
private final int port;
|
||||
private final String user;
|
||||
private final String password;
|
||||
private FTPClient ftp;
|
||||
|
||||
FtpClient(String server, int port, String user, String password) {
|
||||
this.server = server;
|
||||
this.port = port;
|
||||
this.user = user;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
void open() throws IOException {
|
||||
ftp = new FTPClient();
|
||||
|
||||
ftp.addProtocolCommandListener(new PrintCommandListener(new PrintWriter(System.out)));
|
||||
|
||||
ftp.connect(server, port);
|
||||
int reply = ftp.getReplyCode();
|
||||
if (!FTPReply.isPositiveCompletion(reply)) {
|
||||
ftp.disconnect();
|
||||
throw new IOException("Exception in connecting to FTP Server");
|
||||
}
|
||||
|
||||
ftp.login(user, password);
|
||||
}
|
||||
|
||||
void close() throws IOException {
|
||||
ftp.disconnect();
|
||||
}
|
||||
|
||||
Collection<String> listFiles(String path) throws IOException {
|
||||
FTPFile[] files = ftp.listFiles(path);
|
||||
|
||||
return Arrays.stream(files)
|
||||
.map(FTPFile::getName)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
void putFileToPath(File file, String path) throws IOException {
|
||||
ftp.storeFile(path, new FileInputStream(file));
|
||||
}
|
||||
|
||||
void downloadFile(String source, String destination) throws IOException {
|
||||
FileOutputStream out = new FileOutputStream(destination);
|
||||
ftp.retrieveFile(source, out);
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
package com.baeldung.javapoet;
|
||||
|
||||
import com.squareup.javapoet.ClassName;
|
||||
import com.squareup.javapoet.CodeBlock;
|
||||
import com.squareup.javapoet.FieldSpec;
|
||||
import com.squareup.javapoet.JavaFile;
|
||||
import com.squareup.javapoet.MethodSpec;
|
||||
import com.squareup.javapoet.ParameterSpec;
|
||||
import com.squareup.javapoet.ParameterizedTypeName;
|
||||
import com.squareup.javapoet.TypeName;
|
||||
import com.squareup.javapoet.TypeSpec;
|
||||
|
||||
import javax.lang.model.element.Modifier;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class PersonGenerator {
|
||||
|
||||
private static final String FOUR_WHITESPACES = " ";
|
||||
private static final String PERSON_PACKAGE_NAME = "com.baeldung.javapoet.test.person";
|
||||
|
||||
private File outputFile;
|
||||
|
||||
public PersonGenerator() {
|
||||
outputFile = new File(getOutputPath().toUri());
|
||||
}
|
||||
|
||||
public static String getPersonPackageName() {
|
||||
return PERSON_PACKAGE_NAME;
|
||||
}
|
||||
|
||||
public Path getOutputPath() {
|
||||
return Paths.get(new File(".").getAbsolutePath() + "/gensrc");
|
||||
}
|
||||
|
||||
public FieldSpec getDefaultNameField() {
|
||||
return FieldSpec
|
||||
.builder(String.class, "DEFAULT_NAME")
|
||||
.addModifiers(Modifier.PUBLIC, Modifier.STATIC, Modifier.FINAL)
|
||||
.initializer("$S", "Alice")
|
||||
.build();
|
||||
}
|
||||
|
||||
public MethodSpec getSortByLengthMethod() {
|
||||
return MethodSpec
|
||||
.methodBuilder("sortByLength")
|
||||
.addModifiers(Modifier.PUBLIC, Modifier.STATIC)
|
||||
.addParameter(ParameterSpec
|
||||
.builder(ParameterizedTypeName.get(ClassName.get(List.class), TypeName.get(String.class)), "strings")
|
||||
.build())
|
||||
.addStatement("$T.sort($N, $L)", Collections.class, "strings", getComparatorAnonymousClass())
|
||||
.build();
|
||||
}
|
||||
|
||||
public MethodSpec getPrintNameMultipleTimesMethod() {
|
||||
return MethodSpec
|
||||
.methodBuilder("printNameMultipleTimes")
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addCode(getPrintNameMultipleTimesLambdaImpl())
|
||||
.build();
|
||||
}
|
||||
|
||||
public CodeBlock getPrintNameMultipleTimesImpl() {
|
||||
return CodeBlock
|
||||
.builder()
|
||||
.beginControlFlow("for (int i = $L; i < $L; i++)")
|
||||
.addStatement("System.out.println(name)")
|
||||
.endControlFlow()
|
||||
.build();
|
||||
}
|
||||
|
||||
public CodeBlock getPrintNameMultipleTimesLambdaImpl() {
|
||||
return CodeBlock
|
||||
.builder()
|
||||
.addStatement("$T<$T> names = new $T<>()", List.class, String.class, ArrayList.class)
|
||||
.addStatement("$T.range($L, $L).forEach(i -> names.add(name))", IntStream.class, 0, 10)
|
||||
.addStatement("names.forEach(System.out::println)")
|
||||
.build();
|
||||
}
|
||||
|
||||
public TypeSpec getGenderEnum() {
|
||||
return TypeSpec
|
||||
.enumBuilder("Gender")
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addEnumConstant("MALE")
|
||||
.addEnumConstant("FEMALE")
|
||||
.addEnumConstant("UNSPECIFIED")
|
||||
.build();
|
||||
}
|
||||
|
||||
public TypeSpec getPersonInterface() {
|
||||
return TypeSpec
|
||||
.interfaceBuilder("Person")
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addField(getDefaultNameField())
|
||||
.addMethod(MethodSpec
|
||||
.methodBuilder("getName")
|
||||
.addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
|
||||
.returns(String.class)
|
||||
.build())
|
||||
.addMethod(MethodSpec
|
||||
.methodBuilder("getDefaultName")
|
||||
.addModifiers(Modifier.PUBLIC, Modifier.DEFAULT)
|
||||
.returns(String.class)
|
||||
.addCode(CodeBlock
|
||||
.builder()
|
||||
.addStatement("return DEFAULT_NAME")
|
||||
.build())
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
public TypeSpec getStudentClass() {
|
||||
return TypeSpec
|
||||
.classBuilder("Student")
|
||||
.addSuperinterface(ClassName.get(PERSON_PACKAGE_NAME, "Person"))
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addField(FieldSpec
|
||||
.builder(String.class, "name")
|
||||
.addModifiers(Modifier.PRIVATE)
|
||||
.build())
|
||||
.addMethod(MethodSpec
|
||||
.methodBuilder("getName")
|
||||
.addAnnotation(Override.class)
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.returns(String.class)
|
||||
.addStatement("return this.name")
|
||||
.build())
|
||||
.addMethod(MethodSpec
|
||||
.methodBuilder("setName")
|
||||
.addParameter(String.class, "name")
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addStatement("this.name = name")
|
||||
.build())
|
||||
.addMethod(getPrintNameMultipleTimesMethod())
|
||||
.addMethod(getSortByLengthMethod())
|
||||
.build();
|
||||
}
|
||||
|
||||
public TypeSpec getComparatorAnonymousClass() {
|
||||
return TypeSpec
|
||||
.anonymousClassBuilder("")
|
||||
.addSuperinterface(ParameterizedTypeName.get(Comparator.class, String.class))
|
||||
.addMethod(MethodSpec
|
||||
.methodBuilder("compare")
|
||||
.addModifiers(Modifier.PUBLIC)
|
||||
.addAnnotation(Override.class)
|
||||
.addParameter(String.class, "a")
|
||||
.addParameter(String.class, "b")
|
||||
.returns(int.class)
|
||||
.addStatement("return a.length() - b.length()")
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
public void generateGenderEnum() throws IOException {
|
||||
writeToOutputFile(getPersonPackageName(), getGenderEnum());
|
||||
}
|
||||
|
||||
public void generatePersonInterface() throws IOException {
|
||||
writeToOutputFile(getPersonPackageName(), getPersonInterface());
|
||||
}
|
||||
|
||||
public void generateStudentClass() throws IOException {
|
||||
writeToOutputFile(getPersonPackageName(), getStudentClass());
|
||||
}
|
||||
|
||||
private void writeToOutputFile(String packageName, TypeSpec typeSpec) throws IOException {
|
||||
JavaFile javaFile = JavaFile
|
||||
.builder(packageName, typeSpec)
|
||||
.indent(FOUR_WHITESPACES)
|
||||
.build();
|
||||
javaFile.writeTo(outputFile);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
||||
|
||||
public class TransactionalMessageProducer {
|
||||
|
||||
private static final String DATA_MESSAGE_1 = "Put any space separated data here for count";
|
||||
private static final String DATA_MESSAGE_2 = "Output will contain count of every word in the message";
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaProducer<String, String> producer = createKafkaProducer();
|
||||
|
||||
producer.initTransactions();
|
||||
|
||||
try{
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
Stream.of(DATA_MESSAGE_1, DATA_MESSAGE_2).forEach(s -> producer.send(
|
||||
new ProducerRecord<String, String>("input", null, s)));
|
||||
|
||||
producer.commitTransaction();
|
||||
|
||||
}catch (KafkaException e){
|
||||
|
||||
producer.abortTransaction();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static KafkaProducer<String, String> createKafkaProducer() {
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
|
||||
props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
|
||||
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
return new KafkaProducer(props);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.time.Duration.ofSeconds;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.*;
|
||||
|
||||
public class TransactionalWordCount {
|
||||
|
||||
private static final String CONSUMER_GROUP_ID = "my-group-id";
|
||||
private static final String OUTPUT_TOPIC = "output";
|
||||
private static final String INPUT_TOPIC = "input";
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
KafkaConsumer<String, String> consumer = createKafkaConsumer();
|
||||
KafkaProducer<String, String> producer = createKafkaProducer();
|
||||
|
||||
producer.initTransactions();
|
||||
|
||||
try {
|
||||
|
||||
while (true) {
|
||||
|
||||
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
|
||||
|
||||
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
|
||||
.stream()
|
||||
.flatMap(record -> Stream.of(record.value().split(" ")))
|
||||
.map(word -> Tuple.of(word, 1))
|
||||
.collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));
|
||||
|
||||
producer.beginTransaction();
|
||||
|
||||
wordCountMap.forEach((key, value) -> producer.send(new ProducerRecord<String, String>(OUTPUT_TOPIC, key, value.toString())));
|
||||
|
||||
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
|
||||
|
||||
for (TopicPartition partition : records.partitions()) {
|
||||
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
|
||||
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
|
||||
|
||||
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
|
||||
}
|
||||
|
||||
producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
|
||||
producer.commitTransaction();
|
||||
|
||||
}
|
||||
|
||||
} catch (KafkaException e) {
|
||||
|
||||
producer.abortTransaction();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static KafkaConsumer<String, String> createKafkaConsumer() {
|
||||
Properties props = new Properties();
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
|
||||
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||
props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
|
||||
props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
|
||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(singleton(INPUT_TOPIC));
|
||||
return consumer;
|
||||
}
|
||||
|
||||
private static KafkaProducer<String, String> createKafkaProducer() {
|
||||
|
||||
Properties props = new Properties();
|
||||
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
|
||||
props.put(TRANSACTIONAL_ID_CONFIG, "prod-1");
|
||||
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||
|
||||
return new KafkaProducer(props);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
24
libraries-6/src/main/java/com/baeldung/kafka/Tuple.java
Normal file
24
libraries-6/src/main/java/com/baeldung/kafka/Tuple.java
Normal file
@@ -0,0 +1,24 @@
|
||||
package com.baeldung.kafka;
|
||||
|
||||
public class Tuple {
|
||||
|
||||
private String key;
|
||||
private Integer value;
|
||||
|
||||
private Tuple(String key, Integer value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Tuple of(String key, Integer value){
|
||||
return new Tuple(key,value);
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public Integer getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.baeldung.reflections;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Date;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.reflections.Reflections;
|
||||
import org.reflections.scanners.MethodAnnotationsScanner;
|
||||
import org.reflections.scanners.MethodParameterScanner;
|
||||
import org.reflections.scanners.ResourcesScanner;
|
||||
import org.reflections.scanners.Scanner;
|
||||
import org.reflections.scanners.SubTypesScanner;
|
||||
import org.reflections.util.ClasspathHelper;
|
||||
import org.reflections.util.ConfigurationBuilder;
|
||||
|
||||
public class ReflectionsApp {
|
||||
|
||||
public Set<Class<? extends Scanner>> getReflectionsSubTypes() {
|
||||
Reflections reflections = new Reflections("org.reflections");
|
||||
Set<Class<? extends Scanner>> scannersSet = reflections.getSubTypesOf(Scanner.class);
|
||||
return scannersSet;
|
||||
}
|
||||
|
||||
public Set<Class<?>> getJDKFunctinalInterfaces() {
|
||||
Reflections reflections = new Reflections("java.util.function");
|
||||
Set<Class<?>> typesSet = reflections.getTypesAnnotatedWith(FunctionalInterface.class);
|
||||
return typesSet;
|
||||
}
|
||||
|
||||
public Set<Method> getDateDeprecatedMethods() {
|
||||
Reflections reflections = new Reflections(java.util.Date.class, new MethodAnnotationsScanner());
|
||||
Set<Method> deprecatedMethodsSet = reflections.getMethodsAnnotatedWith(Deprecated.class);
|
||||
return deprecatedMethodsSet;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Set<Constructor> getDateDeprecatedConstructors() {
|
||||
Reflections reflections = new Reflections(java.util.Date.class, new MethodAnnotationsScanner());
|
||||
Set<Constructor> constructorsSet = reflections.getConstructorsAnnotatedWith(Deprecated.class);
|
||||
return constructorsSet;
|
||||
}
|
||||
|
||||
public Set<Method> getMethodsWithDateParam() {
|
||||
Reflections reflections = new Reflections(java.text.SimpleDateFormat.class, new MethodParameterScanner());
|
||||
Set<Method> methodsSet = reflections.getMethodsMatchParams(Date.class);
|
||||
return methodsSet;
|
||||
}
|
||||
|
||||
public Set<Method> getMethodsWithVoidReturn() {
|
||||
Reflections reflections = new Reflections(java.text.SimpleDateFormat.class, new MethodParameterScanner());
|
||||
Set<Method> methodsSet = reflections.getMethodsReturn(void.class);
|
||||
return methodsSet;
|
||||
}
|
||||
|
||||
public Set<String> getPomXmlPaths() {
|
||||
Reflections reflections = new Reflections(new ResourcesScanner());
|
||||
Set<String> resourcesSet = reflections.getResources(Pattern.compile(".*pom\\.xml"));
|
||||
return resourcesSet;
|
||||
}
|
||||
|
||||
public Set<Class<? extends Scanner>> getReflectionsSubTypesUsingBuilder() {
|
||||
Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.reflections"))
|
||||
.setScanners(new SubTypesScanner()));
|
||||
|
||||
Set<Class<? extends Scanner>> scannersSet = reflections.getSubTypesOf(Scanner.class);
|
||||
return scannersSet;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
package com.baeldung.fj;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import fj.F;
|
||||
import fj.Show;
|
||||
import fj.data.Array;
|
||||
import fj.data.List;
|
||||
import fj.data.Option;
|
||||
import fj.function.Characters;
|
||||
import fj.function.Integers;
|
||||
|
||||
public class FunctionalJavaUnitTest {
|
||||
|
||||
public static final F<Integer, Boolean> isEven = i -> i % 2 == 0;
|
||||
|
||||
public static final Integer timesTwoRegular(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
|
||||
public static final F<Integer, Integer> timesTwo = i -> i * 2;
|
||||
|
||||
public static final F<Integer, Integer> plusOne = i -> i + 1;
|
||||
|
||||
@Test
|
||||
public void multiplyNumbers_givenIntList_returnTrue() {
|
||||
List<Integer> fList = List.list(1, 2, 3, 4);
|
||||
List<Integer> fList1 = fList.map(timesTwo);
|
||||
List<Integer> fList2 = fList.map(i -> i * 2);
|
||||
|
||||
assertTrue(fList1.equals(fList2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void applyMultipleFunctions_givenIntList_returnFalse() {
|
||||
List<Integer> fList = List.list(1, 2, 3, 4);
|
||||
List<Integer> fList1 = fList.map(timesTwo).map(plusOne);
|
||||
Show.listShow(Show.intShow).println(fList1);
|
||||
List<Integer> fList2 = fList.map(plusOne).map(timesTwo);
|
||||
Show.listShow(Show.intShow).println(fList2);
|
||||
|
||||
assertFalse(fList1.equals(fList2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void calculateEvenNumbers_givenIntList_returnTrue() {
|
||||
List<Integer> fList = List.list(3, 4, 5, 6);
|
||||
List<Boolean> evenList = fList.map(isEven);
|
||||
List<Boolean> evenListTrueResult = List.list(false, true, false, true);
|
||||
|
||||
assertTrue(evenList.equals(evenListTrueResult));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mapList_givenIntList_returnResult() {
|
||||
List<Integer> fList = List.list(3, 4, 5, 6);
|
||||
fList = fList.map(i -> i + 100);
|
||||
List<Integer> resultList = List.list(103, 104, 105, 106);
|
||||
|
||||
assertTrue(fList.equals(resultList));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterList_givenIntList_returnResult() {
|
||||
Array<Integer> array = Array.array(3, 4, 5, 6);
|
||||
Array<Integer> filteredArray = array.filter(isEven);
|
||||
Array<Integer> result = Array.array(4, 6);
|
||||
|
||||
assertTrue(filteredArray.equals(result));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkForLowerCase_givenStringArray_returnResult() {
|
||||
Array<String> array = Array.array("Welcome", "To", "baeldung");
|
||||
assertTrue(array.exists(s -> List.fromString(s).forall(Characters.isLowerCase)));
|
||||
|
||||
Array<String> array2 = Array.array("Welcome", "To", "Baeldung");
|
||||
assertFalse(array2.exists(s -> List.fromString(s).forall(Characters.isLowerCase)));
|
||||
|
||||
assertFalse(array.forall(s -> List.fromString(s).forall(Characters.isLowerCase)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkOptions_givenOptions_returnResult() {
|
||||
Option<Integer> n1 = Option.some(1);
|
||||
Option<Integer> n2 = Option.some(2);
|
||||
Option<Integer> n3 = Option.none();
|
||||
|
||||
F<Integer, Option<Integer>> function = i -> i % 2 == 0 ? Option.some(i + 100) : Option.none();
|
||||
|
||||
Option<Integer> result1 = n1.bind(function);
|
||||
Option<Integer> result2 = n2.bind(function);
|
||||
Option<Integer> result3 = n3.bind(function);
|
||||
|
||||
Assert.assertEquals(Option.none(), result1);
|
||||
Assert.assertEquals(Option.some(102), result2);
|
||||
Assert.assertEquals(Option.none(), result3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void foldLeft_givenArray_returnResult() {
|
||||
Array<Integer> intArray = Array.array(17, 44, 67, 2, 22, 80, 1, 27);
|
||||
int sumAll = intArray.foldLeft(Integers.add, 0);
|
||||
|
||||
assertEquals(260, sumAll);
|
||||
|
||||
int sumEven = intArray.filter(isEven).foldLeft(Integers.add, 0);
|
||||
|
||||
assertEquals(148, sumEven);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.baeldung.ftp;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockftpserver.fake.FakeFtpServer;
|
||||
import org.mockftpserver.fake.UserAccount;
|
||||
import org.mockftpserver.fake.filesystem.DirectoryEntry;
|
||||
import org.mockftpserver.fake.filesystem.FileEntry;
|
||||
import org.mockftpserver.fake.filesystem.FileSystem;
|
||||
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class FtpClientIntegrationTest {
|
||||
|
||||
private FakeFtpServer fakeFtpServer;
|
||||
|
||||
private FtpClient ftpClient;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
fakeFtpServer = new FakeFtpServer();
|
||||
fakeFtpServer.addUserAccount(new UserAccount("user", "password", "/data"));
|
||||
|
||||
FileSystem fileSystem = new UnixFakeFileSystem();
|
||||
fileSystem.add(new DirectoryEntry("/data"));
|
||||
fileSystem.add(new FileEntry("/data/foobar.txt", "abcdef 1234567890"));
|
||||
fakeFtpServer.setFileSystem(fileSystem);
|
||||
fakeFtpServer.setServerControlPort(0);
|
||||
|
||||
fakeFtpServer.start();
|
||||
|
||||
ftpClient = new FtpClient("localhost", fakeFtpServer.getServerControlPort(), "user", "password");
|
||||
ftpClient.open();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws IOException {
|
||||
ftpClient.close();
|
||||
fakeFtpServer.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRemoteFile_whenListingRemoteFiles_thenItIsContainedInList() throws IOException {
|
||||
Collection<String> files = ftpClient.listFiles("");
|
||||
|
||||
assertThat(files).contains("foobar.txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRemoteFile_whenDownloading_thenItIsOnTheLocalFilesystem() throws IOException {
|
||||
ftpClient.downloadFile("/foobar.txt", "downloaded_buz.txt");
|
||||
|
||||
assertThat(new File("downloaded_buz.txt")).exists();
|
||||
new File("downloaded_buz.txt").delete(); // cleanup
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenLocalFile_whenUploadingIt_thenItExistsOnRemoteLocation() throws URISyntaxException, IOException {
|
||||
File file = new File(getClass().getClassLoader().getResource("ftp/baz.txt").toURI());
|
||||
|
||||
ftpClient.putFileToPath(file, "/buz.txt");
|
||||
|
||||
assertThat(fakeFtpServer.getFileSystem().exists("/buz.txt")).isTrue();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.baeldung.ftp;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockftpserver.fake.FakeFtpServer;
|
||||
import org.mockftpserver.fake.UserAccount;
|
||||
import org.mockftpserver.fake.filesystem.DirectoryEntry;
|
||||
import org.mockftpserver.fake.filesystem.FileEntry;
|
||||
import org.mockftpserver.fake.filesystem.FileSystem;
|
||||
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class JdkFtpClientIntegrationTest {
|
||||
|
||||
private FakeFtpServer fakeFtpServer;
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
fakeFtpServer = new FakeFtpServer();
|
||||
fakeFtpServer.addUserAccount(new UserAccount("user", "password", "/data"));
|
||||
|
||||
FileSystem fileSystem = new UnixFakeFileSystem();
|
||||
fileSystem.add(new DirectoryEntry("/data"));
|
||||
fileSystem.add(new FileEntry("/data/foobar.txt", "abcdef 1234567890"));
|
||||
fakeFtpServer.setFileSystem(fileSystem);
|
||||
fakeFtpServer.setServerControlPort(0);
|
||||
|
||||
fakeFtpServer.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws IOException {
|
||||
fakeFtpServer.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenRemoteFile_whenDownloading_thenItIsOnTheLocalFilesystem() throws IOException {
|
||||
String ftpUrl = String.format("ftp://user:password@localhost:%d/foobar.txt", fakeFtpServer.getServerControlPort());
|
||||
|
||||
URLConnection urlConnection = new URL(ftpUrl).openConnection();
|
||||
InputStream inputStream = urlConnection.getInputStream();
|
||||
Files.copy(inputStream, new File("downloaded_buz.txt").toPath());
|
||||
inputStream.close();
|
||||
|
||||
assertThat(new File("downloaded_buz.txt")).exists();
|
||||
|
||||
new File("downloaded_buz.txt").delete(); // cleanup
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.baeldung.javapoet.test;
|
||||
|
||||
import com.baeldung.javapoet.PersonGenerator;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
public class PersonGeneratorUnitTest {
|
||||
|
||||
private PersonGenerator generator;
|
||||
private Path generatedFolderPath;
|
||||
private Path expectedFolderPath;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
String packagePath = this
|
||||
.getClass()
|
||||
.getPackage()
|
||||
.getName()
|
||||
.replace(".", "/") + "/person";
|
||||
generator = new PersonGenerator();
|
||||
generatedFolderPath = generator
|
||||
.getOutputPath()
|
||||
.resolve(packagePath);
|
||||
expectedFolderPath = Paths.get(new File(".").getAbsolutePath() + "/src/test/java/" + packagePath);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
FileUtils.deleteDirectory(new File(generator
|
||||
.getOutputPath()
|
||||
.toUri()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenGenerateGenderEnum_thenGenerateGenderEnumAndWriteToFile() throws IOException {
|
||||
generator.generateGenderEnum();
|
||||
String fileName = "Gender.java";
|
||||
assertThatFileIsGeneratedAsExpected(fileName);
|
||||
deleteGeneratedFile(fileName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenGeneratePersonInterface_thenGeneratePersonInterfaceAndWriteToFile() throws IOException {
|
||||
generator.generatePersonInterface();
|
||||
String fileName = "Person.java";
|
||||
assertThatFileIsGeneratedAsExpected(fileName);
|
||||
deleteGeneratedFile(fileName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenGenerateStudentClass_thenGenerateStudentClassAndWriteToFile() throws IOException {
|
||||
generator.generateStudentClass();
|
||||
String fileName = "Student.java";
|
||||
assertThatFileIsGeneratedAsExpected(fileName);
|
||||
deleteGeneratedFile(fileName);
|
||||
}
|
||||
|
||||
private void assertThatFileIsGeneratedAsExpected(String fileName) throws IOException {
|
||||
String generatedFileContent = extractFileContent(generatedFolderPath.resolve(fileName));
|
||||
String expectedFileContent = extractFileContent(expectedFolderPath.resolve(fileName));
|
||||
|
||||
assertThat("Generated file is identical to the file with the expected content", generatedFileContent, is(equalTo(expectedFileContent)));
|
||||
|
||||
}
|
||||
|
||||
private void deleteGeneratedFile(String fileName) throws IOException {
|
||||
Path generatedFilePath = generatedFolderPath.resolve(fileName);
|
||||
Files.delete(generatedFilePath);
|
||||
}
|
||||
|
||||
private String extractFileContent(Path filePath) throws IOException {
|
||||
byte[] fileContentAsBytes = Files.readAllBytes(filePath);
|
||||
String fileContentAsString = new String(fileContentAsBytes, StandardCharsets.UTF_8);
|
||||
|
||||
if (!fileContentAsString.contains("\r\n")) {
|
||||
// file is not in DOS format
|
||||
// convert it first, so that the content comparison will be relevant
|
||||
return fileContentAsString.replaceAll("\n", "\r\n");
|
||||
}
|
||||
return fileContentAsString;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.baeldung.javapoet.test.person;
|
||||
|
||||
public enum Gender {
|
||||
MALE,
|
||||
|
||||
FEMALE,
|
||||
|
||||
UNSPECIFIED
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.javapoet.test.person;
|
||||
|
||||
import java.lang.String;
|
||||
|
||||
public interface Person {
|
||||
String DEFAULT_NAME = "Alice";
|
||||
|
||||
String getName();
|
||||
|
||||
default String getDefaultName() {
|
||||
return DEFAULT_NAME;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.baeldung.javapoet.test.person;
|
||||
|
||||
import java.lang.Override;
|
||||
import java.lang.String;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class Student implements Person {
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public void printNameMultipleTimes() {
|
||||
List<String> names = new ArrayList<>();
|
||||
IntStream.range(0, 10).forEach(i -> names.add(name));
|
||||
names.forEach(System.out::println);
|
||||
}
|
||||
|
||||
public static void sortByLength(List<String> strings) {
|
||||
Collections.sort(strings, new Comparator<String>() {
|
||||
@Override
|
||||
public int compare(String a, String b) {
|
||||
return a.length() - b.length();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.baeldung.kafkastreams;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class KafkaStreamsLiveTest {
|
||||
private String bootstrapServers = "localhost:9092";
|
||||
|
||||
@Test
|
||||
@Ignore("it needs to have kafka broker running on local")
|
||||
public void shouldTestKafkaStreams() throws InterruptedException {
|
||||
// given
|
||||
String inputTopic = "inputTopic";
|
||||
|
||||
Properties streamsConfiguration = new Properties();
|
||||
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
|
||||
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
// Use a temporary directory for storing state, which will be automatically removed after the test.
|
||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
|
||||
|
||||
// when
|
||||
StreamsBuilder builder = new StreamsBuilder();
|
||||
KStream<String, String> textLines = builder.stream(inputTopic);
|
||||
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
|
||||
|
||||
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count();
|
||||
|
||||
textLines.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
|
||||
|
||||
String outputTopic = "outputTopic";
|
||||
final Serde<String> stringSerde = Serdes.String();
|
||||
final Serde<String> longSerde = Serdes.String();
|
||||
textLines.to(outputTopic, Produced.with(stringSerde,longSerde));
|
||||
|
||||
KafkaStreams streams = new KafkaStreams(new Topology(), streamsConfiguration);
|
||||
streams.start();
|
||||
|
||||
// then
|
||||
Thread.sleep(30000);
|
||||
streams.close();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.baeldung.reflections;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class ReflectionsUnitTest {
|
||||
|
||||
@Test
|
||||
public void givenTypeThenGetAllSubTypes() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getReflectionsSubTypes()
|
||||
.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenTypeAndUsingBuilderThenGetAllSubTypes() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getReflectionsSubTypesUsingBuilder()
|
||||
.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAnnotationThenGetAllAnnotatedMethods() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getDateDeprecatedMethods()
|
||||
.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAnnotationThenGetAllAnnotatedConstructors() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getDateDeprecatedConstructors()
|
||||
.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenParamTypeThenGetAllMethods() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getMethodsWithDateParam()
|
||||
.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenReturnTypeThenGetAllMethods() {
|
||||
ReflectionsApp reflectionsApp = new ReflectionsApp();
|
||||
assertFalse(reflectionsApp.getMethodsWithVoidReturn()
|
||||
.isEmpty());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
package com.baeldung.resilence4j;
|
||||
|
||||
import io.github.resilience4j.bulkhead.Bulkhead;
|
||||
import io.github.resilience4j.bulkhead.BulkheadConfig;
|
||||
import io.github.resilience4j.bulkhead.BulkheadRegistry;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
|
||||
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
|
||||
import io.github.resilience4j.retry.Retry;
|
||||
import io.github.resilience4j.retry.RetryConfig;
|
||||
import io.github.resilience4j.retry.RetryRegistry;
|
||||
import io.github.resilience4j.timelimiter.TimeLimiter;
|
||||
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class Resilience4jUnitTest {
|
||||
|
||||
interface RemoteService {
|
||||
|
||||
int process(int i);
|
||||
}
|
||||
|
||||
private RemoteService service;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
service = mock(RemoteService.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenCircuitBreakerIsUsed_thenItWorksAsExpected() {
|
||||
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
|
||||
// Percentage of failures to start short-circuit
|
||||
.failureRateThreshold(20)
|
||||
// Min number of call attempts
|
||||
.ringBufferSizeInClosedState(5)
|
||||
.build();
|
||||
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
|
||||
CircuitBreaker circuitBreaker = registry.circuitBreaker("my");
|
||||
Function<Integer, Integer> decorated = CircuitBreaker.decorateFunction(circuitBreaker, service::process);
|
||||
|
||||
when(service.process(anyInt())).thenThrow(new RuntimeException());
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try {
|
||||
decorated.apply(i);
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
}
|
||||
|
||||
verify(service, times(5)).process(any(Integer.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenBulkheadIsUsed_thenItWorksAsExpected() throws InterruptedException {
|
||||
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(1).build();
|
||||
BulkheadRegistry registry = BulkheadRegistry.of(config);
|
||||
Bulkhead bulkhead = registry.bulkhead("my");
|
||||
Function<Integer, Integer> decorated = Bulkhead.decorateFunction(bulkhead, service::process);
|
||||
|
||||
Future<?> taskInProgress = callAndBlock(decorated);
|
||||
try {
|
||||
assertThat(bulkhead.isCallPermitted()).isFalse();
|
||||
} finally {
|
||||
taskInProgress.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
private Future<?> callAndBlock(Function<Integer, Integer> decoratedService) throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
when(service.process(anyInt())).thenAnswer(invocation -> {
|
||||
latch.countDown();
|
||||
Thread.currentThread().join();
|
||||
return null;
|
||||
});
|
||||
|
||||
ForkJoinTask<?> result = ForkJoinPool.commonPool().submit(() -> {
|
||||
decoratedService.apply(1);
|
||||
});
|
||||
latch.await();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenRetryIsUsed_thenItWorksAsExpected() {
|
||||
RetryConfig config = RetryConfig.custom().maxAttempts(2).build();
|
||||
RetryRegistry registry = RetryRegistry.of(config);
|
||||
Retry retry = registry.retry("my");
|
||||
Function<Integer, Void> decorated = Retry.decorateFunction(retry, (Integer s) -> {
|
||||
service.process(s);
|
||||
return null;
|
||||
});
|
||||
|
||||
when(service.process(anyInt())).thenThrow(new RuntimeException());
|
||||
try {
|
||||
decorated.apply(1);
|
||||
fail("Expected an exception to be thrown if all retries failed");
|
||||
} catch (Exception e) {
|
||||
verify(service, times(2)).process(any(Integer.class));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void whenTimeLimiterIsUsed_thenItWorksAsExpected() throws Exception {
|
||||
long ttl = 1;
|
||||
TimeLimiterConfig config = TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(ttl)).build();
|
||||
TimeLimiter timeLimiter = TimeLimiter.of(config);
|
||||
|
||||
Future futureMock = mock(Future.class);
|
||||
Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, () -> futureMock);
|
||||
restrictedCall.call();
|
||||
|
||||
verify(futureMock).get(ttl, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
0
libraries-6/src/test/resources/ftp/baz.txt
Normal file
0
libraries-6/src/test/resources/ftp/baz.txt
Normal file
Reference in New Issue
Block a user