Skip to content

Instantly share code, notes, and snippets.

@OsoianMarcel
Last active December 15, 2025 14:39
Show Gist options
  • Select an option

  • Save OsoianMarcel/5f003d09020eeeaa922042dd3a410b8a to your computer and use it in GitHub Desktop.

Select an option

Save OsoianMarcel/5f003d09020eeeaa922042dd3a410b8a to your computer and use it in GitHub Desktop.
Exploring Go channels and the fan-in concurrency pattern.
package main
import (
"context"
"fmt"
"sync"
"time"
)
// streamString returns a channel that emits each character (rune) of s as a
// one-character string, together with a cancel function that stops the
// stream early. It demonstrates manual cancellation without a context.
//
// The producing goroutine closes the returned channel after the last
// character has been delivered or after cancel has been called, whichever
// happens first. Because the channel is unbuffered, the goroutine stays
// blocked on its send until the caller either receives or calls cancel.
//
// cancel is idempotent: like a context.CancelFunc, it may be called any
// number of times (for example once explicitly and once via defer).
func streamString(s string) (<-chan string, func()) {
	out := make(chan string)
	done := make(chan struct{})

	go func() {
		defer close(out)
		for _, c := range s {
			select {
			case out <- string(c):
			case <-done:
				return
			}
		}
	}()

	// Closing an already-closed channel panics, so make sure the close
	// happens at most once no matter how often cancel is invoked.
	var once sync.Once
	return out, func() {
		once.Do(func() { close(done) })
	}
}
// streamStringWithContext returns a channel on which every character (rune)
// of s is delivered, in order, as a one-character string.
//
// The producing goroutine stops as soon as ctx is done and always closes the
// channel on exit, so callers can range over the result.
func streamStringWithContext(ctx context.Context, s string) <-chan string {
	ch := make(chan string)

	// deliver hands one value to the receiver; it reports false when the
	// context ended before the value could be delivered.
	deliver := func(v string) bool {
		select {
		case <-ctx.Done():
			return false
		case ch <- v:
			return true
		}
	}

	go func() {
		defer close(ch)
		for _, r := range s {
			if !deliver(string(r)) {
				return
			}
		}
	}()

	return ch
}
// fanIn merges multiple string channels into a single output channel.
//
// It launches one goroutine per input channel. Each goroutine forwards values
// from its source to the output until that source is closed or ctx is done.
// The output channel is closed once every forwarding goroutine has exited, so
// callers can range over it. With no sources the output is closed right away.
//
// Cancellation is observed both while waiting to receive from a source and
// while waiting to send to the output. A source that never sends and is never
// closed (or a nil source) therefore cannot keep the output open after ctx
// has been canceled.
func fanIn(ctx context.Context, sources ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)

	forward := func(c <-chan string) {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-c:
				if !ok {
					// Source drained and closed.
					return
				}
				// The consumer may have gone away; do not block on the
				// send past cancellation.
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}

	wg.Add(len(sources))
	for _, src := range sources {
		go forward(src)
	}

	// Close the output only after all forwarders are finished so that no
	// goroutine can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
func main() {
ctx := context.Background()
// reduce 1000 to 100 to see the timeout behavior
ctx, cancel := context.WithTimeout(ctx, time.Microsecond*1000)
defer cancel()
out1 := streamStringWithContext(ctx, "what a nice day")
out2 := streamStringWithContext(ctx, "wish you happy go coding")
all := fanIn(ctx, out1, out2)
// uncomment to try manual cancellation instead of context.
// out, cancel := streamString("try me, I am also nice")
// defer cancel()
// count how many characters were received before context timeout
count := 0
for s := range all {
count++
fmt.Println(s)
}
fmt.Printf("len: %d\n", count)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment