reactive streams test

This commit is contained in:
kim
2021-02-04 20:48:41 +09:00
parent cbe5aca2f5
commit 3b235633d7
6 changed files with 121 additions and 1 deletions

24
reactive-test/pom.xml Normal file
View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>reactive-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
</project>

View File

@@ -0,0 +1,8 @@
public class Application {
public static void main(String[] args) {
MyPublisher publisher = new MyPublisher();
MySubscriber subscriber = new MySubscriber();
publisher.subscribe(subscriber);
}
}

View File

@@ -0,0 +1,18 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.Arrays;
public class MyPublisher implements Publisher<Integer> {
Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
System.out.println("구독 신청");
System.out.println("구독 정보 생성");
MySubscription subscription = new MySubscription(subscriber, its);
System.out.println("구독 정보 생성완료");
subscriber.onSubscribe(subscription);
}
}

View File

@@ -0,0 +1,37 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class MySubscriber implements Subscriber<Integer> {
private Subscription s;
private final int SIZE = 5;
private int bufferSize = SIZE;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("구독 정보 받음");
this.s = subscription;
s.request(bufferSize); // 요청 개수
}
@Override
public void onNext(Integer integer) {
System.out.println("구독 데이터 전달 : " + integer);
bufferSize--;
if (bufferSize == 0) {
System.out.println("하루 지남");
bufferSize = SIZE;
s.request(bufferSize);
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("구독 에러");
}
@Override
public void onComplete() {
System.out.println("구독 완료");
}
}

View File

@@ -0,0 +1,33 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Iterator;
public class MySubscription implements Subscription {
private Subscriber s;
private Iterator<Integer> it;
public MySubscription(Subscriber s, Iterable<Integer> it) {
this.s = s;
this.it = it.iterator();
}
@Override
public void request(long l) {
while(l > 0) {
if (it.hasNext()) {
s.onNext(it.next());
} else {
s.onComplete();
break;
}
l--;
}
}
@Override
public void cancel() {
}
}

View File

@@ -16,7 +16,7 @@ public class MyFilterConfig {
@Bean
public FilterRegistrationBean<Filter> addFilter() {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<>(new MyFilter(eventNotify));
bean.addUrlPatterns("/*");
bean.addUrlPatterns("/sse");
return bean;
}
@Bean