概要
本
かかった時間
読む前の状態
基本的なGoの構文(go, chan, select)などはわかる
この本を読んで達成したいことはなにか
Goの並行処理を正しく使うための知識(基本的なパターン、例外処理など)を再整理する
並行処理の本質的な理解をする。具体的には以下の質問に答えられるようにする
コンパイラのリオーダーを制御する必要性が説明できる
読む前後の変化
読書メモ
第 3 章:Goにおける並行処理の構成要素
3.1 ゴルーチン(Goroutine)
ゴルーチンはGoのプログラムで最も基本的な構成要素
ゴルーチンはコルーチンとして知られる高度な抽象化
コルーチンはプリエンプティブでない並行処理のサブルーチン。割り込みされることがない。
GoはM:Nスケジューラでゴルーチンを管理している
Mつのグリーンスレッド(ゴルーチンがスケジュールされる)をN個のOSのスレッドに対応させる
ゴルチーンの中でクロージャーを実行するとき、同じアドレス空間上で実行される
ゴルーチンは生成のコストが小さい(数キロバイト程度)
筆者のマシン(Go 1.13)で計測したときは8kb程度であった
Copy package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
memConsumed := func() uint64 {
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}
var c <-chan interface{}
var wg sync.WaitGroup
noop := func() {
wg.Done()
<-c
}
const numGoroutines = 1e5
wg.Add(numGoroutines)
before := memConsumed()
for i := 0; i < numGoroutines; i++ {
go noop()
}
wg.Wait()
after := memConsumed()
fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1000)
}
ちなみにサンプルプログラムでは、チャネルをcloseすることで、チャネルから値が送信されるのを待っているゴルーチンにゼロ値を送信するテクニックを使っている。closeされたチャネルからはゼロ値が取得できるため。
3.3 チャネル(channel)
3.4 select文
プログラムの処理とチャネルを安全に組み合わせるもの
書き込みや読み込みのチャネルはすべて同時に扱われ、どれが準備できたかを確認する
どれも実行できない場合は、select文でブロックされる
選択しているすべてのチャネルがブロックしているときになにかしたい場合のためにdefault statementを用いることができる
複数のチャネルから同時に読み込むことができる場合は一様な選択がされ、ランダムである。
例:selectの動作例
Copy package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
select {
case num := <-c:
fmt.Println(num)
case n := <-time.After(1 * time.Second):
fmt.Printf("Time out. %v\n", n.Format("2006/01/02 15:04:05"))
}
}
Copy Time out. 2019/12/29 10:45:14
例2:一様なランダムにselectされる例
Copy package main
import (
"fmt"
)
func main() {
c1 := make(chan int)
c2 := make(chan int)
close(c1)
close(c2)
c1count, c2count := 0, 0
for i := 0; i < 1000; i++ {
select {
case <-c1:
c1count++
case <-c2:
c2count++
}
}
fmt.Printf("c1Count : %d, c2Count : %d\n", c1count, c2count)
}
Copy c1Count : 493, c2Count : 507
第 4 章:Goでの並行処理パターン
4.1 拘束
並行処理で安全な操作をする方法
メモリを共有するための同期プリミティブ(sync.Mutexなど)
イミュータブルなデータ
新しいデータを作りたい場合はデータをコピーしてから操作する
拘束によって保護されたデータ
データを唯一の並行プロセスからのみ得られることを確実にする考え方
アドホックとレキシカル
アドホック拘束とは、規約によって達成すること。静的解析などを利用しなければ規約を守り続けるのは難しい
レキシカル拘束とは、レキシカルスコープを使って適切なデータと並行処理のプリミティブだけを複数の並行プロセスが使えるように公開すること
チャネルは並行処理で安全に値を渡すことができるが、並行安全でないデータ構造の場合でもレキシカル拘束を用いることができる
同期を使用せずに、拘束を使用する目的は、パフォーマンスの向上と開発者に対する可読性の向上。同期で発生する問題を避けること。
例1:レキシカル拘束の例
Copy package main
import (
"fmt"
)
func main() {
// resultsチャネルへの書き込みを拘束して、他のゴルーチンからの書き込みを防いでいる
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i < 5; i++ {
results <- i
}
}()
return results
}
// 消費者は読み込みのみに拘束する
consumer := func(results <-chan int) {
for result := range results {
fmt.Printf("Recieved: %d\n", result)
}
fmt.Printf("Done recieving.\n")
}
results := chanOwner()
consumer(results)
}
例2:レキシカル拘束の例(並行安全でない場合)
Copy package main
import (
"bytes"
"fmt"
"sync"
)
func main() {
printData := func(wg *sync.WaitGroup, data []byte) {
defer wg.Done()
var buff bytes.Buffer
for _, b := range data {
fmt.Fprintf(&buff, "%c", b)
}
fmt.Println(buff.String())
}
var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
// ゴルーチンが処理できる範囲を拘束している
go printData(&wg, data[:3])
go printData(&wg, data[3:])
wg.Wait()
}
4.2 for-selectループ
for-selectパターンは以下のパターンを指す
Copy // 無限ループ or なんからのイテレーション
for {
select {
// チャネルに対する操作
}
}
defautl句を使うときの注意点として、他のケースが成立しないとき間、ビジーループになってしまう。なのでdefault句は書かずにselectでブロックしておくのが良いと思っている。
4.3 ゴルーチンリークを避ける
ゴルーチンの生成コストは小さいとはいえ、コストはかかる。ガベージコレクションもされない。
ゴルーチンが終了せずにメモリ上に生き残る場合はゴルーチンリークにつながり、メモリの使用に影響がある。
ゴルーチンの親から子へシグナルを送信してキャンセルできるようにすることでゴルーチンリークを避けることができる
例:処理を待っているGoroutineを親のGoroutineがキャンセルする例
Copy package main
import (
"fmt"
"time"
)
func main() {
doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} {
terminated := make(chan interface{})
go func() {
defer fmt.Println("dowork exited.")
defer close(terminated)
for {
select {
case s := <-strings:
fmt.Println(s)
case <-done:
return
}
}
}()
return terminated
}
done := make(chan interface{})
// Goroutineで動作
terminated := doWork(done, nil)
go func() {
time.Sleep(3 * time.Second)
fmt.Println("Canceling dowork goroutine...")
close(done)
}()
// チャネルからのデータを受信できるようになるまでブロックする
// この実装では close(done) をすることで doWork 関数内の Goroutine が return され
// 同時に close された terminated チャネルが return される
// よって terminated チャネルからゼロ値を受け取ることができるようになり main goroutine が終了する
<-terminated
fmt.Println("Done!")
}
チャネルをcloseすることでそのチャネルがアンブロッキングになり、ゼロ値を受信できるようにできるテクニックをここでも使っている。doWork関数内の無名関数はGoroutineで動作する。
ゴルーチンがゴルーチンの生成の責任を持っているのであれば、そのゴルーチンを停止できるようにする責任もある
4.4 orチャネル
1つ以上のdoneチャネルを1つのdoneチャネルにまとめて、まとめているチャネルのうちどれか1つのチャネルが閉じられたら、まとめたチャネルも閉じられるようにしたい
selectを使うこともOKだが、いくつのチャネルがあるかわからない時がある
システム内で複数のモジュールを組み合わせる際の継ぎ目として利用すると便利
コールスタック内でゴルーチンの木構造をキャンセルする条件が複数になる傾向があるためorチャネルパターンが便利
例:orチャネルパターンの実装例
Copy package main
import (
"fmt"
"time"
)
func main() {
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
// 再帰関数の終了条件
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
// それぞれのselectでどれか1つのチャネルからデータを受信できればselect句を抜けorDoneをreturnしてGoroutineが終了する
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
// 3つ以上のチャネルがある場合は、チャネルsliceの先頭にorDoneチャネルを追加しつつ再帰的にor関数を呼び出す
// これによって呼び元のorDoneチャネルがcloseされた場合は再帰的に伝播する
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
sig := func(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
start := time.Now()
<-or(
sig(3*time.Hour),
sig(1*time.Minute),
sig(4*time.Second),
)
fmt.Printf("done after %v", time.Since(start))
}
Copy done after 4.0003293s
4.5 エラーハンドリング
一般的に並行プロセスはエラーを、プログラムの状態を完全に把握していて何をすべきかをより多くの情報に基づいて決定できる別の箇所へと送るべき
エラーはゴルーチンからreturnされる値を構築する際の第一級市民として捉えられるべき
例:結果とエラーをまとめる例
Copy package main
import (
"fmt"
"net/http"
)
type Result struct {
Error error
Response *http.Response
}
func main() {
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{
Error: err,
Response: resp,
}
select {
case <-done:
return
// resultsは呼び元の関数で受信される
case results <- result:
}
}
}()
// Goroutineで処理されるためチャネルは呼び元にすぐにreturnされる
// 関数の引数で与えられているurlsへのリクエストがすべて終了するとき処理が完了となる
return results
}
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v", result.Error)
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}
}
関数の返り値を多値で (http.Response, error) などとしたいが、チャネルを用いる場合はこれができないので、上記の実装例のように構造体でまとめる必要があるのだろう。先にチャネルをreturnするが、非同期でGoroutineがHTTPリクエストを発行している。
4.6 パイプライン
パイプラインはシステムの抽象化に使える
データを受け取って、何からの処理を行って、どこかに渡すという一連の処理
パイプラインの性質
ステージは引き回せるように具体化されていなければならない
例:チャネルを使わないパイプラインの実装例
Copy package main
import "fmt"
func main() {
multiply := func(value, multiplier int) int {
return value * multiplier
}
add := func(value, additive int) int {
return value + additive
}
ints := []int{1, 2, 3, 4}
for _, v := range ints {
fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
}
例:チャネルを使ったパイプラインの実装例
Copy package main
import "fmt"
func main() {
// done チャネルを各関数の実装で考慮することで close(done) で全体のパイプラインを制御することができる
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int, len(integers))
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline {
fmt.Println(v)
}
}
4.7 ファンアウト、ファンイン
ファンアウト
パイプラインからの入力を扱うために複数のゴルーチンを起動するプロセスを説明する用語
ファンイン
複数の結果を1つのチャネルに結合するプロセスを説明する用語
4.8 or-doneチャネル
システムの完全に異なる部分から受け取ったチャネルを扱う場合
Copy for val := range myChan {
// valに対する何らかの処理
}
Copy loop:
for {
select {
case <-doneCh:
break loop
case myVal, ok := <- myChan:
if ok == false {
return
}
// myValに対する何らかの処理
}
}
以下のようにorDoneの処理を隠蔽化することができる
Copy package main
func main() {
// 4.5の実装に似ている。先にチャネルをreturnしてしまって、非同期でGoroutineがチャネルに対する処理を実施する
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}()
return valStream
}
for val := range orDone(done, myChan) {
// valに対する何らかの処理
}
}
4.9 teeチャネル
チャネルからのストリームを2つに分けて、同じ値を2つの異なる場所で使いたい場合。Unixコマンドのteeのイメージ
4.10 bridgeチャネル
チャネルのチャネルを崩して、単一のチャネルにする技
Copy package main
import "fmt"
func main() {
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
var stream <-chan interface{}
select {
// 元のチャネルのチャネルからチャネルを取得する
case myStream, ok := <-chanStream:
if ok == false {
return
}
stream = myStream
case <-done:
return
}
// 取得したチャネルから値を取得してreturnするチャネルにデータを送信する
for val := range orDone(done, stream) {
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
// 値が1つ格納されているチャネルを生成
stream := make(chan interface{}, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
for v := range bridge(nil, genVals()) {
fmt.Printf("%v ", v)
}
}
4.11 キュー
バッファ付きチャネル
一種のキューだが、導入するのはプログラムを最適化する一番最後
キューの導入が早すぎると、デッドロックやライブロックなどの動機に関する問題を隠してしまう
キューの実用性は、あるステージの実行時間が他のステージの実行時間に影響を与えないようにステージを分離すること
ステージ内でのバッチによるリクエストが時間を節約する場合
ステージにおける遅延がシステムにフィードバックループを発生させる場合
4.12 contextパッケージ
contextパッケージの2つの目的
コールグラフを通じてリクエストに関するデータを渡すデータの置き場所を提供
関数内でキャンセルするときのシーン
ゴルーチン内のブロックしている処理がキャンセルされるように中断できる必要があるとき
Contextインターフェースは内部状態を変更できるメソッドを持っていない
コールスタックの上記の関数が下位の関数によってコンテキストをキャンセルされてしまうことを防ぐ
例:contextを使ってゴルーチンをキャンセルする実装例
Copy package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// 起点となる親のゴルーチンのため context.Background() で context を生成する
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
if err := printGreeting(ctx); err != nil {
fmt.Printf("cannot print greeting: %v\n", err)
// 関数呼び出しがエラーになった場合は main Goroutine が context をキャンセル
cancel()
}
}()
wg.Wait()
}
func printGreeting(ctx context.Context) error {
greeting, err := genGreeting(ctx)
if err != nil {
return err
}
fmt.Printf("%s world\n", greeting)
return nil
}
func genGreeting(ctx context.Context) (string, error) {
// 新たにタイムアウトするcontextを生成している
// 上記のコールグラフから渡されたcontextも有効であるためcancel()された場合は、このcontextもキャンセルされる
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
switch locale, err := locale(ctx); {
case err != nil:
return "", err
case locale == "EN/US":
return "hello", nil
}
return "", fmt.Errorf("unsupported locale")
}
func locale(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(1 * time.Minute):
}
return "EN/US", nil
}