Visualizing Concurrency

Go concurrency visually explained


Lviv, January 23, 2016
Ivan Danyliuk (@idanyliuk)

The modern world is concurrent

  • Moore's Law is dead
  • Multi-core CPUs
  • ...

Software is not...

What is concurrency?

Concurrency (Wikipedia)
In computer science, concurrency ... is a property of systems in which several computations are executing simultaneously, and potentially interacting with each other.

Rob Pike - Concurrency is not parallelism


Concurrency vs parallelism

  • Parallelism is simply running things in parallel.
  • Concurrency is a way of structuring programs.
  • A concurrent program can run on either a single core or multiple cores.

Concurrency in Go

  • Based on Tony Hoare's CSP'78 paper
  • Channels
  • Goroutines
  • Select

Hello, concurrent world

package main

// main starts one goroutine and waits for the single value it sends.
func main() {
	// unbuffered channel carrying ints
	answers := make(chan int)

	// anonymous goroutine whose only job is to send 42
	go func() { answers <- 42 }()

	// block until the value arrives, then discard it
	<-answers
}

Timer

package main

import "time"

// main demonstrates three timer primitives from package time, waiting on
// each one in turn.
func main() {
	// time.Tick returns a channel that receives a value every 100ms;
	// wait for the first one only.
	ch := time.Tick(100 * time.Millisecond)
	<-ch

	// time.After returns a channel that receives a single value once the
	// duration has elapsed.
	ch = time.After(1 * time.Second)
	<-ch

	// time.AfterFunc returns a *time.Timer rather than a channel, so it
	// cannot be assigned to ch or received from. Signal completion of the
	// callback through a dedicated channel instead.
	i := 200 * time.Millisecond
	done := make(chan struct{})
	time.AfterFunc(i, func() {
		println("Ran after 200 msec")
		close(done)
	})
	<-done
}

Recurrent timers

package main

import "time"

// tick returns a channel on which the value 1 is delivered once, after d
// has elapsed. The send happens in the background, so tick itself returns
// immediately.
func tick(d time.Duration) <-chan int {
	fired := make(chan int)
	time.AfterFunc(d, func() { fired <- 1 })
	return fired
}

// main waits for 24 consecutive one-second ticks, asking tick for a fresh
// channel on every round.
func main() {
	const rounds = 24
	for n := 0; n < rounds; n++ {
		<-tick(1 * time.Second)
	}
}

Ping-pong

// main seats two players at a shared table channel, serves the ball, lets
// the rally run for one second and then takes the ball off the table.
func main() {
	table := make(chan int)
	for seat := 0; seat < 2; seat++ {
		go player(table)
	}

	var Ball int // starts at zero
	table <- Ball
	time.Sleep(1 * time.Second)
	<-table
}

// player loops forever: take the ball from table, hold it for 100ms, and
// put it back with its counter increased by one.
func player(table chan int) {
	for {
		hits := <-table
		time.Sleep(100 * time.Millisecond)
		table <- hits + 1
	}
}

Ping-pong

Ping-pong #3

// main runs the ping-pong rally with three players sharing one table
// channel. The ball is served, left in play for two seconds, then removed.
func main() {
	const players = 3

	table := make(chan int)
	for p := 0; p < players; p++ {
		go player(table)
	}

	var Ball int
	table <- Ball
	time.Sleep(2 * time.Second)
	<-table
}

// player loops forever: receive the ball from table, increment it, hold it
// for 100ms and send it back on table.
func player(table chan int) {
	for {
		ball := <-table
		ball++
		// Spell the delay with a time unit instead of a raw nanosecond
		// count (100e6); the duration is the same 100ms.
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}

Ping-pong #3

Ping-pong #100

// main starts one hundred players on a single table channel, serves the
// ball, waits three seconds and then receives the ball back.
func main() {
	table := make(chan int)
	for remaining := 100; remaining > 0; remaining-- {
		go player(table)
	}

	var Ball int
	table <- Ball
	time.Sleep(3 * time.Second)
	<-table
}

// player never returns. Each round it receives the ball from table, adds
// one to it, sleeps 100ms and sends the new value back on table.
func player(table chan int) {
	const hold = 100 * time.Millisecond
	for {
		next := 1 + <-table
		time.Sleep(hold)
		table <- next
	}
}

Ping-pong #100

Fan-in

package main

import (
	"fmt"
	"time"
)

// producer1 sends 0, 1, 2, ... on ch, sleeping 100ms before each send.
// It never returns.
func producer1(ch chan int) {
	for n := 0; ; n++ {
		time.Sleep(100 * time.Millisecond)
		ch <- n
	}
}

// producer2 sends an increasing counter starting at 0 on ch, pausing 300ms
// before every send. It never returns.
func producer2(ch chan int) {
	const pause = 300 * time.Millisecond
	next := 0
	for {
		time.Sleep(pause)
		ch <- next
		next++
	}
}

// reader prints every value it receives from out, one per line.
// It never returns.
func reader(out chan int) {
	for {
		fmt.Println(<-out)
	}
}

// main merges the output of both producers into one channel and forwards
// every value to the reader goroutine. It exits after forwarding the
// value 20.
func main() {
	merged := make(chan int)
	out := make(chan int)

	go producer1(merged)
	go producer2(merged)
	go reader(out)

	for {
		v := <-merged
		out <- v
		if v == 20 {
			break
		}
	}
}

Fan in

Servers

// handler answers the connection with "ok" and closes it.
func handler(c net.Conn) {
	defer c.Close()
	c.Write([]byte("ok"))
}

// main listens on TCP port 5000 and serves every accepted connection in
// its own goroutine. Failing to listen panics; a failed Accept is skipped
// and the loop keeps going.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}
	for {
		if conn, err := ln.Accept(); err == nil {
			go handler(conn)
		}
	}
}

Servers

Servers

package main

import (
	"fmt"
	"net"
	"time"
)

// main opens a TCP listener on port 5000, starts the logger and server
// goroutines connected by a string channel, and exits after ten seconds.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}

	addrs := make(chan string)
	go logger(addrs)
	go server(ln, addrs)

	<-time.After(10 * time.Second)
}
// handler reports the peer address of c on ch, then replies "ok" and
// closes the connection.
func handler(c net.Conn, ch chan string) {
	defer c.Close()
	peer := c.RemoteAddr().String()
	ch <- peer
	c.Write([]byte("ok"))
}

// logger prints each string received from ch on its own line.
// It never returns.
func logger(ch chan string) {
	for {
		line := <-ch
		fmt.Println(line)
	}
}

// server accepts connections from l forever and hands each one, together
// with ch, to handler in a new goroutine. Accept errors are skipped.
func server(l net.Listener, ch chan string) {
	for {
		if conn, err := l.Accept(); err == nil {
			go handler(conn, ch)
		}
	}
}

Servers

Workers

// main starts a pool of 36 workers in the background and blocks until the
// WaitGroup, pre-loaded with the same count, has been fully released.
func main() {
	const size = 36

	var wg sync.WaitGroup
	wg.Add(size)
	go pool(&wg, size)
	wg.Wait()
}

// pool starts n worker goroutines on a shared task channel, feeds them the
// task numbers 0 through 49 and closes the channel afterwards.
func pool(wg *sync.WaitGroup, n int) {
	tasks := make(chan int)
	defer close(tasks)

	for started := 0; started < n; started++ {
		go worker(tasks, wg)
	}
	for id := 0; id < 50; id++ {
		tasks <- id
	}
}
// worker drains tasks until the channel is closed. For each task it sleeps
// task milliseconds and then prints a line. wg.Done is called on return.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		time.Sleep(time.Duration(task) * time.Millisecond)
		fmt.Println("processing task", task)
	}
}

Workers

Workers

// main registers 36 pending workers on a WaitGroup, launches the pool with
// that many workers and waits for all of them to finish.
func main() {
	const workerCount = 36

	wg := new(sync.WaitGroup)
	wg.Add(workerCount)
	go pool(wg, workerCount)
	wg.Wait()
}

// pool launches n workers reading from one task channel, sends the
// integers 0..49 as tasks and then closes the channel.
func pool(wg *sync.WaitGroup, n int) {
	const taskCount = 50

	queue := make(chan int)
	for w := n; w > 0; w-- {
		go worker(queue, wg)
	}
	for t := 0; t < taskCount; t++ {
		queue <- t
	}
	close(queue)
}
// worker processes tasks one at a time until tasks is closed: it sleeps for
// as many milliseconds as the task's value, then prints a line. wg.Done
// runs when worker returns.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task, open := <-tasks; open; task, open = <-tasks {
		pause := time.Duration(task) * time.Millisecond
		time.Sleep(pause)
		fmt.Println("processing task", task)
	}
}

Workers

Server+Worker

// handler sends the peer address of c on ch, waits 100ms, replies "ok" and
// closes the connection.
func handler(c net.Conn, ch chan string) {
	defer c.Close()
	ch <- c.RemoteAddr().String()
	time.Sleep(100 * time.Millisecond)
	c.Write([]byte("ok"))
}

// main listens on TCP port 5000, starts a pool of four workers and the
// accept loop, both connected by a string channel, and exits after two
// seconds.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}

	addrs := make(chan string)
	go pool(addrs, 4)
	go server(ln, addrs)

	<-time.After(2 * time.Second)
}
// worker forwards every value it receives on wch to results, unchanged.
// It never returns.
func worker(wch chan int, results chan int) {
	for {
		results <- <-wch
	}
}

// parse receives from results forever and discards every value.
func parse(results chan int) {
	for {
		_ = <-results
	}
}

func pool(ch chan string, n int) {
	wch := make(chan int)
	results := make(chan int)
	for i := 0; i < n; i++ {
		go worker(wch, results)
	}
	go parse(results)
	for {
		addr := <-ch
		wch <- 42*len(addr)
	}
}

Server+Worker

Concurrent prime sieve

package main

import "fmt"

// main prints the first ten values that come out of the sieve. After each
// one it appends a Filter stage for that value and continues reading from
// the new stage's output.
func main() {
	src := make(chan int)
	go Generate(src)

	for found := 0; found < 10; found++ {
		prime := <-src
		fmt.Println(prime)

		filtered := make(chan int)
		go Filter(src, filtered, prime)
		src = filtered
	}
}
// Generate sends the sequence 2, 3, 4, ... on ch and never returns.
func Generate(ch chan<- int) {
	n := 2
	for {
		ch <- n
		n++
	}
}

// Filter copies values from ch to out, dropping every value that is
// divisible by prime. It never returns.
func Filter(ch <-chan int, out chan<- int, prime int) {
	for {
		if candidate := <-ch; candidate%prime != 0 {
			out <- candidate
		}
	}
}

Concurrent prime sieve

GOMAXPROCS

GOMAXPROCS

// workers is the number of worker goroutines that pool starts.
const workers = 24

// pool starts `workers` goroutines, all reading from in and writing to out.
func pool(in, out chan int) {
	for left := workers; left > 0; left-- {
		go worker(in, out)
	}
}

// worker loops forever: receive a task from in and send task+1 on out.
func worker(in, out chan int) {
	for {
		job := <-in
		// heavy computation here
		result := job + 1
		out <- result
	}
}
// generate sends the integers 0 through 99 on in, in order, then returns.
func generate(in chan int) {
	const total = 100
	next := 0
	for next < total {
		in <- next
		next++
	}
}

// print receives exactly 100 values from out and prints each on its own
// line, then returns.
func print(out chan int) {
	for remaining := 100; remaining > 0; remaining-- {
		v := <-out
		fmt.Println(v)
	}
}

// main sets GOMAXPROCS to 1, wires the worker pool between a generator and
// a printer through two channels, and exits after one second.
func main() {
	runtime.GOMAXPROCS(1)

	tasks, results := make(chan int), make(chan int)
	go pool(tasks, results)
	go generate(tasks)
	go print(results)

	<-time.After(1 * time.Second)
}

GOMAXPROCS = 1

GOMAXPROCS = 24

Goroutines leak

Goroutines leak

Go Concurrency Tracer

  • Modify source AST
  • Run and analyze trace
  • Render w/ WebGL via Three.js

Go Concurrency Tracer

It turns this:
package main

// main sends 42 from an anonymous goroutine and receives it on the main
// goroutine.
func main() {
	values := make(chan int)
	go func() { values <- 42 }()
	<-values
}

Go Concurrency Tracer

Into this:
...
// main is the instrumented form of the program: the original channel
// operations are kept and _encode calls are added around them to emit
// trace events.
func main() {
	// Emit "create goroutine" for main on entry and, through defer,
	// "stop goroutine" when main returns.
	_encode(_cmd{Name: getGID("main"), Command: "create goroutine"})
	defer _encode(_cmd{Name: getGID("main"), Command: "stop goroutine"})
	ch := make(chan int)
	go func() {
		// Same create/stop pair for the anonymous goroutine, with "main"
		// recorded as its parent.
		_encode(_cmd{Parent: "main", Name: getGID("go"), Command: "create goroutine"})
		defer _encode(_cmd{Name: getGID("go"), Command: "stop goroutine"})
		// The "start send" event is emitted before the send itself.
		_encode(_cmd{From: getGID("go"), Channel: "ch", Value: 42, Command: "start send"})
		ch <- 42
	}()
	// The receive is performed while building the _cmd argument, so this
	// "start recv" event is emitted only after the value has arrived.
	_encode(_cmd{To: getGID("main"), Channel: "ch", Value: <-ch, Command: "start recv"})

}

// _encode stamps v with the current time in Unix nanoseconds, serializes
// it as JSON and writes it to standard output as one "TRACE:"-prefixed
// line.
//
// If v cannot be marshaled, nothing is written to standard output; the
// failure is reported on standard error instead, so the trace stream never
// contains a "TRACE:" line with an empty payload.
func _encode(v _cmd) {
	v.Time = time.Now().UnixNano()
	out, err := json.Marshal(v)
	if err != nil {
		fmt.Fprintln(os.Stderr, "TRACE: marshal error:", err)
		return
	}
	fmt.Fprintln(os.Stdout, "TRACE:", string(out))
}
...

Go Concurrency Tracer

Next, analyze raw trace:
{"t":1453171532768286484,"cmd":"create goroutine","name":"worker #27","parent":"pool #5"}
{"t":1453171532768328577,"cmd":"create goroutine","name":"worker #26","parent":"pool #5"}
{"t":1453171532768432492,"cmd":"create goroutine","name":"worker #29","parent":"pool #5"}
{"t":1453171532768435406,"cmd":"create goroutine","name":"worker #28","parent":"pool #5"}
{"t":1453171532768596996,"cmd":"create goroutine","name":"worker #20","parent":"pool #5"}
{"t":1453171532868018479,"cmd":"create goroutine","name":"print #32","parent":"main"}
{"t":1453171532868131248,"cmd":"create goroutine","name":"generate #31","parent":"main"}
{"t":1453171532879915073,"cmd":"start send","from":"generate #31","ch":"in","value":0}
{"t":1453171532880021908,"cmd":"start recv","to":"worker #30","ch":"in","value":0}
{"t":1453171532892937700,"cmd":"start send","from":"generate #31","ch":"in","value":1}
{"t":1453171532893072787,"cmd":"start recv","to":"worker #16","ch":"in","value":1}
{"t":1453171532904954369,"cmd":"start send","from":"generate #31","ch":"in","value":2}
{"t":1453171532905077725,"cmd":"start recv","to":"worker #17","ch":"in","value":2}
{"t":1453171532916999950,"cmd":"start send","from":"generate #31","ch":"in","value":3}
{"t":1453171532917084835,"cmd":"start recv","to":"worker #7","ch":"in","value":3}
{"t":1453171532929056797,"cmd":"start send","from":"generate #31","ch":"in","value":4}
{"t":1453171532929221817,"cmd":"start recv","to":"worker #18","ch":"in","value":4}
{"t":1453171532941104211,"cmd":"start send","from":"generate #31","ch":"in","value":5}
{"t":1453171532941265219,"cmd":"start recv","to":"worker #8","ch":"in","value":5}
{"t":1453171532952977981,"cmd":"start send","from":"generate #31","ch":"in","value":6}

Sample usage

Implementing a Concurrent Floodfill with Golang

Sample usage

Sample usage

FloodFill

TODO

  • Make it work with any Go program
  • Visualize using Oculus Rift

Concurrency vs Parallelism

  • What is parallelism?
  • What is concurrency?
  • Let's see now...

That is parallelism

That is parallelism

And that is concurrency

That is concurrency

That is concurrency

That is concurrency

That is concurrency

Links






Thank you