Visualizing Concurrency
Go concurrency visually explained
Lviv, January 23, 2016
The modern world is concurrent
- Moore's Law is dead
- Multi-core CPUs
- ...
Software is not...
What is concurrency?
Concurrency (Wikipedia)
In computer science, concurrency ... is a property of systems in which several computations are executing simultaneously, and potentially interacting with each other.
Rob Pike - Concurrency is not parallelism
Concurrency vs parallelism
- Parallelism is simply running things in parallel.
- Concurrency is a way of structuring programs.
- A concurrent program can run on either a single core or multiple cores.
Concurrency in Go
- Based on Tony Hoare's CSP'78 paper
- Channels
- Goroutines
- Select
Hello, concurrent world
package main
// main starts one goroutine that sends a value over a channel and waits
// until that value has been received.
func main() {
	// Unbuffered channel of ints shared by main and the goroutine below.
	answers := make(chan int)

	// Anonymous goroutine: sends 42 and then exits.
	go func() { answers <- 42 }()

	// Receive the value and discard it.
	<-answers
}
Timer
package main
import "time"
// main demonstrates three timer helpers from package time: Tick, After and
// AfterFunc.
func main() {
	// Tick delivers on its channel every 100ms; wait for the first tick only.
	ch := time.Tick(100 * time.Millisecond)
	<-ch

	// After delivers exactly once, after the given duration has elapsed.
	ch = time.After(1 * time.Second)
	<-ch

	// AfterFunc returns a *time.Timer, not a channel, and runs the callback
	// in its own goroutine. A separate channel is therefore used to wait
	// for the callback to finish.
	d := 200 * time.Millisecond
	done := make(chan struct{})
	time.AfterFunc(d, func() {
		println("Ran after", d.String())
		close(done)
	})
	<-done
}
Recurrent timers
package main
import "time"
// tick returns a channel on which the value 1 is sent once, after d has
// elapsed. The send is performed by a goroutine started here.
func tick(d time.Duration) <-chan int {
	fired := make(chan int)
	notify := func() {
		time.Sleep(d)
		fired <- 1
	}
	go notify()
	return fired
}
// main waits for 24 consecutive one-second ticks, creating a fresh tick
// channel for each of them.
func main() {
	for remaining := 24; remaining > 0; remaining-- {
		<-tick(1 * time.Second)
	}
}
Ping-pong
// main lets two players pass a ball over a shared channel for one second
// and then takes the ball back.
func main() {
	table := make(chan int)

	// Two players share the same table.
	for i := 0; i < 2; i++ {
		go player(table)
	}

	// Put the ball (initially 0) into play.
	table <- 0
	time.Sleep(1 * time.Second)

	// Grab the ball to end the game.
	<-table
}
// player loops forever: it takes the ball from table, increments it, waits
// 100ms and puts it back on table.
func player(table chan int) {
	for {
		hit := (<-table) + 1
		time.Sleep(100 * time.Millisecond)
		table <- hit
	}
}
Ping-pong #3
// main lets three players pass a ball over a shared channel for two seconds
// and then takes the ball back.
func main() {
	table := make(chan int)

	// Seat three players at the same table.
	for seats := 3; seats > 0; seats-- {
		go player(table)
	}

	// Serve the ball (initially 0), let the game run, then pick it up.
	table <- 0
	time.Sleep(2 * time.Second)
	<-table
}
// player loops forever: it takes the ball from table, increments it, waits
// 100ms and puts it back on table.
func player(table chan int) {
	for {
		ball := <-table
		ball++
		// Spelled with a time unit instead of the bare nanosecond count
		// 100e6; the delay is the same 100ms.
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}
Ping-pong #100
// main lets one hundred players pass a ball over a shared channel for three
// seconds and then takes the ball back.
func main() {
	table := make(chan int)

	// Start all players before the ball is put into play.
	for started := 0; started != 100; started++ {
		go player(table)
	}

	ball := 0
	table <- ball
	time.Sleep(3 * time.Second)
	<-table
}
// player loops forever: it receives the ball from table, adds one to it,
// pauses for 100ms and sends it back on table.
func player(table chan int) {
	const pause = 100 * time.Millisecond
	for {
		received := <-table
		time.Sleep(pause)
		table <- received + 1
	}
}
Fan-in
package main
import (
"fmt"
"time"
)
// producer1 sends the sequence 0, 1, 2, ... on ch forever, sleeping 100ms
// before each send.
func producer1(ch chan int) {
	for n := 0; ; n++ {
		time.Sleep(100 * time.Millisecond)
		ch <- n
	}
}
// producer2 sends the sequence 0, 1, 2, ... on ch forever, sleeping 300ms
// before each send.
func producer2(ch chan int) {
	const interval = 300 * time.Millisecond
	next := 0
	for {
		time.Sleep(interval)
		ch <- next
		next++
	}
}
// reader loops forever, printing every value it receives from out on its
// own line.
func reader(out chan int) {
	for {
		fmt.Println(<-out)
	}
}
// main merges the output of two producers onto one channel and forwards
// every value to a reader goroutine. It returns right after forwarding a
// value equal to 20.
func main() {
	// Both producers write to the same channel (fan-in).
	merged := make(chan int)
	go producer1(merged)
	go producer2(merged)

	sink := make(chan int)
	go reader(sink)

	for {
		v := <-merged
		sink <- v
		if v == 20 {
			break
		}
	}
}
Servers
// handler replies "ok" on the connection and closes it. The result of the
// write is not checked.
func handler(c net.Conn) {
	defer c.Close()
	reply := []byte("ok")
	c.Write(reply)
}
// main listens on TCP port 5000 and serves every accepted connection in
// its own goroutine. It panics if the listener cannot be created.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}

	// Failed accepts are skipped and the loop simply tries again.
	// NOTE(review): if Accept keeps failing, this loop retries without
	// pause; confirm that is acceptable outside of a demo.
	for {
		if conn, err := ln.Accept(); err == nil {
			go handler(conn)
		}
	}
}
Servers
package main
import (
"fmt"
"net"
"time"
)
// main listens on TCP port 5000, starts a logger goroutine and a server
// goroutine that share one channel, and exits after ten seconds. It panics
// if the listener cannot be created.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}

	// Channel passed to both the server and the logger.
	addrs := make(chan string)
	go logger(addrs)
	go server(ln, addrs)

	// Keep the process alive for a fixed period.
	time.Sleep(10 * time.Second)
}
// handler sends the connection's remote address on ch, then replies "ok"
// and closes the connection. The result of the write is not checked.
func handler(c net.Conn, ch chan string) {
	defer c.Close()
	ch <- c.RemoteAddr().String()
	c.Write([]byte("ok"))
}
// logger loops forever, printing each string received from ch on its own
// line.
func logger(ch chan string) {
	for {
		line := <-ch
		fmt.Println(line)
	}
}
// server accepts connections from l forever and hands each one, together
// with ch, to handler in a new goroutine. Failed accepts are skipped.
func server(l net.Listener, ch chan string) {
	for {
		conn, err := l.Accept()
		if err == nil {
			go handler(conn, ch)
		}
		// NOTE(review): on error the loop retries immediately; if Accept
		// keeps failing this spins. Confirm that is acceptable.
	}
}
Workers
// main starts a pool of 36 workers and blocks until every worker has
// signalled completion through the WaitGroup.
func main() {
	const workers = 36

	var wg sync.WaitGroup
	// The counter is set before the pool starts, so Wait cannot return
	// before the workers have been accounted for.
	wg.Add(workers)
	go pool(&wg, workers)
	wg.Wait()
}
// pool starts n workers reading from a shared task channel, feeds them the
// tasks 0 through 49 and then closes the channel.
func pool(wg *sync.WaitGroup, n int) {
	tasks := make(chan int)
	// Closing the channel is what lets the workers leave their loops.
	defer close(tasks)

	for started := 0; started < n; started++ {
		go worker(tasks, wg)
	}
	for id := 0; id < 50; id++ {
		tasks <- id
	}
}
// worker processes tasks until the channel is closed and drained. Each task
// is an int: the worker sleeps that many milliseconds and then prints a
// line. wg.Done is called when the worker returns.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	// range ends once tasks has been closed and emptied.
	for task := range tasks {
		time.Sleep(time.Duration(task) * time.Millisecond)
		fmt.Println("processing task", task)
	}
}
Workers
// main launches a pool of 36 workers in the background and waits for all of
// them to finish.
func main() {
	size := 36

	// One WaitGroup slot per worker; each worker releases its own slot.
	wg := sync.WaitGroup{}
	wg.Add(size)

	go pool(&wg, size)
	wg.Wait()
}
// pool spawns n workers on one task channel, sends them the numbers 0..49
// as tasks and closes the channel afterwards.
func pool(wg *sync.WaitGroup, n int) {
	const taskCount = 50

	queue := make(chan int)
	for w := n; w > 0; w-- {
		go worker(queue, wg)
	}

	// The channel is unbuffered, so each send waits for a free worker.
	for task := 0; task < taskCount; task++ {
		queue <- task
	}
	close(queue)
}
// worker receives tasks until the channel is closed. For each task it
// sleeps task milliseconds and prints a line; wg.Done runs when it returns.
func worker(tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, open := <-tasks
		if !open {
			break
		}
		pause := time.Duration(task) * time.Millisecond
		time.Sleep(pause)
		fmt.Println("processing task", task)
	}
}
Server+Worker
// handler sends the connection's remote address on ch, waits 100ms, then
// replies "ok" and closes the connection. The result of the write is not
// checked.
func handler(c net.Conn, ch chan string) {
	defer c.Close()

	addr := c.RemoteAddr().String()
	ch <- addr

	time.Sleep(100 * time.Millisecond)
	c.Write([]byte("ok"))
}
// main listens on TCP port 5000, starts a pool of four workers and a server
// that share one channel, and exits after two seconds. It panics if the
// listener cannot be created.
func main() {
	ln, err := net.Listen("tcp", ":5000")
	if err != nil {
		panic(err)
	}

	// Channel shared by the server side and the worker pool.
	addrs := make(chan string)
	go pool(addrs, 4)
	go server(ln, addrs)

	time.Sleep(2 * time.Second)
}
// worker loops forever, forwarding every value received on wch to results
// unchanged.
func worker(wch chan int, results chan int) {
	for {
		results <- <-wch
	}
}
// parse loops forever, receiving values from results and discarding them.
func parse(results chan int) {
	for {
		<-results
	}
}
// pool starts n workers and one parse goroutine, then loops forever: for
// every address received on ch it sends 42 times the address length to the
// workers.
func pool(ch chan string, n int) {
	wch, results := make(chan int), make(chan int)

	for started := 0; started < n; started++ {
		go worker(wch, results)
	}
	go parse(results)

	for {
		wch <- 42 * len(<-ch)
	}
}
Concurrent prime sieve
package main
import "fmt"
// main prints the first ten primes using a chain of filter goroutines: each
// printed prime gets its own Filter stage appended to the pipeline.
func main() {
	src := make(chan int)
	go Generate(src)

	for found := 0; found < 10; found++ {
		// The first value out of the current pipeline is the next prime.
		p := <-src
		fmt.Println(p)

		// Add a stage that drops multiples of p and read from it from now
		// on.
		next := make(chan int)
		go Filter(src, next, p)
		src = next
	}
}
// Generate sends the sequence 2, 3, 4, ... on ch and never returns.
func Generate(ch chan<- int) {
	n := 2
	for {
		ch <- n
		n++
	}
}
// Filter copies values from ch to out forever, dropping every value that is
// divisible by prime.
func Filter(ch <-chan int, out chan<- int, prime int) {
	for {
		if candidate := <-ch; candidate%prime != 0 {
			out <- candidate
		}
	}
}
GOMAXPROCS
GOMAXPROCS
// workers is the number of worker goroutines started by pool.
const workers = 24

// pool starts workers goroutines, all of which read from in and write to
// out.
func pool(in, out chan int) {
	for left := workers; left > 0; left-- {
		go worker(in, out)
	}
}
// worker loops forever: it receives a task from in and sends task+1 on out.
func worker(in, out chan int) {
	for {
		// Stand-in for a heavy computation on the received task.
		out <- (<-in) + 1
	}
}
// generate sends the numbers 0 through 99 on in, in order, and returns.
func generate(in chan int) {
	const count = 100
	task := 0
	for task < count {
		in <- task
		task++
	}
}
// print receives exactly 100 values from out, printing each on its own
// line, and then returns.
func print(out chan int) {
	for remaining := 100; remaining > 0; remaining-- {
		fmt.Println(<-out)
	}
}
// main sets GOMAXPROCS to 1, wires a generator, a worker pool and a printer
// together through two channels, and exits after one second.
func main() {
	runtime.GOMAXPROCS(1)

	in, out := make(chan int), make(chan int)

	go pool(in, out) // workers: in -> out
	go generate(in)  // feeds tasks into in
	go print(out)    // prints results from out

	time.Sleep(1 * time.Second)
}
Go Concurrency Tracer
- Modify source AST
- Run and analyze trace
- Render w/ WebGL via Three.js
Go Concurrency Tracer
It turns this:
package main
// main sends 42 from a goroutine to itself over an unbuffered channel.
func main() {
	ch := make(chan int)
	send := func() { ch <- 42 }
	go send()
	<-ch
}
Go Concurrency Tracer
Into this:
...
// main is the instrumented version of the example program: goroutine
// start/stop and channel send/receive are each reported by a call to
// _encode with a _cmd value describing the event.
func main() {
// Report the start of the main goroutine.
// NOTE(review): getGID is not shown here; presumably it returns a
// goroutine-specific name for the given label - confirm.
_encode(_cmd{Name: getGID("main"), Command: "create goroutine"})
// Deferred, so the stop event is emitted when main returns.
defer _encode(_cmd{Name: getGID("main"), Command: "stop goroutine"})
ch := make(chan int)
go func() {
// Report the start of the anonymous goroutine, with "main" as its parent.
_encode(_cmd{Parent: "main", Name: getGID("go"), Command: "create goroutine"})
// Deferred, so the stop event is emitted when this goroutine returns.
defer _encode(_cmd{Name: getGID("go"), Command: "stop goroutine"})
// The send event is reported before the actual send is performed.
_encode(_cmd{From: getGID("go"), Channel: "ch", Value: 42, Command: "start send"})
ch <- 42
}()
// The receive happens while the _cmd literal is being built (Value: <-ch),
// so this event is reported only after the value has been received.
_encode(_cmd{To: getGID("main"), Channel: "ch", Value: <-ch, Command: "start recv"})
}
// _encode stamps v with the current time in nanoseconds and writes it to
// standard output as one JSON line prefixed with "TRACE:".
//
// If v cannot be marshalled, the error is reported on standard error and no
// trace line is written, instead of emitting an empty "TRACE:" line.
func _encode(v _cmd) {
	v.Time = time.Now().UnixNano()

	out, err := json.Marshal(v)
	if err != nil {
		fmt.Fprintln(os.Stderr, "TRACE: cannot encode event:", err)
		return
	}
	fmt.Fprintln(os.Stdout, "TRACE:", string(out))
}
...
Go Concurrency Tracer
Next, analyze raw trace:
{"t":1453171532768286484,"cmd":"create goroutine","name":"worker #27","parent":"pool #5"}
{"t":1453171532768328577,"cmd":"create goroutine","name":"worker #26","parent":"pool #5"}
{"t":1453171532768432492,"cmd":"create goroutine","name":"worker #29","parent":"pool #5"}
{"t":1453171532768435406,"cmd":"create goroutine","name":"worker #28","parent":"pool #5"}
{"t":1453171532768596996,"cmd":"create goroutine","name":"worker #20","parent":"pool #5"}
{"t":1453171532868018479,"cmd":"create goroutine","name":"print #32","parent":"main"}
{"t":1453171532868131248,"cmd":"create goroutine","name":"generate #31","parent":"main"}
{"t":1453171532879915073,"cmd":"start send","from":"generate #31","ch":"in","value":0}
{"t":1453171532880021908,"cmd":"start recv","to":"worker #30","ch":"in","value":0}
{"t":1453171532892937700,"cmd":"start send","from":"generate #31","ch":"in","value":1}
{"t":1453171532893072787,"cmd":"start recv","to":"worker #16","ch":"in","value":1}
{"t":1453171532904954369,"cmd":"start send","from":"generate #31","ch":"in","value":2}
{"t":1453171532905077725,"cmd":"start recv","to":"worker #17","ch":"in","value":2}
{"t":1453171532916999950,"cmd":"start send","from":"generate #31","ch":"in","value":3}
{"t":1453171532917084835,"cmd":"start recv","to":"worker #7","ch":"in","value":3}
{"t":1453171532929056797,"cmd":"start send","from":"generate #31","ch":"in","value":4}
{"t":1453171532929221817,"cmd":"start recv","to":"worker #18","ch":"in","value":4}
{"t":1453171532941104211,"cmd":"start send","from":"generate #31","ch":"in","value":5}
{"t":1453171532941265219,"cmd":"start recv","to":"worker #8","ch":"in","value":5}
{"t":1453171532952977981,"cmd":"start send","from":"generate #31","ch":"in","value":6}
Sample usage
Sample usage
Sample usage
TODO
- Make it work with any Go program
- Visualize using Oculus Rift
Concurrency vs Parallelism
- What is parallelism?
- What is concurrency?
- Let's see now...