Channels in Kotlin — part two

Communication essentials for Coroutines

Bob Dahlberg
Analytics Vidhya

Photo by Taylor Vick on Unsplash

In the last article, we went through what a Channel is and the four types of Channels. Let’s get more practical and look at when and how to use Channels, and at a few different ways of doing so.

Not Rx

First things first: we need to establish how Channels differ from Rx, so that we can focus on Channels in their own right rather than treat them as a replacement for Rx. The most important thing to remember is that a Channel, by nature, does not broadcast its values. Once a receiver has acquired a value from the Channel, that value is removed from the Channel and no other receiver will see it. The fan-out and fan-in examples below make this clearer.
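To make the "not broadcasting" point concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The function name receiveOnce and the two list names are made up for this illustration. Two receivers compete for three values, and each value ends up with exactly one of them.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..3 and lets two receivers compete for the values.
// Returns what each receiver got.
fun receiveOnce(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>()
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()

    launch {
        for (value in 1..3) channel.send(value)
        channel.close()
    }
    // Both receivers run on runBlocking's single thread, so plain lists are safe here.
    val a = launch { for (value in channel) first.add(value) }
    val b = launch { for (value in channel) second.add(value) }
    a.join()
    b.join()
    Pair(first.toList(), second.toList())
}

fun main() {
    val (first, second) = receiveOnce()
    println("first=$first second=$second")
    // Every value shows up exactly once across both receivers.
    check((first + second).sorted() == listOf(1, 2, 3))
}
```

Which receiver gets which value is up to the scheduling, but no value is ever delivered twice.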

Fan out

Let’s start with a scenario. You have a list of addresses and need to do a lookup for each and every one of them to get their corresponding GPS coordinates from some web service. In this scenario, it’s not optimal to do it sequentially. So we want to fan out the list to multiple coroutines to be able to work on several addresses concurrently. If we start at the producing end, we need a list of addresses and a coroutine that sends every value to a channel.

val channel = Channel<String>()
val addresses = listOf(
    "Fasanv 34, 11111",
    "Publikv 10, 22222",
    ...
)
launch(Dispatchers.Default) {
    addresses.forEach {
        channel.send(it)
    }
    channel.close()
}

Now we have a channel that gets all the addresses one by one. And as we know by now, it’s of type rendezvous, meaning each send will suspend until someone receives the value. Next, we consume the addresses from our Channel.

launch(Dispatchers.Default) {
    for (address in channel) {
        val gps = fetchCoordinates(address)
        saveCoordinates(address, gps)
    }
}

And now, we have the entire flow running sequentially. I also used a for-loop to iterate over the Channel. It takes care of all the suspending for us, and when the Channel has closed, the for-loop exits. That’s one of the subtle things I love about Kotlin: every construct that is iterable in the abstract also follows the conventions and can be used like any other iterable in the language.
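For readers who want to run the sequential version, here is a minimal sketch. The GPS class and the body of fetchCoordinates are hypothetical stand-ins, since the article does not show them, and appending to a list stands in for saveCoordinates.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical type and lookup; the real ones would call a web service.
data class GPS(val lat: Double, val lon: Double)

suspend fun fetchCoordinates(address: String): GPS {
    delay(10) // pretend network latency
    return GPS(address.length.toDouble(), 0.0)
}

fun lookUpSequentially(addresses: List<String>): List<Pair<String, GPS>> {
    val saved = mutableListOf<Pair<String, GPS>>()
    runBlocking {
        val channel = Channel<String>() // rendezvous: send suspends until received
        launch(Dispatchers.Default) {
            addresses.forEach { channel.send(it) }
            channel.close() // lets the for-loop below finish
        }
        launch(Dispatchers.Default) {
            // Only this one coroutine touches `saved`, so no synchronization is needed.
            for (address in channel) {
                saved += address to fetchCoordinates(address)
            }
        }
    } // runBlocking returns once both coroutines are done
    return saved
}

fun main() {
    lookUpSequentially(listOf("Fasanv 34, 11111", "Publikv 10, 22222"))
        .forEach { (address, gps) -> println("$address -> $gps") }
}
```

With a single consumer, the results come out in the same order as the addresses went in.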

As another side note, this was where I started to really grasp the sequentiality of suspending functions and coroutines. A hard task after at least 15 years of asynchronicity with callbacks of some sort.

I’m digressing, back to the scenario, and how to make it concurrent. Well, it’s as easy as creating several coroutines to do the same thing, like this.

repeat(5) { // iterates 5 times -> creating 5 coroutines
    launch(Dispatchers.Default) {
        for (address in channel) {
            val gps = fetchCoordinates(address)
            saveCoordinates(address, gps)
        }
    }
}

I like to use the repeat function instead of a for-loop when I know the number of iterations I want. Here we iterate five times, thus creating five coroutines. Now the five coroutines iterate over the same Channel, each one suspending until it’s its turn to receive a value. This is also one of the things where the Channel excels. You don’t have to care about synchronization, threading, or suspending forever. Even if you send just one item over the Channel, once you close it, all five coroutines and their five for-loops will exit.
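The claim that every worker exits after the close, even when only one item was sent, is easy to check. A minimal sketch, where the function fanOut and its counters are invented for this illustration:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends `items` values to `workers` coroutines.
// Returns (number of values handled, number of workers that left their for-loop).
fun fanOut(items: Int, workers: Int = 5): Pair<Int, Int> {
    val handled = AtomicInteger()
    val finished = AtomicInteger()
    runBlocking {
        val channel = Channel<Int>()
        launch {
            repeat(items) { channel.send(it) }
            channel.close()
        }
        repeat(workers) {
            launch(Dispatchers.Default) {
                for (value in channel) handled.incrementAndGet()
                finished.incrementAndGet() // reached only after the channel is closed
            }
        }
    } // returns only when every worker has finished
    return handled.get() to finished.get()
}

fun main() {
    println(fanOut(items = 1)) // prints (1, 5)
}
```

If any worker stayed suspended, runBlocking would never return, so the fact that the function completes is itself part of the demonstration.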

We can also update the code to be even more Channel-idiomatic, using the produce and consumeEach functions. Here’s the updated version.

val addresses = listOf(
    "Fasanv 34, 11111",
    "Publikv 10, 22222",
    ...
)
val channel = produce {
    addresses.forEach { send(it) }
}
repeat(5) { // iterates 5 times -> creating 5 coroutines
    launch(Dispatchers.Default) {
        channel.consumeEach { address ->
            val gps = fetchCoordinates(address)
            saveCoordinates(address, gps)
        }
    }
}

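The consumeEach extension function is almost identical to the for-loop and not that interesting to look into. That might change in the future, since it’s still an experimental API. One difference is worth knowing when several coroutines share a Channel: consumeEach cancels the Channel when it completes, including when its block fails, which stops the other coroutines reading from it as well, whereas the for-loop leaves the Channel untouched for the remaining coroutines.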

But the produce extension function is a beauty. It creates a new Channel for us and returns it as a ReceiveChannel, which encapsulates the sending functionality within the produce function. Outside of the closure, we can only receive from the Channel. The produce function automatically closes the Channel when the closure is done. So you won’t accidentally end up with a coroutine and Channel that never terminate.
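Here is a small sketch of those two properties of produce: the returned ReceiveChannel has no send, and the Channel is closed for us. The names produceAddresses and collectAll are made up for the example, and the opt-in annotation assumes produce is still marked experimental in your kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceAddresses(addresses: List<String>): ReceiveChannel<String> =
    produce<String> {
        addresses.forEach { send(it) }
        // No close() needed: produce closes the channel when this block returns.
    }

fun collectAll(addresses: List<String>): List<String> = runBlocking {
    val channel = produceAddresses(addresses)
    val received = mutableListOf<String>()
    // The loop ends on its own because produce closed the channel.
    for (address in channel) received += address
    // channel.send("x") would not compile here: ReceiveChannel has no send.
    received
}

fun main() {
    println(collectAll(listOf("Fasanv 34, 11111", "Publikv 10, 22222")))
}
```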

Fan in

Not surprisingly, we can also fan in data from many different coroutines back into a single one with the help of channels, a so-called fan-in. Let’s stick to the previous example and build on it. Instead of saving the coordinates, we want to be able to present them when they’re done.

In most environments with graphical rendering, you need to present your data from a specific thread, often called the UI thread or the main thread. So to gather all data from all coroutines running on an arbitrary number of threads that we don’t control, at least not in this example, we need to fan them into a single thread again. For that, we need a new Channel.

val receiveChannel = Channel<Pair<String, GPS>>()

Now we have a way of sending values from multiple coroutines (and possibly threads) and consuming them in a single coroutine. We change our previous code so that, instead of saving the coordinates, it sends them over the new Channel.

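...
channel.consumeEach { address ->
    val gps = fetchCoordinates(address)
    receiveChannel.send(address to gps)
}
// Don't close receiveChannel here. All five coroutines run this code, and the
// first one to finish would close it while the others may still be sending.
// Close it once, after every worker coroutine has completed.
...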
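Closing the result Channel needs some care when several coroutines send to it: if a worker closes it as soon as its own loop ends, a slower worker that is still looking up its last address will fail on its next send with a ClosedSendChannelException. One way around this, sketched below under the assumption that we keep the worker Jobs around, is to join all workers and then close exactly once. The function fanIn is invented for this illustration, and the string length stands in for the real lookup.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Fans `inputs` out to `workers` coroutines and fans the results back in.
fun fanIn(inputs: List<String>, workers: Int = 5): List<Pair<String, Int>> = runBlocking {
    val channel = Channel<String>()
    val receiveChannel = Channel<Pair<String, Int>>()

    launch {
        inputs.forEach { channel.send(it) }
        channel.close()
    }
    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            for (input in channel) {
                receiveChannel.send(input to input.length) // stand-in for the lookup
            }
        }
    }
    launch {
        jobs.joinAll()         // wait until no worker can send any more
        receiveChannel.close() // then close exactly once
    }
    // Single consumer, running on runBlocking's thread.
    val results = mutableListOf<Pair<String, Int>>()
    for (result in receiveChannel) results += result
    results
}

fun main() {
    println(fanIn(listOf("Fasanv 34, 11111", "Publikv 10, 22222")))
}
```

The order of the results depends on which worker finishes first, so it should not be relied on.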

All addresses, accompanied by their corresponding GPS coordinates, are now sent from multiple coroutines to the receiveChannel. So basically, all the heavy lifting for the fan in is done. Neat, huh? Now we need to consume that Channel from a coroutine that will display it to the user, and you should be familiar with how by now.

// this runs on the main thread - inherited from runBlocking
launch {
    receiveChannel.consumeEach { (address, gps) ->
        println("$address -> $gps")
    }
}

And now we’re done with the fan in as well. Now we have a complete working application that uses a typical pattern for concurrency. It can also be used for parallelism, but that demands a more concrete implementation more tightly coupled to the hardware, and I won’t go into that now.

This is quite a small example, but it shows the strengths of Channels in Kotlin. We have split a stream of values to be worked on concurrently, and we have gathered them back into a single coroutine, without any thought of threading or synchronization. That is the real power of Channels, in my opinion. They make it simple to synchronize your coroutines into a system. Here’s a small diagram of our code and the full example.

Fully functional example from above
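The embedded example did not survive in this text, so what follows is a reconstruction rather than the original code: a sketch of the whole pipeline under a few assumptions. The GPS class, the body of fetchCoordinates, and the function name lookUp are hypothetical, and both stages use produce so that each Channel is closed automatically once its senders are done.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical type and lookup; the real ones would call a web service.
data class GPS(val lat: Double, val lon: Double)

suspend fun fetchCoordinates(address: String): GPS {
    delay(50) // pretend network latency
    return GPS(lat = address.length.toDouble(), lon = 0.0)
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.lookUp(
    addresses: List<String>,
    workers: Int = 5
): ReceiveChannel<Pair<String, GPS>> {
    // Fan out: one producer, many workers reading from the same channel.
    val channel = produce<String> { addresses.forEach { send(it) } }
    // Fan in: every worker sends into the channel owned by this second produce.
    return produce<Pair<String, GPS>> {
        coroutineScope {
            repeat(workers) {
                launch(Dispatchers.Default) {
                    for (address in channel) send(address to fetchCoordinates(address))
                }
            }
        } // returns once every worker is done; produce then closes the channel
    }
}

fun main() = runBlocking {
    val addresses = listOf("Fasanv 34, 11111", "Publikv 10, 22222")
    // This loop runs on the main thread, inherited from runBlocking.
    for ((address, gps) in lookUp(addresses)) {
        println("$address -> $gps")
    }
}
```

Wrapping the workers in coroutineScope is one design choice among several; it keeps the "wait for everyone, then close" logic in one place without tracking Jobs by hand.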

Beware of changes

Some of the functions shown are experimental and might be subject to change in the future. And if you dig deeper into the Channel API, you can see a lot of deprecated and obsolete functions like filter and take to name a few. They have been deprecated in favor of Flow to keep the Channel concise and straightforward and leave the battle against Rx to Flow instead.
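As an illustration of that hand-over to Flow, here is a hedged sketch that filters and limits the values of a Channel by viewing it as a Flow through receiveAsFlow. The function firstEvenNumbers is invented for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the first `count` even numbers sent over a channel, using Flow operators.
fun firstEvenNumbers(count: Int): List<Int> = runBlocking {
    val channel = Channel<Int>()
    val producer = launch {
        for (n in 1..100) channel.send(n)
        channel.close()
    }
    val result = channel.receiveAsFlow()
        .filter { it % 2 == 0 }
        .take(count)
        .toList()
    // take() stops collecting early, which would leave the producer suspended in send.
    producer.cancel()
    result
}

fun main() {
    println(firstEvenNumbers(3)) // prints [2, 4, 6]
}
```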

The APIs also evolve. Another obsolete function is the actor builder, which works similarly to the produce builder but is meant to represent an actor, where the Channel becomes the actor’s mailbox. This is an excellent use of channels, and it’s only obsolete because they’re building a better, more complex actor builder. Only the future will tell whether it will become a new builder (similar to the current one) or its own implementation, such as Flow.

Final thoughts

I hope these two articles help you understand Channels and also help you master coroutines and suspending functions. Writing them certainly helped me reason about asynchronicity in a sequential way.

First part of the article: https://medium.com/@dahlberg.bob/channels-in-kotlin-part-one-594ba12dcb5a

Bob Dahlberg
Analytics Vidhya

Lead Developer at Qvik, Coach, Agile Thinker, GDG Lead.