Last active
December 15, 2025 14:39
-
-
Save OsoianMarcel/5f003d09020eeeaa922042dd3a410b8a to your computer and use it in GitHub Desktop.
Exploring Go channels and the fan-in concurrency pattern.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "fmt" | |
| "sync" | |
| "time" | |
| ) | |
// streamString emits each character (rune) of s, in order, as a one-character
// string on the returned channel. It demonstrates manual cancellation without
// a context.
//
// The channel is unbuffered, so the producing goroutine advances only as fast
// as the caller receives. The channel is closed once every character has been
// delivered or the returned cancel function has been called, whichever
// happens first.
//
// The returned cancel function stops the producer early. It is safe to call
// more than once (for example from both a defer and an explicit early exit):
// only the first call has any effect.
func streamString(s string) (<-chan string, func()) {
	out := make(chan string)
	cancel := make(chan struct{})

	go func() {
		// Closing out tells the receiver that no more characters will arrive.
		defer close(out)
		for _, c := range s {
			select {
			case out <- string(c):
			case <-cancel:
				return
			}
		}
	}()

	// Guard the close with sync.Once: closing an already-closed channel
	// panics, which would make a repeated cancel call crash the program.
	var once sync.Once
	return out, func() {
		once.Do(func() { close(cancel) })
	}
}
// streamStringWithContext returns a channel that yields the characters (runes)
// of s one at a time, each as its own string, in their original order.
//
// A background goroutine performs the sends on an unbuffered channel and
// closes it when it finishes. It finishes either after the last character has
// been delivered or as soon as it observes that ctx is done while waiting to
// deliver a character.
func streamStringWithContext(ctx context.Context, s string) <-chan string {
	chars := make(chan string)

	// emit tries to deliver a single character. It reports false when the
	// context ended before a receiver took the value.
	emit := func(ch string) bool {
		select {
		case <-ctx.Done():
			return false
		case chars <- ch:
			return true
		}
	}

	go func() {
		defer close(chars)
		for _, r := range s {
			if !emit(string(r)) {
				return
			}
		}
	}()

	return chars
}
// fanIn merges multiple string channels into a single output channel.
//
// It launches one forwarding goroutine per input channel. Values from a single
// source keep their relative order; because each source is forwarded by its
// own goroutine, no ordering is imposed between different sources.
//
// The output channel is closed once every forwarder has stopped. A forwarder
// stops when its source is closed and drained, or when ctx is canceled. It
// watches ctx both while waiting for the next value from its source and while
// waiting to hand a value to the output, so an idle source that never closes
// cannot keep the output open after cancellation. A value already taken from
// a source may be discarded if ctx is canceled before it can be delivered.
func fanIn(ctx context.Context, sources ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)

	// forward copies values from c to out until c is closed or ctx is done.
	forward := func(c <-chan string) {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-c:
				if !ok {
					return // source closed and fully drained
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}

	wg.Add(len(sources))
	for _, src := range sources {
		go forward(src)
	}

	// Close out only after every forwarder has exited, so that no goroutine
	// can attempt a send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
| func main() { | |
| ctx := context.Background() | |
| // reduce 1000 to 100 to see the timeout behavior | |
| ctx, cancel := context.WithTimeout(ctx, time.Microsecond*1000) | |
| defer cancel() | |
| out1 := streamStringWithContext(ctx, "what a nice day") | |
| out2 := streamStringWithContext(ctx, "wish you happy go coding") | |
| all := fanIn(ctx, out1, out2) | |
| // uncomment to try manual cancellation instead of context. | |
| // out, cancel := streamString("try me, I am also nice") | |
| // defer cancel() | |
| // count how many characters were received before context timeout | |
| count := 0 | |
| for s := range all { | |
| count++ | |
| fmt.Println(s) | |
| } | |
| fmt.Printf("len: %d\n", count) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment