Add Dining Philosopher and Consumer Producer base code structure

This commit is contained in:
2022-09-23 05:06:34 +02:00
commit 83e6b4d4c6
12 changed files with 792 additions and 0 deletions

View File

@@ -0,0 +1,67 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main(args: Array<String>) = runBlocking {
val kitchen = Kitchen((2..8).random())
repeat((10..20).random()) {
launch {
Consumer(
hunger = (2..8).random(),
kitchen = kitchen,
whenConsuming = { it?.let { println("Yummy $it") } ?: run { println("I'm still hungry :(") } }
).start()
}
}
repeat((50..70).random()) {
launch {
Producer(kitchen).start()
}
}
}
class Consumer(
private val hunger: Int,
private val kitchen: Kitchen,
private val whenConsuming: (String?) -> Unit
) {
suspend fun start() {
TODO("Not yet implemented")
}
private suspend fun consume(product: String?) {
delay((1L..30L).random())
whenConsuming(product)
}
private suspend fun collect(): String? {
delay((5L..10L).random())
TODO("Not yet implemented")
}
}
class Producer(
private val kitchen: Kitchen,
private val whenProducing: (String?) -> Unit = {}
) {
suspend fun start() {
while (true) {
TODO("Not yet implemented")
}
}
private suspend fun produce(): String {
delay((30L..60L).random())
whenProducing("Weißwurst")
return "Weißwurst"
}
private suspend fun deliver(product: String) {
delay((5L..10L).random())
}
}
class Kitchen(capacity: Int) {
//TODO
}

View File

@@ -0,0 +1,104 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class Fork(id: String) : Observable<String?>(id) {
//TODO
// --------------- EDIT CODE BELOW ---------------
suspend fun take(philosopher: String) {
setOccupiedBy(philosopher)
}
suspend fun release() {
setOccupiedBy(null)
}
// --------------- EDIT CODE ABOVE ---------------
private suspend fun setOccupiedBy(philosopher: String?) {
delay(5L)
onChange(philosopher)
occupiedBy = philosopher
}
private var occupiedBy: String? = null
fun available() = occupiedBy == null
}
class Philosopher(
id: String,
private val leftFork: Fork,
private val rightFork: Fork,
) : Observable<String>(id) {
//TODO
// --------------- EDIT CODE BELOW ---------------
suspend fun start() = repeat(10) {
}.also { changeState("FINISHED") }
private suspend fun takeForks() {
leftFork.take(id)
rightFork.take(id)
}
// --------------- EDIT CODE ABOVE ---------------
private suspend fun wait() {
delay(1)
}
private suspend fun eat() {
changeState("EATING")
delay((0L..10L).random())
}
private suspend fun think() {
changeState("THINKING")
delay((0L..10L).random())
changeState("HUNGRY")
}
private suspend fun changeState(state: String) {
onChange(state)
this.state = state
}
private var state = "THINKING"
}
open class Observable<T>(val id: String) {
var onChange: suspend (String, T) -> Unit = { _, _ -> }
protected suspend fun onChange(state: T) = onChange(id, state)
}
suspend fun main(args: Array<String>) = runBlocking {
val number = 5
val forks = Array(number) { Fork("Fork$it") }
val philosophers = Array(number) { index ->
Philosopher(
id = "Philosopher$index",
leftFork = forks[index],
rightFork = forks[(index + 1) % number]
)
}
philosophers.forEach {
it.onChange = { philosopher, state ->
println("$philosopher is now $state")
}
}
forks.forEach {
it.onChange = { fork, philosopher ->
println(philosopher?.let { "$fork acquired by $it" } ?: "$fork released")
}
}
philosophers.forEach {
launch { it.start() }
}
}

View File

@@ -0,0 +1,58 @@
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.Exhaustive
import io.kotest.property.arbitrary.next
import io.kotest.property.arbitrary.positiveInt
import io.kotest.property.exhaustive.cartesian
import io.kotest.property.exhaustive.exhaustive
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import java.util.concurrent.atomic.AtomicInteger
class ConsumerProducerTest : StringSpec() {
private val consumerNumber = listOf(2, 5, 18).exhaustive()
private val producerNumber = listOf(3, 26, 39).exhaustive()
private val capacity = listOf(1, 5, 12).exhaustive()
private val hunger = Arb.positiveInt(max = 50)
init {
Exhaustive.cartesian(consumerNumber, producerNumber, capacity) { c, p, n ->
"$c consumers, $p producers, capacity $n" {
val consumerCounters = Array(c) { AtomicInteger(0) }
val consumerHunger = Array(p) { hunger.next() }
val producerCounter = AtomicInteger(0)
val kitchen = Kitchen(n)
val consumers = (0..p).map { index ->
async {
Consumer(
hunger = consumerHunger[index],
kitchen = kitchen,
whenConsuming = { if (it == "Weißwurst") consumerCounters[index].getAndIncrement() }
).start()
}
}
val producers = (0..p).map {
async {
Producer(
kitchen = kitchen,
whenProducing = { if (it == "Weißwurst") producerCounter.getAndIncrement() }
).start()
}
}
consumers.awaitAll()
producers.awaitAll()
consumerCounters.forEachIndexed { i, c ->
c shouldBe consumerHunger[i]
}
consumerCounters.fold(0) { sum, counter ->
sum + counter.get()
} shouldBe producerCounter.get()
}
}
}
}

View File

@@ -0,0 +1,200 @@
import io.kotest.assertions.fail
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.longs.shouldBeLessThan
import io.kotest.matchers.shouldBe
import io.kotest.property.checkAll
import io.kotest.property.exhaustive.exhaustive
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlin.system.measureTimeMillis
import kotlin.time.Duration.Companion.milliseconds
@OptIn(ObsoleteCoroutinesApi::class)
class DiningPhilosophersTest : StringSpec() {
init {
coroutineDebugProbes = true
timeout = 2000
val philosopherNumbers = listOf(2, 5, 12).exhaustive()
"No Deadlock" {
philosopherNumbers.checkAll { n ->
println("-----Seating $n philosophers-----")
val diningPhilosopherLogger = actor<LogMessage> {
val forks = HashMap<String, String?>()
for (msg in channel) {
when {
msg.id.contains("Fork") -> {
forks[msg.id] = msg.state
if (forks.entries.filter { it.value != null }.groupBy { it.value }.size == n) {
fail(
"All forks are blocked by pair wise distinct philosophers. This usually results in a dead lock. " +
"Some solutions to deadlocks won't resolve this test. If the other tests run successfully ignore this."
)
}
}
}
}
}
withContext(Dispatchers.Default) {
philosophyTime(n, diningPhilosopherLogger)
}
diningPhilosopherLogger.close()
}
}
"All philosophers think 10 times" {
philosopherNumbers.checkAll { n ->
println("-----Seating $n philosophers-----")
val diningPhilosopherLogger = observingActor()
philosophyTime(n, diningPhilosopherLogger)
val response = CompletableDeferred<List<Pair<String, String?>>>()
diningPhilosopherLogger.send(ReadLogMessage(response))
response.await()
.filter { it.first.contains("Philosophers") }
.filter { it.second == "THINKING" }
.groupBy { it.first }
.apply {
size shouldBe n
forEach {
it.value.size shouldBe n
}
}
diningPhilosopherLogger.close()
}
}
"Philosopher eat after thinking" {
philosopherNumbers.checkAll { n ->
println("-----Seating $n philosophers-----")
val diningPhilosopherLogger = observingActor()
philosophyTime(n, diningPhilosopherLogger)
val response = CompletableDeferred<List<Pair<String, String?>>>()
diningPhilosopherLogger.send(ReadLogMessage(response))
response.await()
.filter { it.first.contains("Philosophers") }
.groupBy { it.first }
.map { it.value.map { it.second }.iterator() }
.forEach {
while (it.hasNext()) {
val x = it.next()
if (it.hasNext()) {
x shouldBe "THINKING"
} else {
x shouldBe "FINISHED"
break
}
it.next() shouldBe "HUNGRY"
it.next() shouldBe "EATING"
}
}
diningPhilosopherLogger.close()
}
}
"Forks are not obtained when still used" {
philosopherNumbers.checkAll { n ->
println("-----Seating $n philosophers-----")
val diningPhilosopherLogger = actor<LogMessage> {
val forks = HashMap<String, String?>()
for (msg in channel) {
when {
msg.id.contains("Fork") -> {
if (msg.state != null && forks[msg.id] != null) {
fail("${msg.state} tried to acquire ${msg.id} which is currently acquired by ${forks[msg.id]}")
}
forks[msg.id] = msg.state
}
}
}
}
philosophyTime(n, diningPhilosopherLogger)
diningPhilosopherLogger.close()
}
}
"Philosophers obtain 2 forks before eating" {
philosopherNumbers.checkAll { n ->
println("-----Seating $n philosophers-----")
val diningPhilosopherLogger = actor<LogMessage> {
val forks = HashMap<String, String?>()
for (msg in channel) {
when {
msg.id.contains("Fork") -> forks[msg.id] = msg.state
msg.state == "EATING" -> {
if (forks.filterValues { it == msg.id }.size != 2) {
fail("${msg.id} tried eating without acquiring 2 forks first. Acquired forks: ${forks.filterValues { it == msg.id }}")
}
}
}
}
}
philosophyTime(n, diningPhilosopherLogger)
diningPhilosopherLogger.close()
}
}
"Use some parallelization".config(timeout = 40000.milliseconds) {
val measureTimeMillis = measureTimeMillis {
repeat(10) {
philosophyTime(10)
}
}
println("Finished in $measureTimeMillis")
measureTimeMillis shouldBeLessThan 35000
}
}
private fun CoroutineScope.observingActor() =
actor<ObserverMessage> {
val log = ArrayList<Pair<String, String?>>()
for (msg in channel) {
when (msg) {
is LogMessage -> log.add(Pair(msg.id, msg.state))
is ReadLogMessage -> msg.response.complete(log)
}
}
}
private suspend fun philosophyTime(number: Int, channel: SendChannel<LogMessage>? = null) = withContext(Dispatchers.Default) {
val forks = Array(number) { Fork("Fork$it") }
val philosophers = Array(number) { index ->
Philosopher(
id = "Philosopher$index",
leftFork = forks[index],
rightFork = forks[(index + 1) % number]
)
}
philosophers.forEach {
it.onChange = { philosopher, state ->
channel?.send(LogMessage(philosopher, state))
println("$philosopher is now $state")
}
}
forks.forEach {
it.onChange = { fork, philosopher ->
println(philosopher?.let { "$fork acquired by $it" } ?: "$fork released")
channel?.send(LogMessage(fork, philosopher))
}
}
philosophers.forEach {
launch { it.start() }
}
}
sealed class ObserverMessage
class ReadLogMessage(val response: CompletableDeferred<List<Pair<String, String?>>>) : ObserverMessage()
class LogMessage(val id: String, val state: String?) : ObserverMessage()
}