Advanced Go Concurrency Patterns (original) (raw)

Video

This talk was presented at Google I/O in May 2013.

Watch the talk on YouTube

2

Get ready

3

Go supports concurrency

In the language and runtime, not a library.

This changes how you structure your programs.

4

Goroutines and Channels

Goroutines are independently executing functions in the same address space.

Channels are typed values that allow goroutines to synchronize and exchange information.

c := make(chan int) go func() { c <- 3 }() n := <-c

For more on the basics, watch Go Concurrency Patterns (Pike, 2012).

5

Example: ping-pong

// +build ignore,OMIT

package main

import ( "fmt" "time" )

type Ball struct{ hits int }

func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)

table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball

}

func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }

6

Deadlock detection

// +build ignore,OMIT

package main

import ( "fmt" "time" )

type Ball struct{ hits int }

func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)

**// table <- new(Ball) // game on; toss the ball**
time.Sleep(1 * time.Second)
<-table // game over; grab the ball

}

func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }

7

Panic dumps the stacks

// +build ignore,OMIT

package main

import ( "fmt" "time" )

type Ball struct{ hits int }

func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)

table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball

**panic("show me the stacks")**

}

func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }

8

It's easy to go, but how to stop?

Long-lived programs need to clean up.

Let's look at how to write programs that handle communication, periodic events, and cancellation.

The core is Go's select statement: like a switch, but the decision is made based on the ability to communicate.

select { case xc <- x: // sent x on xc case y := <-yc: // received y from yc }

9

Example: feed reader

My favorite feed reader disappeared. I need a new one.

Why not write one?

Where do we start?

10

Find an RSS client

Searching pkg.go.dev for "rss" turns up several hits, including one that provides:

// Fetch fetches Items for uri and returns the time when the next // fetch should be attempted. On failure, Fetch returns an error. func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct{ Title, Channel, GUID string // a subset of RSS fields }

But I want a stream:

And I want multiple subscriptions.

11

Here's what we have

type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }

func Fetch(domain string) Fetcher {...} // fetches Items from domain

12

Here's what we want

type Subscription interface { Updates() <-chan Item // stream of Items Close() error // shuts down the stream }

func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream

func Merge(subs ...Subscription) Subscription {...} // merges several streams

13

Example

//go:build ignore && OMIT // +build ignore,OMIT

// fakemain runs the Subscribe example with a fake RSS fetcher. package main

import ( "fmt" "math/rand" "time" )

// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }

// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }

// STOPUPDATES OMIT

// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }

// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)

    select {
    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...)
    }
}
// STOPFETCHONLY OMIT

}

// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }

    select {
    case updates <- first:
        pending = pending[1:]
    }
}
// STOPSENDONLY OMIT

}

// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }

    // STARTSELECT OMIT
    select {
    case errc := <-s.closing: // HLcases
        errc <- err
        close(s.updates)
        return
        // STARTFETCHCASE OMIT
    case <-startFetch: // HLcases
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch() // HLfetch
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...) // HLfetch
        // STOPFETCHCASE OMIT
    case updates <- first: // HLcases
        pending = pending[1:]
    }
    // STOPSELECT OMIT
}

}

// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }

// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }

// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }

// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item { return m.updates }

type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }

// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }

func (m *merge) Updates() <-chan Item { return m.updates }

// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }

type deduper struct { s Subscription updates chan Item closing chan chan error }

// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }

func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }

func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }

func (d *deduper) Updates() <-chan Item { return d.updates }

// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }

func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }

type fakeFetcher struct { channel string items []Item }

// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }

func init() { rand.Seed(time.Now().UnixNano()) }

// STARTMAIN OMIT

func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( Subscribe(Fetch("blog.golang.org")), Subscribe(Fetch("googleblog.blogspot.com")), Subscribe(Fetch("googledevelopers.blogspot.com")))

// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
    fmt.Println("closed:", merged.Close())
})

// Print the stream.
for it := range merged.Updates() {
    fmt.Println(it.Channel, it.Title)
}

panic("show me the stacks")

}

// STOPMAIN OMIT

14

Subscribe

Subscribe creates a new Subscription that repeatedly fetches items until Close is called.

func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates } go s.loop() return s }

// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // delivers items to the user }

// loop fetches items using s.fetcher and sends them // on s.updates. loop exits when s.Close is called. func (s *sub) loop() {...}

15

Implementing Subscription

To implement the Subscription interface, define Updates and Close.

func (s *sub) Updates() <-chan Item { return s.updates }

func (s *sub) Close() error { // TODO: make loop exit // TODO: find out about any error return err }

16

What does loop do?

Naive Implementation

// naivemain runs the Subscribe example with the naive Subscribe // implementation and a fake RSS fetcher. //go:build ignore && OMIT // +build ignore,OMIT

package main

import ( "fmt" "math/rand" "time" )

// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }

// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }

// STOPUPDATES OMIT

// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }

// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)

    select {
    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...)
    }
}
// STOPFETCHONLY OMIT

}

// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }

    select {
    case updates <- first:
        pending = pending[1:]
    }
}
// STOPSENDONLY OMIT

}

// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }

    // STARTSELECT OMIT
    select {
    case errc := <-s.closing: // HLcases
        errc <- err
        close(s.updates)
        return
        // STARTFETCHCASE OMIT
    case <-startFetch: // HLcases
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch() // HLfetch
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...) // HLfetch
        // STOPFETCHCASE OMIT
    case updates <- first: // HLcases
        pending = pending[1:]
    }
    // STOPSELECT OMIT
}

}

// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }

// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }

// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }

// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item { return m.updates }

type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }

// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }

func (m *merge) Updates() <-chan Item { return m.updates }

// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }

type deduper struct { s Subscription updates chan Item closing chan chan error }

// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }

func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }

func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }

func (d *deduper) Updates() <-chan Item { return d.updates }

// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }

func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }

type fakeFetcher struct { channel string items []Item }

// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }

func NaiveSubscribe(fetcher Fetcher) Subscription { s := &naiveSub{ fetcher: fetcher, updates: make(chan Item), } go s.loop() return s }

type naiveSub struct { fetcher Fetcher updates chan Item closed bool err error }

func (s *naiveSub) Updates() <-chan Item { return s.updates }

func (s *naiveSub) loop() {

for {
    if s.closed {
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        s.err = err                 
        time.Sleep(10 * time.Second)
        continue
    }
    for _, item := range items {
        s.updates <- item
    }
    if now := time.Now(); next.After(now) {
        time.Sleep(next.Sub(now))
    }
}

}

func (s *naiveSub) Close() error { s.closed = true // HLsync return s.err // HLsync }

func init() { rand.Seed(time.Now().UnixNano()) }

func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( NaiveSubscribe(Fetch("blog.golang.org")), NaiveSubscribe(Fetch("googleblog.blogspot.com")), NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))

// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
    fmt.Println("closed:", merged.Close())
})

// Print the stream.
for it := range merged.Updates() {
    fmt.Println(it.Channel, it.Title)
}

// The loops are still running.  Let the race detector notice.
time.Sleep(1 * time.Second)

panic("show me the stacks")

}

func (s *naiveSub) Close() error { s.closed = true return s.err
}

18

Bug 1: unsynchronized access to s.closed/s.err

for {
    **if s.closed {**
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        **s.err = err**
        time.Sleep(10 * time.Second)
        continue
    }
    for _, item := range items {
        s.updates <- item
    }
    if now := time.Now(); next.After(now) {
        time.Sleep(next.Sub(now))
    }
}

func (s *naiveSub) Close() error { s.closed = true return s.err }

19

Race Detector

go run -race naivemain.go

// naivemain runs the Subscribe example with the naive Subscribe // implementation and a fake RSS fetcher. //go:build ignore && OMIT // +build ignore,OMIT

package main

import ( "fmt" "math/rand" "time" )

// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }

// STOPITEM OMIT

// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }

// STOPFETCHER OMIT

// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }

// STOPSUBSCRIPTION OMIT

// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }

// STOPSUBSCRIBE OMIT

// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }

// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }

// STOPUPDATES OMIT

// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }

// STOPCLOSE OMIT

// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }

// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)

    select {
    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...)
    }
}
// STOPFETCHONLY OMIT

}

// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }

    select {
    case updates <- first:
        pending = pending[1:]
    }
}
// STOPSENDONLY OMIT

}

// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }

    // STARTSELECT OMIT
    select {
    case errc := <-s.closing: // HLcases
        errc <- err
        close(s.updates)
        return
        // STARTFETCHCASE OMIT
    case <-startFetch: // HLcases
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch() // HLfetch
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...) // HLfetch
        // STOPFETCHCASE OMIT
    case updates <- first: // HLcases
        pending = pending[1:]
    }
    // STOPSELECT OMIT
}

}

// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }

// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }

// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }

// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }

// STOPNAIVEMERGE OMIT

// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }

// STOPNAIVEMERGECLOSE OMIT

func (m *naiveMerge) Updates() <-chan Item { return m.updates }

type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }

// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }

func (m *merge) Updates() <-chan Item { return m.updates }

// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }

// STOPMERGECLOSE OMIT

// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }

type deduper struct { s Subscription updates chan Item closing chan chan error }

// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }

func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }

func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }

func (d *deduper) Updates() <-chan Item { return d.updates }

// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }

func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }

type fakeFetcher struct { channel string items []Item }

// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool

func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }

func NaiveSubscribe(fetcher Fetcher) Subscription { s := &naiveSub{ fetcher: fetcher, updates: make(chan Item), } go s.loop() return s }

type naiveSub struct { fetcher Fetcher updates chan Item closed bool err error }

func (s *naiveSub) Updates() <-chan Item { return s.updates }

func (s *naiveSub) loop() {

for {
    **if s.closed {**
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        **s.err = err**

        time.Sleep(10 * time.Second) // HLsleep
        continue
    }
    for _, item := range items {
        s.updates <- item // HLsend
    }
    if now := time.Now(); next.After(now) {
        time.Sleep(next.Sub(now)) // HLsleep
    }
}
// STOPNAIVE OMIT

}

func (s *naiveSub) Close() error { s.closed = true // HLsync return s.err // HLsync }

func init() { rand.Seed(time.Now().UnixNano()) }

func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( NaiveSubscribe(Fetch("blog.golang.org")), NaiveSubscribe(Fetch("googleblog.blogspot.com")), NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))

// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
    fmt.Println("closed:", merged.Close())
})

// Print the stream.
for it := range merged.Updates() {
    fmt.Println(it.Channel, it.Title)
}

// The loops are still running.  Let the race detector notice.
time.Sleep(1 * time.Second)

panic("show me the stacks")

}

func (s *naiveSub) Close() error { s.closed = true return s.err }

20

Bug 2: time.Sleep may keep loop running

for {
    if s.closed {
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        s.err = err                 
        **time.Sleep(10 * time.Second)**
        continue
    }
    for _, item := range items {
        s.updates <- item
    }
    if now := time.Now(); next.After(now) {
        **time.Sleep(next.Sub(now))**
    }
}

21

Bug 3: loop may block forever on s.updates

for {
    if s.closed {
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        s.err = err                 
        time.Sleep(10 * time.Second)
        continue
    }
    for _, item := range items {
        **s.updates <- item**
    }
    if now := time.Now(); next.After(now) {
        time.Sleep(next.Sub(now))
    }
}

22

Solution

Change the body of loop to a select with three cases:

Structure: for-select loop

loop runs in its own goroutine.

select lets loop avoid blocking indefinitely in any one state.

func (s *sub) loop() { ... declare mutable state ... for { ... set up channels for cases ... select { case <-c1: ... read/write state ... case c2 <- x: ... read/write state ... case y := <-c3: ... read/write state ... } } }

The cases interact via local state in loop.

24

Case 1: Close

Close communicates with loop via s.closing.

type sub struct { closing chan chan error }

The service (loop) listens for requests on its channel (s.closing).

The client (Close) sends a request on s.closing: exit and reply with the error

In this case, the only thing in the request is the reply channel.

25

Case 1: Close

Close asks loop to exit and waits for a response.

func (s *sub) Close() error { errc := make(chan error) s.closing <- errc return <-errc }

loop handles Close by replying with the Fetch error and exiting.

var err error // set when Fetch fails
for {
    select {
    **case errc := <-s.closing:**
        **errc <- err**
        close(s.updates) // tells receiver we're done
        return
    }
}

26

Case 2: Fetch

Schedule the next Fetch after some delay.

var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 0
var err error
for {
    var fetchDelay time.Duration // initially 0 (no delay)
    if now := time.Now(); next.After(now) {
        fetchDelay = next.Sub(now)
    }
    startFetch := time.After(fetchDelay)

    select {
    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...)
    }
}

27

Case 3: Send

Send the fetched items, one at a time.

var pending []Item // appended by fetch; consumed by send for { select { case s.updates <- pending[0]: pending = pending[1:] } }

Whoops. This crashes.

28

Select and nil channels

Sends and receives on nil channels block.

Select never selects a blocking case.

// +build ignore,OMIT

package main

import ( "fmt" "math/rand" "time" )

func init() { rand.Seed(time.Now().UnixNano()) }

func main() { a, b := make(chan string), make(chan string) go func() { a <- "a" }() go func() { b <- "b" }() if rand.Intn(2) == 0 { a = nil fmt.Println("nil a") } else { b = nil fmt.Println("nil b") } select { case s := <-a: fmt.Println("got", s) case s := <-b: fmt.Println("got", s) } }

29

Case 3: Send (fixed)

Enable send only when pending is non-empty.

var pending []Item // appended by fetch; consumed by send
for {
    var first Item
    **var updates chan Item**
    if len(pending) > 0 {
        first = pending[0]
        **updates = s.updates // enable send case**
    }

    select {
    case updates <- first:
        pending = pending[1:]
    }
}

30

Select

Put the three cases together:

    select {
    case errc := <-s.closing:
        errc <- err
        close(s.updates)
        return
    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        pending = append(pending, fetched...)
    case updates <- first:
        pending = pending[1:]
    }

The cases interact via err, next, and pending.

No locks, no condition variables, no callbacks.

31

Bugs fixed

32

We can improve loop further

33

Issue: Fetch may return duplicates

var pending []Item
var next time.Time
var err error

    case <-startFetch:
        var fetched []Item
        **fetched, next, err = s.fetcher.Fetch()**
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        **pending = append(pending, fetched...)**

34

Fix: Filter items before adding to pending

var pending []Item
var next time.Time
var err error
**var seen = make(map[string]bool) // set of item.GUIDs**

    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        for _, item := range fetched {
            **if !seen[item.GUID] {**
                **pending = append(pending, item)**
                **seen[item.GUID] = true**
            **}**
        }

35

Issue: Pending queue grows without bound

    case <-startFetch:
        var fetched []Item
        fetched, next, err = s.fetcher.Fetch()
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        for _, item := range fetched {
            **if !seen[item.GUID] {**
                **pending = append(pending, item)**
                **seen[item.GUID] = true**
            **}**
        }

36

Fix: Disable fetch case when too much pending

    var fetchDelay time.Duration
    if now := time.Now(); next.After(now) {
        fetchDelay = next.Sub(now)
    }
    **var startFetch <-chan time.Time**
    **if len(pending) < maxPending {**
        **startFetch = time.After(fetchDelay) // enable fetch case**
    **}**

Could instead drop older items from the head of pending.

37

Issue: Loop blocks on Fetch

    case <-startFetch:
        var fetched []Item
        **fetched, next, err = s.fetcher.Fetch()**
        if err != nil {
            next = time.Now().Add(10 * time.Second)
            break
        }
        for _, item := range fetched {
            if !seen[item.GUID] {
                pending = append(pending, item)
                seen[item.GUID] = true         
            }
        }

38

Fix: Run Fetch asynchronously

Add a new select case for fetchDone.

type fetchResult struct{ fetched []Item; next time.Time; err error }

var fetchDone chan fetchResult // if non-nil, Fetch is running

    var startFetch <-chan time.Time
    **if fetchDone == nil && len(pending) < maxPending {**
        startFetch = time.After(fetchDelay) // enable fetch case
    }

    select {
    **case <-startFetch:**
        **fetchDone = make(chan fetchResult, 1)**
        go func() {
            fetched, next, err := s.fetcher.Fetch()
            fetchDone <- fetchResult{fetched, next, err}
        }()
    **case result := <-fetchDone:**
        **fetchDone = nil**
        // Use result.fetched, result.next, result.err

39

Implemented Subscribe

Responsive. Cleans up. Easy to read and change.

Three techniques:

More details online, including Merge.

40

Conclusion

Concurrent programming can be tricky.

Go makes it easier:

41

Go Concurrency Patterns (2012)

go.dev/talks/2012/concurrency.slide

Concurrency is not parallelism

go.dev/s/concurrency-is-not-parallelism

Share memory by communicating

go.dev/doc/codewalk/sharemem

Go Tour (learn Go in your browser)

go.dev/tour

42

Thank you

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)