Advanced Go Concurrency Patterns (original) (raw)
Video
This talk was presented at Google I/O in May 2013.
2
Get ready
3
Go supports concurrency
In the language and runtime, not a library.
This changes how you structure your programs.
4
Goroutines and Channels
Goroutines are independently executing functions in the same address space.
Channels are typed values that allow goroutines to synchronize and exchange information.
c := make(chan int) go func() { c <- 3 }() n := <-c
For more on the basics, watch Go Concurrency Patterns (Pike, 2012).
5
Example: ping-pong
// +build ignore,OMIT
package main
import ( "fmt" "time" )
type Ball struct{ hits int }
func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)
table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball}
func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }
6
Deadlock detection
// +build ignore,OMIT
package main
import ( "fmt" "time" )
type Ball struct{ hits int }
func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)
**// table <- new(Ball) // game on; toss the ball**
time.Sleep(1 * time.Second)
<-table // game over; grab the ball}
func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }
7
Panic dumps the stacks
// +build ignore,OMIT
package main
import ( "fmt" "time" )
type Ball struct{ hits int }
func main() { table := make(chan *Ball) go player("ping", table) go player("pong", table)
table <- new(Ball) // game on; toss the ball
time.Sleep(1 * time.Second)
<-table // game over; grab the ball
**panic("show me the stacks")**}
func player(name string, table chan *Ball) { for { ball := <-table ball.hits++ fmt.Println(name, ball.hits) time.Sleep(100 * time.Millisecond) table <- ball } }
8
It's easy to go, but how to stop?
Long-lived programs need to clean up.
Let's look at how to write programs that handle communication, periodic events, and cancellation.
The core is Go's select statement: like a switch, but the decision is made based on the ability to communicate.
select { case xc <- x: // sent x on xc case y := <-yc: // received y from yc }
9
Example: feed reader
My favorite feed reader disappeared. I need a new one.
Why not write one?
Where do we start?
10
Find an RSS client
Searching pkg.go.dev for "rss" turns up several hits, including one that provides:
// Fetch fetches Items for uri and returns the time when the next // fetch should be attempted. On failure, Fetch returns an error. func Fetch(uri string) (items []Item, next time.Time, err error)
type Item struct{ Title, Channel, GUID string // a subset of RSS fields }
But I want a stream:
And I want multiple subscriptions.
11
Here's what we have
type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }
func Fetch(domain string) Fetcher {...} // fetches Items from domain
12
Here's what we want
type Subscription interface { Updates() <-chan Item // stream of Items Close() error // shuts down the stream }
func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream
func Merge(subs ...Subscription) Subscription {...} // merges several streams
13
Example
//go:build ignore && OMIT // +build ignore,OMIT
// fakemain runs the Subscribe example with a fake RSS fetcher. package main
import ( "fmt" "math/rand" "time" )
// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }
// STOPITEM OMIT
// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }
// STOPSUBSCRIBE OMIT
// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }
// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }
// STOPUPDATES OMIT
// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }
// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
// STOPFETCHONLY OMIT}
// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }
select {
case updates <- first:
pending = pending[1:]
}
}
// STOPSENDONLY OMIT}
// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }
// STARTSELECT OMIT
select {
case errc := <-s.closing: // HLcases
errc <- err
close(s.updates)
return
// STARTFETCHCASE OMIT
case <-startFetch: // HLcases
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...) // HLfetch
// STOPFETCHCASE OMIT
case updates <- first: // HLcases
pending = pending[1:]
}
// STOPSELECT OMIT
}}
// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }
// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }
// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }
// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item { return m.updates }
type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }
// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }
func (m *merge) Updates() <-chan Item { return m.updates }
// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }
type deduper struct { s Subscription updates chan Item closing chan chan error }
// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }
func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }
func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }
func (d *deduper) Updates() <-chan Item { return d.updates }
// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }
func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }
type fakeFetcher struct { channel string items []Item }
// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }
func init() { rand.Seed(time.Now().UnixNano()) }
// STARTMAIN OMIT
func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( Subscribe(Fetch("blog.golang.org")), Subscribe(Fetch("googleblog.blogspot.com")), Subscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")}
// STOPMAIN OMIT
14
Subscribe
Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates } go s.loop() return s }
// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // delivers items to the user }
// loop fetches items using s.fetcher and sends them // on s.updates. loop exits when s.Close is called. func (s *sub) loop() {...}
15
Implementing Subscription
To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item { return s.updates }
func (s *sub) Close() error { // TODO: make loop exit // TODO: find out about any error return err }
16
What does loop do?
- periodically call
Fetch - send fetched items on the
Updateschannel - exit when
Closeis called, reporting any error 17
Naive Implementation
// naivemain runs the Subscribe example with the naive Subscribe // implementation and a fake RSS fetcher. //go:build ignore && OMIT // +build ignore,OMIT
package main
import ( "fmt" "math/rand" "time" )
// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }
// STOPITEM OMIT
// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }
// STOPSUBSCRIBE OMIT
// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }
// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }
// STOPUPDATES OMIT
// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }
// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
// STOPFETCHONLY OMIT}
// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }
select {
case updates <- first:
pending = pending[1:]
}
}
// STOPSENDONLY OMIT}
// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }
// STARTSELECT OMIT
select {
case errc := <-s.closing: // HLcases
errc <- err
close(s.updates)
return
// STARTFETCHCASE OMIT
case <-startFetch: // HLcases
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...) // HLfetch
// STOPFETCHCASE OMIT
case updates <- first: // HLcases
pending = pending[1:]
}
// STOPSELECT OMIT
}}
// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }
// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }
// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }
// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item { return m.updates }
type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }
// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }
func (m *merge) Updates() <-chan Item { return m.updates }
// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }
type deduper struct { s Subscription updates chan Item closing chan chan error }
// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }
func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }
func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }
func (d *deduper) Updates() <-chan Item { return d.updates }
// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }
func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }
type fakeFetcher struct { channel string items []Item }
// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }
func NaiveSubscribe(fetcher Fetcher) Subscription { s := &naiveSub{ fetcher: fetcher, updates: make(chan Item), } go s.loop() return s }
type naiveSub struct { fetcher Fetcher updates chan Item closed bool err error }
func (s *naiveSub) Updates() <-chan Item { return s.updates }
func (s *naiveSub) loop() {
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}}
func (s *naiveSub) Close() error { s.closed = true // HLsync return s.err // HLsync }
func init() { rand.Seed(time.Now().UnixNano()) }
func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( NaiveSubscribe(Fetch("blog.golang.org")), NaiveSubscribe(Fetch("googleblog.blogspot.com")), NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
// The loops are still running. Let the race detector notice.
time.Sleep(1 * time.Second)
panic("show me the stacks")}
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
18
Bug 1: unsynchronized access to s.closed/s.err
for {
**if s.closed {**
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
**s.err = err**
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}func (s *naiveSub) Close() error { s.closed = true return s.err }
19
Race Detector
go run -race naivemain.go
// naivemain runs the Subscribe example with the naive Subscribe // implementation and a fake RSS fetcher. //go:build ignore && OMIT // +build ignore,OMIT
package main
import ( "fmt" "math/rand" "time" )
// STARTITEM OMIT // An Item is a stripped-down RSS item. type Item struct{ Title, Channel, GUID string }
// STOPITEM OMIT
// STARTFETCHER OMIT // A Fetcher fetches Items and returns the time when the next fetch should be // attempted. On failure, Fetch returns a non-nil error. type Fetcher interface { Fetch() (items []Item, next time.Time, err error) }
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT // A Subscription delivers Items over a channel. Close cancels the // subscription, closes the Updates channel, and returns the last fetch error, // if any. type Subscription interface { Updates() <-chan Item Close() error }
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT // Subscribe returns a new Subscription that uses fetcher to fetch Items. func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, updates: make(chan Item), // for Updates closing: make(chan chan error), // for Close } go s.loop() return s }
// STOPSUBSCRIBE OMIT
// sub implements the Subscription interface. type sub struct { fetcher Fetcher // fetches items updates chan Item // sends items to the user closing chan chan error // for Close }
// STARTUPDATES OMIT func (s *sub) Updates() <-chan Item { return s.updates }
// STOPUPDATES OMIT
// STARTCLOSE OMIT // STARTCLOSESIG OMIT func (s *sub) Close() error { // STOPCLOSESIG OMIT errc := make(chan error) s.closing <- errc // HLchan return <-errc // HLchan }
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic // that handles Close. func (s *sub) loopCloseOnly() { // STARTCLOSEONLY OMIT var err error // set when Fetch fails for { select { case errc := <-s.closing: // HLchan errc <- err // HLchan close(s.updates) // tells receiver we're done return } } // STOPCLOSEONLY OMIT }
// loopFetchOnly is a version of loop that includes only the logic // that calls Fetch. func (s *sub) loopFetchOnly() { // STARTFETCHONLY OMIT var pending []Item // appended by fetch; consumed by send var next time.Time // initially January 1, year 0 var err error for { var fetchDelay time.Duration // initially 0 (no delay) if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
// STOPFETCHONLY OMIT}
// loopSendOnly is a version of loop that includes only the logic for // sending items to s.updates. func (s *sub) loopSendOnly() { // STARTSENDONLY OMIT var pending []Item // appended by fetch; consumed by send for { var first Item var updates chan Item // HLupdates if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case // HLupdates }
select {
case updates <- first:
pending = pending[1:]
}
}
// STOPSENDONLY OMIT}
// mergedLoop is a version of loop that combines loopCloseOnly, // loopFetchOnly, and loopSendOnly. func (s *sub) mergedLoop() { // STARTFETCHVARS OMIT var pending []Item var next time.Time var err error // STOPFETCHVARS OMIT for { // STARTNOCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } startFetch := time.After(fetchDelay) // STOPNOCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case }
// STARTSELECT OMIT
select {
case errc := <-s.closing: // HLcases
errc <- err
close(s.updates)
return
// STARTFETCHCASE OMIT
case <-startFetch: // HLcases
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...) // HLfetch
// STOPFETCHCASE OMIT
case updates <- first: // HLcases
pending = pending[1:]
}
// STOPSELECT OMIT
}}
// dedupeLoop extends mergedLoop with deduping of fetched items. func (s *sub) dedupeLoop() { const maxPending = 10 // STARTSEEN OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) // set of item.GUIDs // HLseen // STOPSEEN OMIT for { // STARTCAP OMIT var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } var startFetch <-chan time.Time // HLcap if len(pending) < maxPending { // HLcap startFetch = time.After(fetchDelay) // enable fetch case // HLcap } // HLcap // STOPCAP OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } select { case errc := <-s.closing: errc <- err close(s.updates) return // STARTDEDUPE OMIT case <-startFetch: var fetched []Item fetched, next, err = s.fetcher.Fetch() // HLfetch if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if !seen[item.GUID] { // HLdupe pending = append(pending, item) // HLdupe seen[item.GUID] = true // HLdupe } // HLdupe } // STOPDEDUPE OMIT case updates <- first: pending = pending[1:] } } }
// loop periodically fetches Items, sends them on s.updates, and exits // when Close is called. It extends dedupeLoop with logic to run // Fetch asynchronously. func (s *sub) loop() { const maxPending = 10 type fetchResult struct { fetched []Item next time.Time err error } // STARTFETCHDONE OMIT var fetchDone chan fetchResult // if non-nil, Fetch is running // HL // STOPFETCHDONE OMIT var pending []Item var next time.Time var err error var seen = make(map[string]bool) for { var fetchDelay time.Duration if now := time.Now(); next.After(now) { fetchDelay = next.Sub(now) } // STARTFETCHIF OMIT var startFetch <-chan time.Time if fetchDone == nil && len(pending) < maxPending { // HLfetch startFetch = time.After(fetchDelay) // enable fetch case } // STOPFETCHIF OMIT var first Item var updates chan Item if len(pending) > 0 { first = pending[0] updates = s.updates // enable send case } // STARTFETCHASYNC OMIT select { case <-startFetch: // HLfetch fetchDone = make(chan fetchResult, 1) // HLfetch go func() { fetched, next, err := s.fetcher.Fetch() fetchDone <- fetchResult{fetched, next, err} }() case result := <-fetchDone: // HLfetch fetchDone = nil // HLfetch // Use result.fetched, result.next, result.err // STOPFETCHASYNC OMIT fetched := result.fetched next, err = result.next, result.err if err != nil { next = time.Now().Add(10 * time.Second) break } for _, item := range fetched { if id := item.GUID; !seen[id] { // HLdupe pending = append(pending, item) seen[id] = true // HLdupe } } case errc := <-s.closing: errc <- err close(s.updates) return case updates <- first: pending = pending[1:] } } }
// naiveMerge is a version of Merge that doesn't quite work right. In // particular, the goroutines it starts may block forever on m.updates // if the receiver stops receiving. type naiveMerge struct { subs []Subscription updates chan Item }
// STARTNAIVEMERGE OMIT func NaiveMerge(subs ...Subscription) Subscription { m := &naiveMerge{ subs: subs, updates: make(chan Item), } // STARTNAIVEMERGELOOP OMIT for _, sub := range subs { go func(s Subscription) { for it := range s.Updates() { m.updates <- it // HL } }(sub) } // STOPNAIVEMERGELOOP OMIT return m }
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT func (m *naiveMerge) Close() (err error) { for _, sub := range m.subs { if e := sub.Close(); err == nil && e != nil { err = e } } close(m.updates) // HL return }
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item { return m.updates }
type merge struct { subs []Subscription updates chan Item quit chan struct{} errs chan error }
// STARTMERGESIG OMIT // Merge returns a Subscription that merges the item streams from subs. // Closing the merged subscription closes subs. func Merge(subs ...Subscription) Subscription { // STOPMERGESIG OMIT m := &merge{ subs: subs, updates: make(chan Item), quit: make(chan struct{}), errs: make(chan error), } // STARTMERGE OMIT for _, sub := range subs { go func(s Subscription) { for { var it Item select { case it = <-s.Updates(): case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } select { case m.updates <- it: case <-m.quit: // HL m.errs <- s.Close() // HL return // HL } } }(sub) } // STOPMERGE OMIT return m }
func (m *merge) Updates() <-chan Item { return m.updates }
// STARTMERGECLOSE OMIT func (m *merge) Close() (err error) { close(m.quit) // HL for _ = range m.subs { if e := <-m.errs; e != nil { // HL err = e } } close(m.updates) // HL return }
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates // into one that doesn't. func NaiveDedupe(in <-chan Item) <-chan Item { out := make(chan Item) go func() { seen := make(map[string]bool) for it := range in { if !seen[it.GUID] { // BUG: this send blocks if the // receiver closes the Subscription // and stops receiving. out <- it // HL seen[it.GUID] = true } } close(out) }() return out }
type deduper struct { s Subscription updates chan Item closing chan chan error }
// Dedupe converts a Subscription that may send duplicate Items into // one that doesn't. func Dedupe(s Subscription) Subscription { d := &deduper{ s: s, updates: make(chan Item), closing: make(chan chan error), } go d.loop() return d }
func (d *deduper) loop() { in := d.s.Updates() // enable receive var pending Item var out chan Item // disable send seen := make(map[string]bool) for { select { case it := <-in: if !seen[it.GUID] { pending = it in = nil // disable receive out = d.updates // enable send seen[it.GUID] = true } case out <- pending: in = d.s.Updates() // enable receive out = nil // disable send case errc := <-d.closing: err := d.s.Close() errc <- err close(d.updates) return } } }
func (d *deduper) Close() error { errc := make(chan error) d.closing <- errc return <-errc }
func (d *deduper) Updates() <-chan Item { return d.updates }
// Fetch returns a Fetcher for Items from domain. func Fetch(domain string) Fetcher { return fakeFetch(domain) }
func fakeFetch(domain string) Fetcher { return &fakeFetcher{channel: domain} }
type fakeFetcher struct { channel string items []Item }
// FakeDuplicates causes the fake fetcher to return duplicate items. var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { now := time.Now() next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) item := Item{ Channel: f.channel, Title: fmt.Sprintf("Item %d", len(f.items)), } item.GUID = item.Channel + "/" + item.Title f.items = append(f.items, item) if FakeDuplicates { items = f.items } else { items = []Item{item} } return }
func NaiveSubscribe(fetcher Fetcher) Subscription { s := &naiveSub{ fetcher: fetcher, updates: make(chan Item), } go s.loop() return s }
type naiveSub struct { fetcher Fetcher updates chan Item closed bool err error }
func (s *naiveSub) Updates() <-chan Item { return s.updates }
func (s *naiveSub) loop() {
for {
**if s.closed {**
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
**s.err = err**
time.Sleep(10 * time.Second) // HLsleep
continue
}
for _, item := range items {
s.updates <- item // HLsend
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now)) // HLsleep
}
}
// STOPNAIVE OMIT}
func (s *naiveSub) Close() error { s.closed = true // HLsync return s.err // HLsync }
func init() { rand.Seed(time.Now().UnixNano()) }
func main() { // Subscribe to some feeds, and create a merged update stream. merged := Merge( NaiveSubscribe(Fetch("blog.golang.org")), NaiveSubscribe(Fetch("googleblog.blogspot.com")), NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
// The loops are still running. Let the race detector notice.
time.Sleep(1 * time.Second)
panic("show me the stacks")}
func (s *naiveSub) Close() error { s.closed = true return s.err }
20
Bug 2: time.Sleep may keep loop running
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
**time.Sleep(10 * time.Second)**
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
**time.Sleep(next.Sub(now))**
}
}21
Bug 3: loop may block forever on s.updates
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
**s.updates <- item**
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}22
Solution
Change the body of loop to a select with three cases:
Closewas called- it's time to call
Fetch - send an item on
s.updates23
Structure: for-select loop
loop runs in its own goroutine.
select lets loop avoid blocking indefinitely in any one state.
func (s *sub) loop() { ... declare mutable state ... for { ... set up channels for cases ... select { case <-c1: ... read/write state ... case c2 <- x: ... read/write state ... case y := <-c3: ... read/write state ... } } }
The cases interact via local state in loop.
24
Case 1: Close
Close communicates with loop via s.closing.
type sub struct { closing chan chan error }
The service (loop) listens for requests on its channel (s.closing).
The client (Close) sends a request on s.closing: exit and reply with the error
In this case, the only thing in the request is the reply channel.
25
Case 1: Close
Close asks loop to exit and waits for a response.
func (s *sub) Close() error { errc := make(chan error) s.closing <- errc return <-errc }
loop handles Close by replying with the Fetch error and exiting.
var err error // set when Fetch fails
for {
select {
**case errc := <-s.closing:**
**errc <- err**
close(s.updates) // tells receiver we're done
return
}
}26
Case 2: Fetch
Schedule the next Fetch after some delay.
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 0
var err error
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}27
Case 3: Send
Send the fetched items, one at a time.
var pending []Item // appended by fetch; consumed by send for { select { case s.updates <- pending[0]: pending = pending[1:] } }
Whoops. This crashes.
28
Select and nil channels
Sends and receives on nil channels block.
Select never selects a blocking case.
// +build ignore,OMIT
package main
import ( "fmt" "math/rand" "time" )
func init() { rand.Seed(time.Now().UnixNano()) }
func main() { a, b := make(chan string), make(chan string) go func() { a <- "a" }() go func() { b <- "b" }() if rand.Intn(2) == 0 { a = nil fmt.Println("nil a") } else { b = nil fmt.Println("nil b") } select { case s := <-a: fmt.Println("got", s) case s := <-b: fmt.Println("got", s) } }
29
Case 3: Send (fixed)
Enable send only when pending is non-empty.
var pending []Item // appended by fetch; consumed by send
for {
var first Item
**var updates chan Item**
if len(pending) > 0 {
first = pending[0]
**updates = s.updates // enable send case**
}
select {
case updates <- first:
pending = pending[1:]
}
}30
Select
Put the three cases together:
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
case updates <- first:
pending = pending[1:]
} The cases interact via err, next, and pending.
No locks, no condition variables, no callbacks.
31
Bugs fixed
Bug 1: unsynchronized access to
s.closedands.errBug 2:
time.Sleepmay keep loop runningBug 3:
loopmay block forever sending ons.updatesselect { **case errc := <-s.closing:** errc <- err close(s.updates) return **case <-startFetch:** var fetched []Item fetched, next, err = s.fetcher.Fetch() if err != nil { next = time.Now().Add(10 * time.Second) break } pending = append(pending, fetched...) **case updates <- first:** pending = pending[1:] }
32
We can improve loop further
33
Issue: Fetch may return duplicates
var pending []Item
var next time.Time
var err error
case <-startFetch:
var fetched []Item
**fetched, next, err = s.fetcher.Fetch()**
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
**pending = append(pending, fetched...)**34
Fix: Filter items before adding to pending
var pending []Item
var next time.Time
var err error
**var seen = make(map[string]bool) // set of item.GUIDs**
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
**if !seen[item.GUID] {**
**pending = append(pending, item)**
**seen[item.GUID] = true**
**}**
}35
Issue: Pending queue grows without bound
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
**if !seen[item.GUID] {**
**pending = append(pending, item)**
**seen[item.GUID] = true**
**}**
}36
Fix: Disable fetch case when too much pending
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
**var startFetch <-chan time.Time**
**if len(pending) < maxPending {**
**startFetch = time.After(fetchDelay) // enable fetch case**
**}** Could instead drop older items from the head of pending.
37
Issue: Loop blocks on Fetch
case <-startFetch:
var fetched []Item
**fetched, next, err = s.fetcher.Fetch()**
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] {
pending = append(pending, item)
seen[item.GUID] = true
}
}38
Fix: Run Fetch asynchronously
Add a new select case for fetchDone.
type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // if non-nil, Fetch is running
var startFetch <-chan time.Time
**if fetchDone == nil && len(pending) < maxPending {**
startFetch = time.After(fetchDelay) // enable fetch case
}
select {
**case <-startFetch:**
**fetchDone = make(chan fetchResult, 1)**
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
**case result := <-fetchDone:**
**fetchDone = nil**
// Use result.fetched, result.next, result.err39
Implemented Subscribe
Responsive. Cleans up. Easy to read and change.
Three techniques:
for-selectloop- service channel, reply channels (
chan chan error) nilchannels inselectcases
More details online, including Merge.
40
Conclusion
Concurrent programming can be tricky.
Go makes it easier:
- channels convey data, timer events, cancellation signals
- goroutines serialize access to local mutable state
- stack traces & deadlock detector
- race detector
41
Links
Go Concurrency Patterns (2012)
go.dev/talks/2012/concurrency.slide
Concurrency is not parallelism
go.dev/s/concurrency-is-not-parallelism
Share memory by communicating
Go Tour (learn Go in your browser)
42
Thank you
Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)