diff --git a/kotlin-quasar/pom.xml b/kotlin-quasar/pom.xml index 44feabd183..a3e5a1ec25 100644 --- a/kotlin-quasar/pom.xml +++ b/kotlin-quasar/pom.xml @@ -45,6 +45,27 @@ junit 4.12 + + + org.slf4j + slf4j-api + ${org.slf4j.version} + + + ch.qos.logback + logback-classic + ${logback.version} + + + ch.qos.logback + logback-core + ${logback.version} + + + org.slf4j + jcl-over-slf4j + ${org.slf4j.version} + @@ -116,5 +137,7 @@ 0.8.0 1.3.31 + 1.7.21 + 1.1.7 diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsBehaviorTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsBehaviorTest.kt new file mode 100644 index 0000000000..b4d0288a64 --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsBehaviorTest.kt @@ -0,0 +1,157 @@ +package com.baeldung.quasar + +import co.paralleluniverse.actors.Actor +import co.paralleluniverse.actors.ActorRef +import co.paralleluniverse.actors.behaviors.* +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.SuspendableCallable +import org.junit.Test +import org.slf4j.LoggerFactory +import java.lang.Exception + +class ActorsBehaviorTest { + companion object { + private val LOG = LoggerFactory.getLogger(ActorsBehaviorTest::class.java) + } + + @Test + fun requestReplyHelper() { + data class TestMessage(val input: Int) : RequestMessage() + + val actor = object : Actor("requestReplyActor", null) { + @Suspendable + override fun doRun(): Void? { + while (true) { + val msg = receive() + LOG.info("Processing message: {}", msg) + + RequestReplyHelper.reply(msg, msg.input * 100) + } + } + } + + val actorRef = actor.spawn() + + val result = RequestReplyHelper.call(actorRef, TestMessage(50)) + LOG.info("Received reply: {}", result) + } + + @Test + fun server() { + val actor = ServerActor(object : AbstractServerHandler() { + @Suspendable + override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String { + LOG.info("Called with message: {} from {} with ID {}", m, from, id) + return m.toString() ?: "None" + } + + @Suspendable + override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) { + LOG.info("Cast message: {} from {} with ID {}", m, from, id) + } + }) + + val server = actor.spawn() + + LOG.info("Call result: {}", server.call(5)) + server.cast(2.5f) + + server.shutdown() + } + + interface Summer { + fun sum(a: Int, b: Int) : Int + } + + @Test + fun proxyServer() { + val actor = ProxyServerActor(false, object : Summer { + @Synchronized + override fun sum(a: Int, b: Int): Int { + Exception().printStackTrace() + LOG.info("Adding together {} and {}", a, b) + return a + b + } + }) + + val summerActor = actor.spawn() + + val result = (summerActor as Summer).sum(1, 2) + LOG.info("Result: {}", result) + + summerActor.shutdown() + } + + @Test + fun eventSource() { + val actor = EventSourceActor() + val eventSource = actor.spawn() + + eventSource.addHandler { msg -> + LOG.info("Sent message: {}", msg) + } + + val name = "Outside Value" + eventSource.addHandler { msg -> + LOG.info("Also Sent message: {} {}", msg, name) + } + + eventSource.send("Hello") + + eventSource.shutdown() + } + + @Test + fun finiteStateMachine() { + val actor = object : FiniteStateMachineActor() { + @Suspendable + override fun initialState(): SuspendableCallable> { + LOG.info("Starting") + return SuspendableCallable { lockedState() } + } + + @Suspendable + fun lockedState() : SuspendableCallable> { + return receive {msg -> + when (msg) { + "PUSH" -> { + LOG.info("Still locked") + lockedState() + } + "COIN" -> { + LOG.info("Unlocking...") + unlockedState() + } + else -> TERMINATE + } + } + } + + @Suspendable + fun unlockedState() : SuspendableCallable> { + return receive {msg -> + when (msg) { + "PUSH" -> { + LOG.info("Locking") + lockedState() + } + "COIN" -> { + LOG.info("Unlocked") + unlockedState() + } + else -> TERMINATE + } + } + } + } + + val actorRef = actor.spawn() + + listOf("PUSH", "COIN", "COIN", "PUSH", "PUSH").forEach { + LOG.info(it) + actorRef.sendSync(it) + } + + actorRef.shutdown() + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsTest.kt new file mode 100644 index 0000000000..819a149af3 --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ActorsTest.kt @@ -0,0 +1,298 @@ +package com.baeldung.quasar + +import co.paralleluniverse.actors.* +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.channels.Channels +import org.junit.Assert +import org.junit.Test +import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit + +class ActorsTest { + companion object { + private val LOG = LoggerFactory.getLogger(ActorsTest::class.java) + } + + @Test + fun createNoopActor() { + val actor = object : Actor("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) { + @Suspendable + override fun doRun(): String { + return "Hello" + } + } + + actor.spawn() + + println("Noop Actor: ${actor.get()}") + } + + @Test + fun registerActor() { + val actor = object : Actor("registerActor", null) { + @Suspendable + override fun doRun(): String { + return "Hello" + } + } + + val actorRef = actor.spawn() + actor.register() + + val retrievedRef = ActorRegistry.getActor>("registerActor") + + Assert.assertEquals(actorRef, retrievedRef) + actor.join() + } + + @Test + fun registerActorNewName() { + val actor = object : Actor(null, null) { + @Suspendable + override fun doRun(): String { + return "Hello" + } + } + + val actorRef = actor.spawn() + actor.register("renamedActor") + + val retrievedRef = ActorRegistry.getActor>("renamedActor") + + Assert.assertEquals(actorRef, retrievedRef) + actor.join() + } + + @Test + fun retrieveUnknownActor() { + val retrievedRef = ActorRegistry.getActor>("unknownActor", 1, TimeUnit.SECONDS) + + Assert.assertNull(retrievedRef) + } + + @Test + fun createSimpleActor() { + val actor = object : Actor("simpleActor", null) { + @Suspendable + override fun doRun(): Void? { + val msg = receive() + LOG.info("SimpleActor Received Message: {}", msg) + + return null + } + } + + val actorRef = actor.spawn() + + actorRef.send(1) + + actor.join() + } + + @Test + fun createLoopingActor() { + val actor = object : Actor("loopingActor", null) { + @Suspendable + override fun doRun(): Void? { + while (true) { + val msg = receive() + + if (msg > 0) { + LOG.info("LoopingActor Received Message: {}", msg) + } else { + break + } + } + + return null + } + } + + val actorRef = actor.spawn() + + actorRef.send(3) + actorRef.send(2) + actorRef.send(1) + actorRef.send(0) + + actor.join() + } + + @Test + fun actorBacklog() { + val actor = object : Actor("backlogActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) { + @Suspendable + override fun doRun(): String { + TimeUnit.MILLISECONDS.sleep(500); + LOG.info("Backlog Actor Received: {}", receive()) + + try { + receive() + } catch (e: Throwable) { + LOG.info("==== Exception throws by receive() ====") + e.printStackTrace() + } + + return "No Exception" + } + } + + val actorRef = actor.spawn() + + actorRef.send(1) + actorRef.send(2) + + try { + LOG.info("Backlog Actor: {}", actor.get()) + } catch (e: Exception) { + // Expected + LOG.info("==== Exception throws by get() ====") + e.printStackTrace() + } + } + + @Test + fun actorBacklogTrySend() { + val actor = object : Actor("backlogTrySendActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) { + @Suspendable + override fun doRun(): String { + TimeUnit.MILLISECONDS.sleep(500); + LOG.info("Backlog TrySend Actor Received: {}", receive()) + + return "No Exception" + } + } + + val actorRef = actor.spawn() + + LOG.info("Backlog TrySend 1: {}", actorRef.trySend(1)) + LOG.info("Backlog TrySend 1: {}", actorRef.trySend(2)) + + actor.join() + } + + @Test + fun actorTimeoutReceive() { + val actor = object : Actor("TimeoutReceiveActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) { + @Suspendable + override fun doRun(): String { + LOG.info("Timeout Actor Received: {}", receive(500, TimeUnit.MILLISECONDS)) + + return "Finished" + } + } + + val actorRef = actor.spawn() + + TimeUnit.MILLISECONDS.sleep(300) + actorRef.trySend(1) + + actor.join() + } + + + @Test + fun actorNonBlockingReceive() { + val actor = object : Actor("NonBlockingReceiveActor", MailboxConfig(1, Channels.OverflowPolicy.THROW)) { + @Suspendable + override fun doRun(): String { + LOG.info("NonBlocking Actor Received #1: {}", tryReceive()) + TimeUnit.MILLISECONDS.sleep(500) + LOG.info("NonBlocking Actor Received #2: {}", tryReceive()) + + return "Finished" + } + } + + val actorRef = actor.spawn() + + TimeUnit.MILLISECONDS.sleep(300) + actorRef.trySend(1) + + actor.join() + } + + @Test + fun evenActor() { + val actor = object : Actor("EvenActor", null) { + @Suspendable + override fun filterMessage(m: Any?): Int? { + return when (m) { + is Int -> { + if (m % 2 == 0) { + m * 10 + } else { + null + } + } + else -> super.filterMessage(m) + } + } + + @Suspendable + override fun doRun(): Void? { + while (true) { + val msg = receive() + + if (msg > 0) { + LOG.info("EvenActor Received Message: {}", msg) + } else { + break + } + } + + return null + } + } + + val actorRef = actor.spawn() + + actorRef.send(3) + actorRef.send(2) + actorRef.send(1) + actorRef.send(0) + + actor.join() + } + + @Test + fun watchingActors() { + val watched = object : Actor("WatchedActor", null) { + @Suspendable + override fun doRun(): Void? { + LOG.info("WatchedActor Starting") + receive(500, TimeUnit.MILLISECONDS) + LOG.info("WatchedActor Finishing") + return null + } + } + + val watcher = object : Actor("WatcherActor", null) { + @Suspendable + override fun doRun(): Void? { + LOG.info("WatcherActor Listening") + try { + LOG.info("WatcherActor received Message: {}", receive(2, TimeUnit.SECONDS)) + } catch (e: Exception) { + LOG.info("WatcherActor Received Exception", e) + } + return null + } + + @Suspendable + override fun handleLifecycleMessage(m: LifecycleMessage?): Int? { + LOG.info("WatcherActor Received Lifecycle Message: {}", m) + return super.handleLifecycleMessage(m) + } + } + + val watcherRef = watcher.spawn() + TimeUnit.MILLISECONDS.sleep(200) + + val watchedRef = watched.spawn() + watcher.link(watchedRef) + + watched.join() + watcher.join() + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ReactiveStreamsTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ReactiveStreamsTest.kt new file mode 100644 index 0000000000..83e06bf7d6 --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ReactiveStreamsTest.kt @@ -0,0 +1,135 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.kotlin.fiber +import co.paralleluniverse.strands.channels.Channels +import co.paralleluniverse.strands.channels.Topic +import co.paralleluniverse.strands.channels.reactivestreams.ReactiveStreams +import org.junit.Test +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit + +class ReactiveStreamsTest { + companion object { + private val LOG = LoggerFactory.getLogger(ReactiveStreamsTest::class.java) + } + + @Test + fun publisher() { + val inputChannel = Channels.newChannel(1); + + val publisher = ReactiveStreams.toPublisher(inputChannel) + publisher.subscribe(object : Subscriber { + @Suspendable + override fun onComplete() { + LOG.info("onComplete") + } + + @Suspendable + override fun onSubscribe(s: Subscription) { + LOG.info("onSubscribe: {}", s) + s.request(2) + } + + @Suspendable + override fun onNext(t: String?) { + LOG.info("onNext: {}", t) + } + + @Suspendable + override fun onError(t: Throwable?) { + LOG.info("onError: {}", t) + } + }) + + inputChannel.send("Hello") + inputChannel.send("World") + + TimeUnit.SECONDS.sleep(1) + + inputChannel.close() + } + + @Test + fun publisherTopic() { + val inputTopic = Topic() + + val publisher = ReactiveStreams.toPublisher(inputTopic) + publisher.subscribe(object : Subscriber { + @Suspendable + override fun onComplete() { + LOG.info("onComplete 1") + } + + @Suspendable + override fun onSubscribe(s: Subscription) { + LOG.info("onSubscribe 1: {}", s) + s.request(2) + } + + @Suspendable + override fun onNext(t: String?) { + LOG.info("onNext 1: {}", t) + } + + @Suspendable + override fun onError(t: Throwable?) { + LOG.info("onError 1: {}", t) + } + }) + publisher.subscribe(object : Subscriber { + @Suspendable + override fun onComplete() { + LOG.info("onComplete 2") + } + + @Suspendable + override fun onSubscribe(s: Subscription) { + LOG.info("onSubscribe 2: {}", s) + s.request(2) + } + + @Suspendable + override fun onNext(t: String?) { + LOG.info("onNext 2: {}", t) + } + + @Suspendable + override fun onError(t: Throwable?) { + LOG.info("onError 2: {}", t) + } + }) + + inputTopic.send("Hello") + inputTopic.send("World") + + TimeUnit.SECONDS.sleep(1) + + inputTopic.close() + } + + @Test + fun subscribe() { + val inputChannel = Channels.newChannel(10); + val publisher = ReactiveStreams.toPublisher(inputChannel) + + val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher) + + fiber @Suspendable { + while (!channel.isClosed) { + val message = channel.receive() + LOG.info("Received: {}", message) + } + LOG.info("Stopped receiving messages") + } + + inputChannel.send("Hello") + inputChannel.send("World") + + TimeUnit.SECONDS.sleep(1) + + inputChannel.close() + } +}