reading-books
  • Introduction
  • Computer Systems
    • はじめて読むマシン語
    • プロセッサを支える技術
    • オペレーティングシステム入門
  • Architecture
    • データ指向アプリケーションデザイン
    • Web API The Good Parts
  • Code Reading
    • MINCSコードリーディング
  • In Progress
    • Goによる並行処理
    • ビッグデータを支える技術
Powered by GitBook
On this page
  • 概要
  • 本
  • かかった時間
  • 読む前の状態
  • この本を読んで達成したいことはなにか
  • 読む前後の変化
  • 読書メモ
  • 第 3 章:Goにおける並行処理の構成要素
  • 第 4 章:Goでの並行処理パターン

Was this helpful?

  1. In Progress

Goによる並行処理

概要

本

  • Goによる並行処理

かかった時間

  • 3.5 時間

読む前の状態

  • 前に一度通読している

  • 基本的なGoの構文(go, chan, select)などはわかる

この本を読んで達成したいことはなにか

  • Goの並行処理を正しく使うための知識(基本的なパターン、例外処理など)を再整理する

  • 並行処理の本質的な理解をする。具体的には以下の質問に答えられるようにする

    • メモリバリア命令の存在理由を説明できる

    • コンパイラのリオーダーを制御する必要性が説明できる

読む前後の変化

読書メモ

第 3 章:Goにおける並行処理の構成要素

3.1 ゴルーチン(Goroutine)

  • ゴルーチンはGoのプログラムで最も基本的な構成要素

  • ゴルーチンはコルーチンとして知られる高度な抽象化

    • コルーチンはプリエンプティブでない並行処理のサブルーチン。割り込みされることがない。

  • GoはM:Nスケジューラでゴルーチンを管理している

    • Mつのグリーンスレッド(ゴルーチンがスケジュールされる)をN個のOSのスレッドに対応させる

  • fork-joinモデル

  • ゴルチーンの中でクロージャーを実行するとき、同じアドレス空間上で実行される

    • 変数の参照など

  • ゴルーチンは生成のコストが小さい(数キロバイト程度)

    • 筆者のマシン(Go 1.13)で計測したときは8kb程度であった

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    memConsumed := func() uint64 {
        runtime.GC()
        var s runtime.MemStats
        runtime.ReadMemStats(&s)
        return s.Sys
    }

    var c <-chan interface{}
    var wg sync.WaitGroup
    noop := func() {
        wg.Done()
        <-c
    }
    const numGoroutines = 1e5
    wg.Add(numGoroutines)
    before := memConsumed()
    for i := 0; i < numGoroutines; i++ {
        go noop()
    }
    wg.Wait()
    after := memConsumed()
    fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1000)
}
8.939kb

ちなみにサンプルプログラムでは、チャネルをcloseすることで、チャネルから値が送信されるのを待っているゴルーチンにゼロ値を送信するテクニックを使っている。closeされたチャネルからはゼロ値が取得できるため。

3.3 チャネル(channel)

3.4 select文

  • プログラムの処理とチャネルを安全に組み合わせるもの

  • 書き込みや読み込みのチャネルはすべて同時に扱われ、どれが準備できたかを確認する

  • どれも実行できない場合は、select文でブロックされる

  • 選択しているすべてのチャネルがブロックしているときになにかしたい場合のためにdefault statementを用いることができる

  • 複数のチャネルから同時に読み込むことができる場合は一様な選択がされ、ランダムである。

例:selectの動作例

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan int)
    select {
    case num := <-c:
        fmt.Println(num)
    case n := <-time.After(1 * time.Second):
        fmt.Printf("Time out. %v\n", n.Format("2006/01/02 15:04:05"))
    }
}
Time out. 2019/12/29 10:45:14

例2:一様なランダムにselectされる例

package main

import (
    "fmt"
)

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    close(c1)
    close(c2)

    c1count, c2count := 0, 0

    for i := 0; i < 1000; i++ {
        select {
        case <-c1:
            c1count++
        case <-c2:
            c2count++
        }
    }

    fmt.Printf("c1Count : %d, c2Count : %d\n", c1count, c2count)
}
c1Count : 493, c2Count : 507

第 4 章:Goでの並行処理パターン

4.1 拘束

  • 並行処理で安全な操作をする方法

    • メモリを共有するための同期プリミティブ(sync.Mutexなど)

    • 通信による同期(チャネル)

    • イミュータブルなデータ

      • 新しいデータを作りたい場合はデータをコピーしてから操作する

    • 拘束によって保護されたデータ

      • データを唯一の並行プロセスからのみ得られることを確実にする考え方

      • アドホックとレキシカル

        • アドホック拘束とは、規約によって達成すること。静的解析などを利用しなければ規約を守り続けるのは難しい

        • レキシカル拘束とは、レキシカルスコープを使って適切なデータと並行処理のプリミティブだけを複数の並行プロセスが使えるように公開すること

          • チャネルは並行処理で安全に値を渡すことができるが、並行安全でないデータ構造の場合でもレキシカル拘束を用いることができる

      • 同期を使用せずに、拘束を使用する目的は、パフォーマンスの向上と開発者に対する可読性の向上。同期で発生する問題を避けること。

例1:レキシカル拘束の例

package main

import (
    "fmt"
)

func main() {
    // resultsチャネルへの書き込みを拘束して、他のゴルーチンからの書き込みを防いでいる
    chanOwner := func() <-chan int {
        results := make(chan int, 5)
        go func() {
            defer close(results)
            for i := 0; i < 5; i++ {
                results <- i
            }
        }()
        return results
    }

    // 消費者は読み込みのみに拘束する
    consumer := func(results <-chan int) {
        for result := range results {
            fmt.Printf("Recieved: %d\n", result)
        }
        fmt.Printf("Done recieving.\n")
    }

    results := chanOwner()
    consumer(results)
}

例2:レキシカル拘束の例(並行安全でない場合)

package main

import (
    "bytes"
    "fmt"
    "sync"
)

func main() {
    printData := func(wg *sync.WaitGroup, data []byte) {
        defer wg.Done()

        var buff bytes.Buffer
        for _, b := range data {
            fmt.Fprintf(&buff, "%c", b)
        }
        fmt.Println(buff.String())
    }

    var wg sync.WaitGroup
    wg.Add(2)
  data := []byte("golang")
  // ゴルーチンが処理できる範囲を拘束している
    go printData(&wg, data[:3])
    go printData(&wg, data[3:])

    wg.Wait()
}

4.2 for-selectループ

  • for-selectパターンは以下のパターンを指す

// 無限ループ or なんからのイテレーション
for {
  select {
  // チャネルに対する操作
  }
}
  • パターンを使うときのシナリオ

    • チャネルから繰り返しの変数を送出する

    • 停止シグナルを待つ無限ループ

defautl句を使うときの注意点として、他のケースが成立しないとき間、ビジーループになってしまう。なのでdefault句は書かずにselectでブロックしておくのが良いと思っている。

4.3 ゴルーチンリークを避ける

  • ゴルーチンの生成コストは小さいとはいえ、コストはかかる。ガベージコレクションもされない。

  • ゴルーチンが終了する場合

    • ゴルーチンが処理を完了する場合

    • 回復できないエラーによって処理を続けられない場合

    • 停止するように命令された場合

  • ゴルーチンが終了せずにメモリ上に生き残る場合はゴルーチンリークにつながり、メモリの使用に影響がある。

  • ゴルーチンの親から子へシグナルを送信してキャンセルできるようにすることでゴルーチンリークを避けることができる

例:処理を待っているGoroutineを親のGoroutineがキャンセルする例

package main

import (
    "fmt"
    "time"
)

func main() {
    doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} {
        terminated := make(chan interface{})
        go func() {
            defer fmt.Println("dowork exited.")
            defer close(terminated)
            for {
                select {
                case s := <-strings:
                    fmt.Println(s)
                case <-done:
                    return
                }
            }
        }()
        return terminated
    }

    done := make(chan interface{})

    // Goroutineで動作
    terminated := doWork(done, nil)

    go func() {
        time.Sleep(3 * time.Second)
        fmt.Println("Canceling dowork goroutine...")
        close(done)
    }()

    // チャネルからのデータを受信できるようになるまでブロックする
    // この実装では close(done) をすることで doWork 関数内の Goroutine が return され
    // 同時に close された terminated チャネルが return される
    // よって terminated チャネルからゼロ値を受け取ることができるようになり main goroutine が終了する
    <-terminated
    fmt.Println("Done!")
}

チャネルをcloseすることでそのチャネルがアンブロッキングになり、ゼロ値を受信できるようにできるテクニックをここでも使っている。doWork関数内の無名関数はGoroutineで動作する。

  • ゴルーチンがゴルーチンの生成の責任を持っているのであれば、そのゴルーチンを停止できるようにする責任もある

  • doneChを渡す方法が基本的

4.4 orチャネル

  • 1つ以上のdoneチャネルを1つのdoneチャネルにまとめて、まとめているチャネルのうちどれか1つのチャネルが閉じられたら、まとめたチャネルも閉じられるようにしたい

    • selectを使うこともOKだが、いくつのチャネルがあるかわからない時がある

    • or チャネルパターンを用いることができる

  • システム内で複数のモジュールを組み合わせる際の継ぎ目として利用すると便利

    • コールスタック内でゴルーチンの木構造をキャンセルする条件が複数になる傾向があるためorチャネルパターンが便利

例:orチャネルパターンの実装例

package main

import (
    "fmt"
    "time"
)

func main() {
    var or func(channels ...<-chan interface{}) <-chan interface{}
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        switch len(channels) {
        // 再帰関数の終了条件
        case 0:
            return nil
        case 1:
            return channels[0]
        }

        orDone := make(chan interface{})
        go func() {
            defer close(orDone)

            // それぞれのselectでどれか1つのチャネルからデータを受信できればselect句を抜けorDoneをreturnしてGoroutineが終了する
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                // 3つ以上のチャネルがある場合は、チャネルsliceの先頭にorDoneチャネルを追加しつつ再帰的にor関数を呼び出す
                // これによって呼び元のorDoneチャネルがcloseされた場合は再帰的に伝播する
                case <-or(append(channels[3:], orDone)...):
                }
            }
        }()
        return orDone
    }

    sig := func(after time.Duration) <-chan interface{} {
        c := make(chan interface{})
        go func() {
            defer close(c)
            time.Sleep(after)
        }()
        return c
    }

    start := time.Now()
    <-or(
        sig(3*time.Hour),
        sig(1*time.Minute),
        sig(4*time.Second),
    )
    fmt.Printf("done after %v", time.Since(start))
}
done after 4.0003293s

4.5 エラーハンドリング

  • 誰がそのエラーを処理する責任を持つべきか

  • 一般的に並行プロセスはエラーを、プログラムの状態を完全に把握していて何をすべきかをより多くの情報に基づいて決定できる別の箇所へと送るべき

  • 取得されるであろう結果とエラーを対にする

  • エラーはゴルーチンからreturnされる値を構築する際の第一級市民として捉えられるべき

例:結果とエラーをまとめる例

package main

import (
    "fmt"
    "net/http"
)

type Result struct {
    Error    error
    Response *http.Response
}

func main() {
    checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
        results := make(chan Result)
        go func() {
            defer close(results)

            for _, url := range urls {
                var result Result
                resp, err := http.Get(url)
                result = Result{
                    Error:    err,
                    Response: resp,
                }
                select {
                case <-done:
                    return
                // resultsは呼び元の関数で受信される
                case results <- result:
                }
            }
        }()

        // Goroutineで処理されるためチャネルは呼び元にすぐにreturnされる
        // 関数の引数で与えられているurlsへのリクエストがすべて終了するとき処理が完了となる
        return results
    }

    done := make(chan interface{})
    defer close(done)

    urls := []string{"https://www.google.com", "https://badhost"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
            fmt.Printf("error: %v", result.Error)
            continue
        }
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}

関数の返り値を多値で (http.Response, error) などとしたいが、チャネルを用いる場合はこれができないので、上記の実装例のように構造体でまとめる必要があるのだろう。先にチャネルをreturnするが、非同期でGoroutineがHTTPリクエストを発行している。

4.6 パイプライン

  • パイプラインはシステムの抽象化に使える

    • データを受け取って、何からの処理を行って、どこかに渡すという一連の処理

    • 各ステージでの懸念事項を独立させることができる

    • パイプラインの性質

      • ステージは受け取るものと返すものが同じ型である

      • ステージは引き回せるように具体化されていなければならない

例:チャネルを使わないパイプラインの実装例

package main

import "fmt"

func main() {
    multiply := func(value, multiplier int) int {
        return value * multiplier
    }
    add := func(value, additive int) int {
        return value + additive
    }
    ints := []int{1, 2, 3, 4}
    for _, v := range ints {
        fmt.Println(multiply(add(multiply(v, 2), 1), 2))
    }
}

例:チャネルを使ったパイプラインの実装例

package main

import "fmt"

func main() {
    // done チャネルを各関数の実装で考慮することで close(done) で全体のパイプラインを制御することができる
    generator := func(done <-chan interface{}, integers ...int) <-chan int {
        intStream := make(chan int, len(integers))
        go func() {
            defer close(intStream)
            for _, i := range integers {
                select {
                case <-done:
                    return
                case intStream <- i:
                }
            }
        }()
        return intStream
    }

    multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
        multipliedStream := make(chan int)
        go func() {
            defer close(multipliedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case multipliedStream <- i * multiplier:
                }
            }
        }()
        return multipliedStream
    }

    add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
        addedStream := make(chan int)
        go func() {
            defer close(addedStream)
            for i := range intStream {
                select {
                case <-done:
                    return
                case addedStream <- i + additive:
                }
            }
        }()
        return addedStream
    }

    done := make(chan interface{})
    defer close(done)

    intStream := generator(done, 1, 2, 3, 4)
    pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
    for v := range pipeline {
        fmt.Println(v)
    }
}

4.7 ファンアウト、ファンイン

  • ファンアウト

    • パイプラインからの入力を扱うために複数のゴルーチンを起動するプロセスを説明する用語

  • ファンイン

    • 複数の結果を1つのチャネルに結合するプロセスを説明する用語

  • ファンアウトを利用するシーン

    • そのステージがより前の計算結果に依存していない

    • 実行が長時間に及ぶ

4.8 or-doneチャネル

  • システムの完全に異なる部分から受け取ったチャネルを扱う場合

    • 例えばdoneChによるゴルーチンのキャンセル

  • コードの可読性が落ちる

for val := range myChan {
    // valに対する何らかの処理
}
loop:
for {
    select {
    case <-doneCh:
        break loop
    case myVal, ok := <- myChan:
        if ok == false {
            return
        }
        // myValに対する何らかの処理
    }
}
  • 以下のようにorDoneの処理を隠蔽化することができる

package main

func main() {
    // 4.5の実装に似ている。先にチャネルをreturnしてしまって、非同期でGoroutineがチャネルに対する処理を実施する
    orDone := func(done, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }()
        return valStream
    }

    for val := range orDone(done, myChan) {
        // valに対する何らかの処理
    }
}

4.9 teeチャネル

  • チャネルからのストリームを2つに分けて、同じ値を2つの異なる場所で使いたい場合。Unixコマンドのteeのイメージ

4.10 bridgeチャネル

  • チャネルのチャネルを崩して、単一のチャネルにする技

package main

import "fmt"

func main() {
    bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                var stream <-chan interface{}
                select {
                // 元のチャネルのチャネルからチャネルを取得する
                case myStream, ok := <-chanStream:
                    if ok == false {
                        return
                    }
                    stream = myStream
                case <-done:
                    return
                }
                // 取得したチャネルから値を取得してreturnするチャネルにデータを送信する
                for val := range orDone(done, stream) {
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }

    genVals := func() <-chan <-chan interface{} {
        chanStream := make(chan (<-chan interface{}))
        go func() {
            defer close(chanStream)
            for i := 0; i < 10; i++ {
                // 値が1つ格納されているチャネルを生成
                stream := make(chan interface{}, 1)
                stream <- i
                close(stream)
                chanStream <- stream
            }
        }()
        return chanStream
    }

    for v := range bridge(nil, genVals()) {
        fmt.Printf("%v ", v)
    }
}

4.11 キュー

  • バッファ付きチャネル

    • 一種のキューだが、導入するのはプログラムを最適化する一番最後

    • キューの導入が早すぎると、デッドロックやライブロックなどの動機に関する問題を隠してしまう

    • キューの実用性は、あるステージの実行時間が他のステージの実行時間に影響を与えないようにステージを分離すること

      • ステージ内でのバッチによるリクエストが時間を節約する場合

      • ステージにおける遅延がシステムにフィードバックループを発生させる場合

    • キューは以下のどちらかのステージで実装されるべき

      • パイプラインの入り口

      • バッチ処理によって効率的になるステージの中

    • リトルの法則(L=λW)

      • L:システムの平均ユニット数

      • λ:ユニットの平均到達率

      • W:ユニットのシステム内での平均滞在時間

4.12 contextパッケージ

  • contextパッケージの2つの目的

    • コールグラフの各枝をキャンセルするAPIを提供

    • コールグラフを通じてリクエストに関するデータを渡すデータの置き場所を提供

  • 関数内でキャンセルするときのシーン

    • ゴルーチンの親がキャンセルをしたい場合

    • ゴルーチンが子がキャンセルをしたい場合

    • ゴルーチン内のブロックしている処理がキャンセルされるように中断できる必要があるとき

  • Contextインターフェースは内部状態を変更できるメソッドを持っていない

    • コールスタックの上記の関数が下位の関数によってコンテキストをキャンセルされてしまうことを防ぐ

例:contextを使ってゴルーチンをキャンセルする実装例

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    // 起点となる親のゴルーチンのため context.Background() で context を生成する
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            // 関数呼び出しがエラーになった場合は main Goroutine が context をキャンセル
            cancel()
        }
    }()
    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world\n", greeting)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    // 新たにタイムアウトするcontextを生成している
    // 上記のコールグラフから渡されたcontextも有効であるためcancel()された場合は、このcontextもキャンセルされる
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
    defer cancel()

    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(1 * time.Minute):
    }
    return "EN/US", nil
}
  • データバッグとしてのcontext(TODO)

PreviousIn ProgressNextビッグデータを支える技術

Last updated 5 years ago

Was this helpful?