When I first started using Go, I did not study the language to the extent I should have. It was only relatively recently that I delved deeper into what makes Go a fantastic language for building a large variety of software. With that said, in those blissfully ignorant early days of my Go journey, I often missed a proper stream API. You know, like the ones found in Java 8+ and many other languages.
In this article, we will write our own stream API, which will be extensible and beautifully simple. There will, of course, be room for improvement (there always is), so don’t hesitate to share your ideas.
Creating a Stream
We first need to write the function that transforms a slice into a receive-only channel.
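func Stream[T any](ctx context.Context, in []T) <-chan T {
	out := make(chan T)
	go func() {
		// Closing out tells downstream consumers that no more elements will arrive.
		defer close(out)
		for _, element := range in {
			select {
			case <-ctx.Done():
				// The context was cancelled: stop sending and exit.
				return
			case out <- element:
			}
		}
	}()
	return out
}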
This code does not do a lot, as you can see. The context.Context parameter in the function signature lets us detect a cancelled operation and respond by stopping the goroutine, whose deferred close(out) then closes the stream. Almost every function that follows uses the same building blocks, so make sure that you read through it and understand what is going on.
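To see the cancellation path in action, here is a minimal sketch that reads only the first few elements and then cancels the context. The helper firstN is a hypothetical name introduced for this illustration and is not part of the API built in this article.

```go
package main

import (
	"context"
	"fmt"
)

// Stream is repeated from the section above so the sketch compiles on its own.
func Stream[T any](ctx context.Context, in []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, element := range in {
			select {
			case <-ctx.Done():
				return
			case out <- element:
			}
		}
	}()
	return out
}

// firstN is a hypothetical helper: it reads at most n elements from a stream
// over data, then cancels the context so the producing goroutine can exit.
func firstN(data []int, n int) []int {
	if n <= 0 {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks the producer if we stop reading early

	got := make([]int, 0, n)
	for v := range Stream(ctx, data) {
		got = append(got, v)
		if len(got) == n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN([]int{1, 2, 3, 4, 5}, 2)) // [1 2]
}
```

Without the deferred cancel, the goroutine inside Stream would stay blocked on its send after we stop reading, which is exactly the leak the context parameter is there to prevent.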
Filtering Data
An everyday use case for stream APIs is filtering the data flowing through them, so let us look at how we would achieve this.
func Filter[T any](ctx context.Context, predicate func(T) bool, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		// The loop ends when the upstream channel is closed.
		for element := range in {
			// Only elements that satisfy the predicate are passed on.
			if predicate(element) {
				select {
				case <-ctx.Done():
					return
				case out <- element:
				}
			}
		}
	}()
	return out
}
Again, notice that the general structure of this function is essentially the same as that of the Stream(…) function. However, we are reading from an incoming channel instead of iterating over a slice. Wondering how we will wire these together nicely and succinctly? Don’t worry, I will cover that shortly, but let’s first take a look at transforming data going through the stream.
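Because Filter only asks for a receive-only channel, it can be tried out on its own before anything is wired together. The sketch below assumes two hypothetical helpers that are not part of the article's API: closedSource, which preloads a buffered channel and closes it, and evens, which wraps the call.

```go
package main

import (
	"context"
	"fmt"
)

// Filter is repeated from the section above so the sketch compiles on its own.
func Filter[T any](ctx context.Context, predicate func(T) bool, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for element := range in {
			if predicate(element) {
				select {
				case <-ctx.Done():
					return
				case out <- element:
				}
			}
		}
	}()
	return out
}

// closedSource is a hypothetical helper: it returns a receive-only channel
// that already holds the given values and has been closed.
func closedSource[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// evens is a hypothetical helper that keeps only the even numbers.
func evens(data ...int) []int {
	var kept []int
	isEven := func(n int) bool { return n%2 == 0 }
	for v := range Filter(context.Background(), isEven, closedSource(data...)) {
		kept = append(kept, v)
	}
	return kept
}

func main() {
	fmt.Println(evens(1, 2, 3, 4, 5, 6)) // [2 4 6]
}
```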
Transforming
Until now, we have not been modifying the data. We have streamed it and implemented the ability to filter the stream. But let us now take a look at transformations.
func Transform[I any, O any](ctx context.Context, transformer func(I) O, in <-chan I) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		// Ranging over a channel yields a single value per iteration.
		for element := range in {
			select {
			case <-ctx.Done():
				return
			case out <- transformer(element):
			}
		}
	}()
	return out
}
Notice how we now have two generic types, I (input) and O (output), as well as a new kind of function being passed in, which takes an argument of the input type and returns a value of the output type. You could, of course, allow this function to return errors as well, but then you would have to worry about handling those errors, which is not the purpose of this article.
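The two type parameters matter most when the element type changes along the way. As a small sketch, the following maps strings to their lengths, so I is string and O is int. The helpers closedSource and lengths are hypothetical names used only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Transform is repeated from the section above so the sketch compiles on its own.
func Transform[I any, O any](ctx context.Context, transformer func(I) O, in <-chan I) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for element := range in {
			select {
			case <-ctx.Done():
				return
			case out <- transformer(element):
			}
		}
	}()
	return out
}

// closedSource is a hypothetical helper: it returns a receive-only channel
// that already holds the given values and has been closed.
func closedSource[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// lengths is a hypothetical helper that turns a stream of strings into a
// stream of ints; the compiler infers I = string and O = int.
func lengths(words ...string) []int {
	var out []int
	byteLen := func(s string) int { return len(s) }
	for n := range Transform(context.Background(), byteLen, closedSource(words...)) {
		out = append(out, n)
	}
	return out
}

func main() {
	fmt.Println(lengths("go", "stream", "api")) // [2 6 3]
}
```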
Collecting
Great! We can now start to do some basic processing on our data. But how do we get it back as a slice again? Let us have a look.
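func Collect[T any](ctx context.Context, in <-chan T) []T {
	out := make([]T, 0)
	// The loop ends when the upstream channel is closed.
	for element := range in {
		select {
		case <-ctx.Done():
			// Cancelled: return whatever has been collected so far.
			return out
		default:
			out = append(out, element)
		}
	}
	return out
}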
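As easy as that! The one thing worth noting is that Collect blocks until the incoming channel is closed, and it only checks for cancellation each time an element arrives. With that, let’s get cracking at wiring all of this together.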
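A quick way to convince yourself of how Collect behaves is to feed it channels that are already closed. In this sketch, closedSource and collectInts are hypothetical helpers introduced for the illustration. Note that an empty stream yields an empty, non-nil slice, because Collect starts from make([]T, 0).

```go
package main

import (
	"context"
	"fmt"
)

// Collect is repeated from the section above so the sketch compiles on its own.
func Collect[T any](ctx context.Context, in <-chan T) []T {
	out := make([]T, 0)
	for element := range in {
		select {
		case <-ctx.Done():
			return out
		default:
			out = append(out, element)
		}
	}
	return out
}

// closedSource is a hypothetical helper: it returns a receive-only channel
// that already holds the given values and has been closed.
func closedSource[T any](values ...T) <-chan T {
	ch := make(chan T, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// collectInts is a hypothetical wrapper that collects a preloaded stream.
func collectInts(values ...int) []int {
	return Collect(context.Background(), closedSource(values...))
}

func main() {
	full := collectInts(1, 2, 3)
	empty := collectInts()
	fmt.Println(full, len(empty), empty == nil) // [1 2 3] 0 false
}
```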
Bringing It All Together
Now that we have all the building blocks for our basic stream API, let’s look at how to wire everything together. As you have seen in the functions defined in previous sections, every intermediate function takes a receive-only channel and returns one. This is the key to wiring it all together.
In this example, we will start with a slice of integers, and the goal is to transform the elements of this slice into a new slice where each element is the result of dividing 100 by the original element. This means that we must first filter out any 0s present in the original data, which in turn means that we will be using every function we have defined earlier in the article. Convenient! Let’s have a look at how to do it.
import (
	"context"
	"fmt"
)

func main() {
	data := []int{0, 2, 4, 6, 8}
	ctx := context.Background()
	// Read the pipeline from the inside out: Stream, Filter, Transform, Collect.
	result := Collect(ctx, Transform(ctx, func(n int) int {
		return 100 / n
	}, Filter(ctx, func(n int) bool {
		return n != 0
	}, Stream(ctx, data))))
	fmt.Printf("%v", result)
}
As you can see, streaming the data variable and passing the result into a filter will remove any 0s from the stream. The filtered data is then fed into a transformer that maps each element to a new value, and the results are finally collected into a slice. The output, as you might guess, becomes [50 25 16 12] (integer division truncates 100 / 6 and 100 / 8).
If you want to clean it up a bit, you can pass named functions instead of inline functions, which gives us the following result:
func main() {
	data := []int{0, 2, 4, 6, 8}
	ctx := context.Background()
	result := Collect(ctx, Transform(ctx, Dividend(100), Filter(ctx, NonZero, Stream(ctx, data))))
	fmt.Printf("%v", result)
}

// NonZero reports whether n is safe to use as a divisor.
func NonZero(n int) bool {
	return n != 0
}

// Dividend returns a function that divides the given dividend by its argument.
func Dividend(dividend int) func(int) int {
	return func(n int) int {
		return dividend / n
	}
}
Which, of course, yields the same output.
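If the nested call is hard to read because it runs inside-out, one alternative is to name each stage so that the pipeline reads top to bottom. This is only a sketch of that style. The wrapper function quotients and the stage variable names are hypothetical, and the four core functions are repeated so the block compiles on its own.

```go
package main

import (
	"context"
	"fmt"
)

func Stream[T any](ctx context.Context, in []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, element := range in {
			select {
			case <-ctx.Done():
				return
			case out <- element:
			}
		}
	}()
	return out
}

func Filter[T any](ctx context.Context, predicate func(T) bool, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for element := range in {
			if predicate(element) {
				select {
				case <-ctx.Done():
					return
				case out <- element:
				}
			}
		}
	}()
	return out
}

func Transform[I any, O any](ctx context.Context, transformer func(I) O, in <-chan I) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for element := range in {
			select {
			case <-ctx.Done():
				return
			case out <- transformer(element):
			}
		}
	}()
	return out
}

func Collect[T any](ctx context.Context, in <-chan T) []T {
	out := make([]T, 0)
	for element := range in {
		select {
		case <-ctx.Done():
			return out
		default:
			out = append(out, element)
		}
	}
	return out
}

// quotients is a hypothetical wrapper that names each stage of the pipeline.
func quotients(dividend int, data []int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs after Collect has drained the pipeline

	source := Stream(ctx, data)
	nonZero := Filter(ctx, func(n int) bool { return n != 0 }, source)
	divided := Transform(ctx, func(n int) int { return dividend / n }, nonZero)
	return Collect(ctx, divided)
}

func main() {
	fmt.Println(quotients(100, []int{0, 2, 4, 6, 8})) // [50 25 16 12]
}
```

The stages still run concurrently, since each one starts its goroutine as soon as it is called. Only the way the code reads has changed.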
Wrapping Up
Amazing! We’re done! You have now started on your stream API in Go. I hope the examples and core functions have inspired you to extend the API with more features and perhaps some possibilities for error handling.
I am quite inconsistent with creating content, but I hope to continue writing articles here on anything I make up or come across that I’d like to share with the world. So keep an eye out. Next up might be a continuation of this article that discusses our options for introducing error handling to the API.
Writing a Stream API in Go was originally published in Better Programming on Medium, where people are continuing the conversation by highlighting and responding to this story.