#36 rxjava: 생성 연산자

This commit is contained in:
haerong22
2023-04-28 03:17:02 +09:00
parent eaf7da0fd9
commit f931669090
10 changed files with 292 additions and 0 deletions

View File

@@ -1,5 +1,7 @@
package org.example.ex07;
import org.example.common.Car;
import java.util.function.Function;
/**

View File

@@ -0,0 +1,31 @@
package org.example.ex08;
import org.example.utils.LogType;
import org.example.utils.Logger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class CarRepairShop {
public int getCarRepairCostSync(int brokens) {
return calculateCarRepair(brokens);
}
public Future<Integer> getCarRepairCostAsync(int brokens) {
return CompletableFuture.supplyAsync(() -> calculateCarRepair(brokens));
}
private int calculateCarRepair(int brokens){
Logger.log(LogType.PRINT, "# 차량 수리비 계산 중................");
delay();
return brokens * 20000;
}
private void delay(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,56 @@
package org.example.ex08;
import org.example.utils.LogType;
import org.example.utils.Logger;
import java.util.concurrent.Future;
public class FutureExampleASync {
public static void main(String[] args){
long startTime = System.currentTimeMillis();
Logger.log(LogType.PRINT, "# Start");
CarRepairShop shop = new CarRepairShop();
// 차량 수리비(시간이 더 오래 걸리는 미래에 끝날 일)
Future<Integer> future = shop.getCarRepairCostAsync(10);
// 회사에 병가 신청(짧은 처리 시간)
requestSickLeave("20170903-01");
// 보험 청구(짧은 처리 시간)
requestInsurance("44나4444");
try {
int carRepairCost = future.get();
Logger.log(LogType.PRINT, "# (1) 차량 수리비 계산이 완료되었습니다.");
Logger.log(LogType.PRINT, "# 차량 수리비는 " + carRepairCost + "원 입니다.");
} catch (Exception e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
double executeTime = (endTime - startTime) / 1000.0;
System.out.println();
System.out.println("# 처리 시간: " + executeTime);
}
private static void requestSickLeave(String empNumber) {
try {
Thread.sleep(1000);
Logger.log(LogType.PRINT, "# (2) 병가 신청이 완료되었습니다.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void requestInsurance(String carNumber) {
try {
Thread.sleep(1000);
Logger.log(LogType.PRINT, "# (3) 보험 접수가 완료 되었습니다.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,50 @@
package org.example.ex08;
import org.example.utils.LogType;
import org.example.utils.Logger;
public class FutureExampleSync {
public static void main(String[] args){
long startTime = System.currentTimeMillis();
CarRepairShop shop = new CarRepairShop();
// 차량 수리비
int carRepairCost = shop.getCarRepairCostSync(10);
Logger.log(LogType.PRINT, "# (1) 차량 수리비 계산이 완료되었습니다.");
Logger.log(LogType.PRINT, "# 차량 수리비는 " + carRepairCost + "원 입니다.");
// 회사에 병가 신청
requestSickLeave("20170903-01");
// 보험 접수
requestInsurance("44나4444");
long endTime = System.currentTimeMillis();
double executeTime = (endTime - startTime) / 1000.0;
System.out.println();
System.out.println("# 처리 시간: " + executeTime + "");
}
private static void requestSickLeave(String empNumber) {
try {
Thread.sleep(1000);
Logger.log(LogType.PRINT, "# (2) 병가 신청이 완료되었습니다.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void requestInsurance(String carNumber) {
try {
Thread.sleep(1000);
Logger.log(LogType.PRINT, "# (3) 보험 접수가 완료 되었습니다.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,30 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
import java.time.LocalTime;
/**
* 실제 구독이 발생할 때 Observable을 새로 반환하여 새로운 Observable을 생성한다.
* defer()를 활용하면 데이터 흐름의 생성을 지연하는 효과를 보여준다.
*/
public class ObservableDeferEx {
public static void main(String[] args) throws InterruptedException {
Observable<LocalTime> observable = Observable.defer(() -> {
LocalTime currentTime = LocalTime.now();
return Observable.just(currentTime);
});
Observable<LocalTime> observableJust = Observable.just(LocalTime.now());
observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독1의 구독 시간: " + time));
observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독1의 구독 시간: " + time));
Thread.sleep(3000);
observable.subscribe(time -> Logger.log(LogType.PRINT, " # defer() 구독2의 구독 시간: " + time));
observableJust.subscribe(time -> Logger.log(LogType.PRINT, " # just() 구독자2의 구독 시간: " + time));
}
}

View File

@@ -0,0 +1,44 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ObservableFromFutureEx {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Logger.log(LogType.PRINT, "# start time");
// 긴 처리 시간이 걸리는 작업
Future<Double> future = longTimeWork();
// 짧은 처리 시간이 걸리는 작업
shortTimeWork();
Observable.fromFuture(future)
.subscribe(data -> Logger.log(LogType.PRINT, "# 긴 처리 시간 작업 결과 : " + data));
Logger.log(LogType.PRINT, "# end time");
}
public static CompletableFuture<Double> longTimeWork(){
return CompletableFuture.supplyAsync(ObservableFromFutureEx::calculate);
}
private static Double calculate() {
Logger.log(LogType.PRINT, "# 긴 처리 시간이 걸리는 작업 중.........");
TimeUtil.sleep(6000L);
return 100000000000000000.0;
}
private static void shortTimeWork() {
TimeUtil.sleep(3000L);
Logger.log(LogType.PRINT, "# 짧은 처리 시간 작업 완료!");
}
}

View File

@@ -0,0 +1,17 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
import java.util.Arrays;
import java.util.List;
public class ObservableFromIterableEx {
public static void main(String[] args){
List<String> countries = Arrays.asList("Korea", "Canada", "USA", "Italy");
Observable.fromIterable(countries)
.subscribe(country -> Logger.log(LogType.ON_NEXT, country));
}
}

View File

@@ -0,0 +1,23 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
/**
* polling 용도로 주로 사용.
*/
public class ObservableIntervalEx {
public static void main(String[] args){
System.out.println("# start : " + TimeUtil.getCurrentTimeFormatted());
Observable.interval(0, 1000L, TimeUnit.MILLISECONDS)
.map(num -> num + " count")
.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000);
}
}

View File

@@ -0,0 +1,15 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
/**
* 반복문으로 사용 가능
*/
public class ObservableRangeEx {
public static void main(String[] args){
Observable<Integer> source = Observable.range(0, 5);
source.subscribe(num -> Logger.log(LogType.ON_NEXT, num));
}
}

View File

@@ -0,0 +1,24 @@
package org.example.ex08;
import io.reactivex.rxjava3.core.Observable;
import org.example.utils.LogType;
import org.example.utils.Logger;
import org.example.utils.TimeUtil;
import java.util.concurrent.TimeUnit;
/**
* 설정한 시간이 지난 후에 특정 동작을 수행하고자 할때 사용
*/
public class ObservableTimerEx {
public static void main(String[] args){
Logger.log(LogType.PRINT, "# Start!");
Observable<String> observable =
Observable.timer(2000, TimeUnit.MILLISECONDS)
.map(count -> "Do work!");
observable.subscribe(data -> Logger.log(LogType.ON_NEXT, data));
TimeUtil.sleep(3000);
}
}