diff --git a/kotlin-quasar/pom.xml b/kotlin-quasar/pom.xml new file mode 100644 index 0000000000..44feabd183 --- /dev/null +++ b/kotlin-quasar/pom.xml @@ -0,0 +1,120 @@ + + + 4.0.0 + com.baeldung + kotlin-quasar + 1.0.0-SNAPSHOT + kotlin-quasar + jar + + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.version} + + + org.jetbrains.kotlin + kotlin-test + ${kotlin.version} + test + + + co.paralleluniverse + quasar-core + ${quasar.version} + + + co.paralleluniverse + quasar-actors + ${quasar.version} + + + co.paralleluniverse + quasar-reactive-streams + ${quasar.version} + + + co.paralleluniverse + quasar-kotlin + ${quasar.version} + + + junit + junit + 4.12 + + + + + src/main/kotlin + src/test/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + 1.8 + + + + maven-dependency-plugin + 3.1.1 + + + getClasspathFilenames + + properties + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.1 + + -Dco.paralleluniverse.fibers.verifyInstrumentation=true + -javaagent:${co.paralleluniverse:quasar-core:jar} + + + + org.codehaus.mojo + exec-maven-plugin + 1.3.2 + + target/classes + echo + + -javaagent:${co.paralleluniverse:quasar-core:jar} + -classpath + com.baeldung.quasar.QuasarHelloWorldKt + + + + + + + + 0.8.0 + 1.3.31 + + diff --git a/kotlin-quasar/src/main/kotlin/com/baeldung/quasar/QuasarHelloWorld.kt b/kotlin-quasar/src/main/kotlin/com/baeldung/quasar/QuasarHelloWorld.kt new file mode 100644 index 0000000000..9bf01ecb09 --- /dev/null +++ b/kotlin-quasar/src/main/kotlin/com/baeldung/quasar/QuasarHelloWorld.kt @@ -0,0 +1,19 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.strands.SuspendableRunnable + + +/** + * Entrypoint into the application + */ +fun main(args: Array) { + class Runnable : SuspendableRunnable { + override fun run() { + println("Hello") + } + } + val result = Fiber(Runnable()).start() + result.join() + println("World") +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ChannelsTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ChannelsTest.kt new file mode 100644 index 0000000000..b51943446e --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/ChannelsTest.kt @@ -0,0 +1,155 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.kotlin.fiber +import co.paralleluniverse.strands.channels.Channels +import co.paralleluniverse.strands.channels.Selector +import com.google.common.base.Function +import org.junit.Test + +class ChannelsTest { + @Test + fun createChannel() { + Channels.newChannel(0, // The size of the channel buffer + Channels.OverflowPolicy.BLOCK, // The policy for when the buffer is full + true, // Whether we should optimize for a single message producer + true) // Whether we should optimize for a single message consumer + } + + @Test + fun blockOnMessage() { + val channel = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + + fiber @Suspendable { + while (!channel.isClosed) { + val message = channel.receive() + println("Received: $message") + } + println("Stopped receiving messages") + } + + channel.send("Hello") + channel.send("World") + + channel.close() + } + + @Test + fun selectReceiveChannels() { + val channel1 = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + val channel2 = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + + fiber @Suspendable { + while (!channel1.isClosed && !channel2.isClosed) { + val received = Selector.select(Selector.receive(channel1), Selector.receive(channel2)) + + println("Received: $received") + } + } + + fiber @Suspendable { + for (i in 0..10) { + channel1.send("Channel 1: $i") + } + } + + fiber @Suspendable { + for (i in 0..10) { + channel2.send("Channel 2: $i") + } + } + } + + @Test + fun selectSendChannels() { + val channel1 = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + val channel2 = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + + fiber @Suspendable { + for (i in 0..10) { + Selector.select( + Selector.send(channel1, "Channel 1: $i"), + Selector.send(channel2, "Channel 2: $i") + ) + } + } + + fiber @Suspendable { + while (!channel1.isClosed) { + val msg = channel1.receive() + println("Read: $msg") + } + } + + fiber @Suspendable { + while (!channel2.isClosed) { + val msg = channel2.receive() + println("Read: $msg") + } + } + } + + @Test + fun tickerChannel() { + val channel = Channels.newChannel(3, Channels.OverflowPolicy.DISPLACE) + + for (i in 0..10) { + val tickerConsumer = Channels.newTickerConsumerFor(channel) + fiber @Suspendable { + while (!tickerConsumer.isClosed) { + val message = tickerConsumer.receive() + println("Received on $i: $message") + } + println("Stopped receiving messages on $i") + } + } + + for (i in 0..50) { + channel.send("Message $i") + } + + channel.close() + } + + + @Test + fun transformOnSend() { + val channel = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + + fiber @Suspendable { + while (!channel.isClosed) { + val message = channel.receive() + println("Received: $message") + } + println("Stopped receiving messages") + } + + val transformOnSend = Channels.mapSend(channel, Function { msg: String? -> msg?.toUpperCase() }) + + transformOnSend.send("Hello") + transformOnSend.send("World") + + channel.close() + } + + @Test + fun transformOnReceive() { + val channel = Channels.newChannel(0, Channels.OverflowPolicy.BLOCK, true, true) + + val transformOnReceive = Channels.map(channel, Function { msg: String? -> msg?.reversed() }) + + fiber @Suspendable { + while (!transformOnReceive.isClosed) { + val message = transformOnReceive.receive() + println("Received: $message") + } + println("Stopped receiving messages") + } + + + channel.send("Hello") + channel.send("World") + + channel.close() + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/DataflowTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/DataflowTest.kt new file mode 100644 index 0000000000..3f73af3917 --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/DataflowTest.kt @@ -0,0 +1,35 @@ +package com.baeldung.quasar + +import co.paralleluniverse.strands.dataflow.Val +import co.paralleluniverse.strands.dataflow.Var +import org.junit.Assert +import org.junit.Test +import java.util.concurrent.TimeUnit + +class DataflowTest { + @Test + fun testValVar() { + val a = Var() + val b = Val() + + val c = Var { a.get() + b.get() } + val d = Var { a.get() * b.get() } + + // (a*b) - (a+b) + val initialResult = Val { d.get() - c.get() } + val currentResult = Var { d.get() - c.get() } + + a.set(2) + b.set(4) + + Assert.assertEquals(2, initialResult.get()) + Assert.assertEquals(2, currentResult.get()) + + a.set(3) + + TimeUnit.SECONDS.sleep(1) + + Assert.assertEquals(2, initialResult.get()) + Assert.assertEquals(5, currentResult.get()) + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/PiAsyncTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/PiAsyncTest.kt new file mode 100644 index 0000000000..d4ea04820d --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/PiAsyncTest.kt @@ -0,0 +1,53 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.FiberAsync +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.kotlin.fiber +import co.paralleluniverse.strands.Strand +import org.junit.Assert +import org.junit.Test +import java.math.BigDecimal +import java.util.concurrent.TimeUnit + +interface PiCallback { + fun success(result: BigDecimal) + fun failure(error: Exception) +} + +fun computePi(callback: PiCallback) { + println("Starting calculations") + TimeUnit.SECONDS.sleep(2) + println("Finished calculations") + callback.success(BigDecimal("3.14")) +} + +class PiAsync : PiCallback, FiberAsync() { + override fun success(result: BigDecimal) { + asyncCompleted(result) + } + + override fun failure(error: Exception) { + asyncFailed(error) + } + + override fun requestAsync() { + computePi(this) + } +} + +class PiAsyncTest { + @Test + fun testPi() { + val result = fiber @Suspendable { + val pi = PiAsync() + println("Waiting to get PI on: " + Fiber.currentFiber().name) + val result = pi.run() + println("Got PI") + + result + }.get() + + Assert.assertEquals(BigDecimal("3.14"), result) + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspendableCallableTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspendableCallableTest.kt new file mode 100644 index 0000000000..9b139dd686 --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspendableCallableTest.kt @@ -0,0 +1,48 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.kotlin.fiber +import co.paralleluniverse.strands.SuspendableCallable +import org.junit.Assert +import org.junit.Test +import java.util.concurrent.TimeUnit + + +class SuspendableCallableTest { + @Test + fun createFiber() { + class Callable : SuspendableCallable { + override fun run(): String { + println("Inside Fiber") + return "Hello" + } + } + val result = Fiber(Callable()).start() + + Assert.assertEquals("Hello", result.get()) + } + + @Test + fun createFiberLambda() { + val lambda: (() -> String) = { + println("Inside Fiber Lambda") + "Hello" + } + val result = Fiber(lambda) + result.start() + + Assert.assertEquals("Hello", result.get()) + } + + @Test + fun createFiberDsl() { + val result = fiber @Suspendable { + TimeUnit.SECONDS.sleep(5) + println("Inside Fiber DSL") + "Hello" + } + + Assert.assertEquals("Hello", result.get()) + } +} diff --git a/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspensableRunnableTest.kt b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspensableRunnableTest.kt new file mode 100644 index 0000000000..ba4cef8f4c --- /dev/null +++ b/kotlin-quasar/src/test/kotlin/com/baeldung/quasar/SuspensableRunnableTest.kt @@ -0,0 +1,47 @@ +package com.baeldung.quasar + +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.kotlin.fiber +import co.paralleluniverse.strands.SuspendableRunnable +import org.junit.Test +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + + +class SuspensableRunnableTest { + @Test + fun createFiber() { + class Runnable : SuspendableRunnable { + override fun run() { + println("Inside Fiber") + } + } + val result = Fiber(Runnable()).start() + result.join() + } + + @Test + fun createFiberLambda() { + val result = Fiber { + println("Inside Fiber Lambda") + } + result.start() + result.join() + } + + @Test + fun createFiberDsl() { + fiber @Suspendable { + println("Inside Fiber DSL") + }.join() + } + + @Test(expected = TimeoutException::class) + fun fiberTimeout() { + fiber @Suspendable { + TimeUnit.SECONDS.sleep(5) + println("Inside Fiber DSL") + }.join(2, TimeUnit.SECONDS) + } +}