Go言語入門:よくある質問 -Concurrency Vol.2-

スポンサーリンク
Go言語入門:よくある質問 -Concurrency Vol.2- ノウハウ
Go言語入門:よくある質問 -Concurrency Vol.2-
この記事は約31分で読めます。
よっしー
よっしー

こんにちは。よっしーです(^^)

本日は、Go言語のよくある質問 について解説しています。

スポンサーリンク

背景

Go言語を学んでいると「なんでこんな仕様になっているんだろう?」「他の言語と違うのはなぜ?」といった疑問が湧いてきませんか。Go言語の公式サイトにあるFAQページには、そんな疑問に対する開発チームからの丁寧な回答がたくさん載っているんです。ただ、英語で書かれているため読むのに少しハードルがあるのも事実で、今回はこのFAQを日本語に翻訳して、Go言語への理解を深めていけたらと思い、これを読んだ時の内容を備忘として残しました。

Concurrency

なぜ私のプログラムはより多くのCPUでより速く動作しないのですか?

プログラムがより多くのCPUでより速く動作するかどうかは、それが解決している問題に依存します。Go言語はgoroutineやチャネルなどの並行プリミティブを提供しますが、並行性は根本的な問題が本質的に並列である場合にのみ並列化を可能にします。本質的に順次的な問題はより多くのCPUを追加することで高速化できませんが、並列で実行可能な部分に分割できる問題は、時には劇的に高速化できます。

時にはより多くのCPUを追加するとプログラムが遅くなることがあります。実際的には、有用な計算よりも同期や通信により多くの時間を費やすプログラムは、複数のOSスレッドを使用する際にパフォーマンスの低下を経験する可能性があります。これは、スレッド間でのデータの受け渡しがコンテキストの切り替えを含み、これには大きなコストがあり、そのコストはより多くのCPUで増加する可能性があるからです。例えば、Go仕様からの素数ふるいの例は、多くのgoroutineを起動するにもかかわらず、重要な並列性を持ちません。スレッド(CPU)の数を増やすことは、それを高速化するよりも遅くする可能性が高いです。

このトピックの詳細については、「Concurrency is not Parallelism」と題された講演を参照してください。

解説

この節では、なぜより多くのCPUを使ってもプログラムが速くならない場合があるのかについて説明されています。並行性と並列性の違い、そして実際のパフォーマンス制約について理解することが重要です。

並行性 vs 並列性

基本概念の理解

func demonstrateConcurrencyVsParallelism() {
    fmt.Println("並行性 vs 並列性の違い:")
    
    fmt.Println("並行性 (Concurrency):")
    fmt.Println("  - 複数のタスクを論理的に同時に扱う")
    fmt.Println("  - タスクが交互に実行される可能性")
    fmt.Println("  - 1つのCPUでも実現可能")
    
    fmt.Println("\n並列性 (Parallelism):")
    fmt.Println("  - 複数のタスクを物理的に同時に実行")
    fmt.Println("  - 複数のCPU/コアが必要")
    fmt.Println("  - 真の同時実行")
    
    // ランタイム情報の確認
    fmt.Printf("\n現在の環境:\n")
    fmt.Printf("  CPU数: %d\n", runtime.NumCPU())
    fmt.Printf("  GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    fmt.Printf("  Goroutine数: %d\n", runtime.NumGoroutine())
    
    // 並行処理の例(必ずしも並列ではない)
    demonstrateConcurrentExecution()
}

func demonstrateConcurrentExecution() {
    fmt.Println("\n並行実行の例:")
    
    // シンプルな並行タスク
    tasks := []string{"タスクA", "タスクB", "タスクC", "タスクD"}
    var wg sync.WaitGroup
    
    start := time.Now()
    
    for i, task := range tasks {
        wg.Add(1)
        go func(id int, name string) {
            defer wg.Done()
            
            // CPU集約的な作業のシミュレーション
            sum := 0
            for j := 0; j < 1000000; j++ {
                sum += j
            }
            
            fmt.Printf("  %s (goroutine %d) 完了: sum=%d\n", name, id, sum)
        }(i, task)
    }
    
    wg.Wait()
    duration := time.Since(start)
    fmt.Printf("並行実行時間: %v\n", duration)
    
    // 順次実行との比較
    start = time.Now()
    for i, task := range tasks {
        sum := 0
        for j := 0; j < 1000000; j++ {
            sum += j
        }
        fmt.Printf("  %s (順次 %d) 完了: sum=%d\n", task, i, sum)
    }
    
    sequentialDuration := time.Since(start)
    fmt.Printf("順次実行時間: %v\n", sequentialDuration)
    fmt.Printf("速度比: %.2fx\n", float64(sequentialDuration)/float64(duration))
}

本質的に順次的な問題

並列化できない処理の例

func demonstrateSequentialProblems() {
    fmt.Println("本質的に順次的な問題の例:")
    
    // 1. フィボナッチ数列(依存関係がある)
    fibonacci := func(n int) int {
        if n <= 1 {
            return n
        }
        return fibonacci(n-1) + fibonacci(n-2)
    }
    
    // 順次計算
    start := time.Now()
    result := fibonacci(35)
    sequentialTime := time.Since(start)
    
    fmt.Printf("フィボナッチ(35) 順次: %d, 時間: %v\n", result, sequentialTime)
    
    // 並行で試してみる(効果なし)
    start = time.Now()
    resultChan := make(chan int, 1)
    go func() {
        resultChan <- fibonacci(35)
    }()
    result = <-resultChan
    concurrentTime := time.Since(start)
    
    fmt.Printf("フィボナッチ(35) 並行: %d, 時間: %v\n", result, concurrentTime)
    fmt.Printf("並行化の効果: %.2fx\n", float64(sequentialTime)/float64(concurrentTime))
    
    // 2. リンクリストの走査
    type Node struct {
        Value int
        Next  *Node
    }
    
    // リンクリストの作成
    head := &Node{Value: 1}
    current := head
    for i := 2; i <= 10000; i++ {
        current.Next = &Node{Value: i}
        current = current.Next
    }
    
    // 順次走査
    start = time.Now()
    sum := 0
    current = head
    for current != nil {
        sum += current.Value
        current = current.Next
    }
    sequentialTime = time.Since(start)
    
    fmt.Printf("リンクリスト走査 順次: sum=%d, 時間: %v\n", sum, sequentialTime)
    
    // 並行走査は困難(各ノードが前のノードに依存)
    fmt.Println("リンクリストは本質的に順次的で並列化困難")
}

並列化可能な問題

効果的に並列化できる処理

func demonstrateParallelizableProblems() {
    fmt.Println("並列化可能な問題の例:")
    
    // 1. 独立した計算の集合
    numbers := make([]int, 1000000)
    for i := range numbers {
        numbers[i] = rand.Intn(1000)
    }
    
    // 順次処理:各要素を2乗
    start := time.Now()
    sequentialResults := make([]int, len(numbers))
    for i, num := range numbers {
        sequentialResults[i] = num * num
    }
    sequentialTime := time.Since(start)
    
    // 並列処理:ワーカープールで分散
    start = time.Now()
    parallelResults := parallelSquare(numbers)
    parallelTime := time.Since(start)
    
    fmt.Printf("配列処理 順次: %v\n", sequentialTime)
    fmt.Printf("配列処理 並列: %v\n", parallelTime)
    fmt.Printf("速度向上: %.2fx\n", float64(sequentialTime)/float64(parallelTime))
    
    // 結果の検証
    equal := len(sequentialResults) == len(parallelResults)
    if equal {
        for i := 0; i < len(sequentialResults) && equal; i++ {
            if sequentialResults[i] != parallelResults[i] {
                equal = false
            }
        }
    }
    fmt.Printf("結果の一致: %t\n", equal)
    
    // 2. MapReduce パターン
    demonstrateMapReduce()
}

func parallelSquare(numbers []int) []int {
    numWorkers := runtime.NumCPU()
    chunkSize := len(numbers) / numWorkers
    if chunkSize == 0 {
        chunkSize = 1
    }
    
    results := make([]int, len(numbers))
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if i == numWorkers-1 {
            end = len(numbers) // 最後のワーカーは残りをすべて処理
        }
        
        if start >= len(numbers) {
            break
        }
        
        wg.Add(1)
        go func(start, end int) {
            defer wg.Done()
            for j := start; j < end; j++ {
                results[j] = numbers[j] * numbers[j]
            }
        }(start, end)
    }
    
    wg.Wait()
    return results
}

func demonstrateMapReduce() {
    fmt.Println("\nMapReduceパターンの例:")
    
    // 大きなテキストから単語数をカウント
    text := strings.Repeat("hello world go programming ", 100000)
    words := strings.Fields(text)
    
    // 順次処理
    start := time.Now()
    wordCount1 := make(map[string]int)
    for _, word := range words {
        wordCount1[word]++
    }
    sequentialTime := time.Since(start)
    
    // 並列MapReduce
    start = time.Now()
    wordCount2 := parallelWordCount(words)
    parallelTime := time.Since(start)
    
    fmt.Printf("単語カウント 順次: %v\n", sequentialTime)
    fmt.Printf("単語カウント 並列: %v\n", parallelTime)
    fmt.Printf("速度向上: %.2fx\n", float64(sequentialTime)/float64(parallelTime))
    
    // 結果の比較
    equal := len(wordCount1) == len(wordCount2)
    for word, count1 := range wordCount1 {
        if count2, exists := wordCount2[word]; !exists || count1 != count2 {
            equal = false
            break
        }
    }
    fmt.Printf("結果の一致: %t\n", equal)
}

func parallelWordCount(words []string) map[string]int {
    numWorkers := runtime.NumCPU()
    chunkSize := len(words) / numWorkers
    
    // Map フェーズ
    partialResults := make([]map[string]int, numWorkers)
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        start := i * chunkSize
        end := start + chunkSize
        if i == numWorkers-1 {
            end = len(words)
        }
        
        wg.Add(1)
        go func(workerID, start, end int) {
            defer wg.Done()
            localCount := make(map[string]int)
            for j := start; j < end; j++ {
                localCount[words[j]]++
            }
            partialResults[workerID] = localCount
        }(i, start, end)
    }
    
    wg.Wait()
    
    // Reduce フェーズ
    finalResult := make(map[string]int)
    for _, partial := range partialResults {
        for word, count := range partial {
            finalResult[word] += count
        }
    }
    
    return finalResult
}

同期オーバーヘッドの問題

過度な同期によるパフォーマンス低下

func demonstrateSynchronizationOverhead() {
    fmt.Println("同期オーバーヘッドの問題:")
    
    // 1. 過度な mutex 使用
    type OverSynchronized struct {
        mu    sync.Mutex
        value int
    }
    
    func (os *OverSynchronized) increment() {
        os.mu.Lock()
        os.value++
        os.mu.Unlock()
    }
    
    func (os *OverSynchronized) get() int {
        os.mu.Lock()
        defer os.mu.Unlock()
        return os.value
    }
    
    // 高頻度の同期操作
    oversync := &OverSynchronized{}
    
    start := time.Now()
    var wg sync.WaitGroup
    numGoroutines := runtime.NumCPU() * 2
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100000; j++ {
                oversync.increment()
                _ = oversync.get() // 不要な読み取り
            }
        }()
    }
    
    wg.Wait()
    oversyncTime := time.Since(start)
    
    // 2. アトミック操作による改善
    var atomicValue int64
    
    start = time.Now()
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100000; j++ {
                atomic.AddInt64(&atomicValue, 1)
                _ = atomic.LoadInt64(&atomicValue)
            }
        }()
    }
    
    wg.Wait()
    atomicTime := time.Since(start)
    
    // 3. バッチ処理による改善
    start = time.Now()
    batchResults := make([]int64, numGoroutines)
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            localSum := int64(0)
            for j := 0; j < 100000; j++ {
                localSum++
            }
            batchResults[id] = localSum
        }(i)
    }
    
    wg.Wait()
    
    totalBatch := int64(0)
    for _, result := range batchResults {
        totalBatch += result
    }
    batchTime := time.Since(start)
    
    fmt.Printf("過度な同期: %v\n", oversyncTime)
    fmt.Printf("アトミック操作: %v (%.2fx faster)\n", atomicTime, 
               float64(oversyncTime)/float64(atomicTime))
    fmt.Printf("バッチ処理: %v (%.2fx faster)\n", batchTime, 
               float64(oversyncTime)/float64(batchTime))
    
    fmt.Printf("最終値: mutex=%d, atomic=%d, batch=%d\n", 
               oversync.get(), atomicValue, totalBatch)
}

コンテキストスイッチングのコスト

スレッド切り替えによるオーバーヘッド

func demonstrateContextSwitchingCost() {
    fmt.Println("コンテキストスイッチングのコスト:")
    
    // 1. 少数のgoroutineでの効率的な処理
    efficientWorkers := func(work []int) time.Duration {
        start := time.Now()
        numWorkers := runtime.NumCPU()
        chunkSize := len(work) / numWorkers
        
        var wg sync.WaitGroup
        results := make([]int64, numWorkers)
        
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go func(workerID int) {
                defer wg.Done()
                
                startIdx := workerID * chunkSize
                endIdx := startIdx + chunkSize
                if workerID == numWorkers-1 {
                    endIdx = len(work)
                }
                
                sum := int64(0)
                for j := startIdx; j < endIdx; j++ {
                    // CPU集約的な作業
                    sum += int64(work[j] * work[j])
                }
                results[workerID] = sum
            }(i)
        }
        
        wg.Wait()
        return time.Since(start)
    }
    
    // 2. 過剰な数のgoroutineでの非効率な処理
    inefficientWorkers := func(work []int) time.Duration {
        start := time.Now()
        var wg sync.WaitGroup
        results := make(chan int64, len(work))
        
        // 作業項目ごとにgoroutineを作成(非効率)
        for _, item := range work {
            wg.Add(1)
            go func(value int) {
                defer wg.Done()
                // 小さな作業にgoroutineを使用
                result := int64(value * value)
                results <- result
            }(item)
        }
        
        go func() {
            wg.Wait()
            close(results)
        }()
        
        // 結果を収集
        for range results {
            // 結果を処理
        }
        
        return time.Since(start)
    }
    
    // テストデータ
    work := make([]int, 10000)
    for i := range work {
        work[i] = rand.Intn(100)
    }
    
    efficientTime := efficientWorkers(work)
    inefficientTime := inefficientWorkers(work)
    
    fmt.Printf("効率的なワーカー: %v\n", efficientTime)
    fmt.Printf("非効率なワーカー: %v\n", inefficientTime)
    fmt.Printf("効率比: %.2fx\n", float64(inefficientTime)/float64(efficientTime))
    
    fmt.Println("\n最適化のポイント:")
    fmt.Println("• ワーカー数をCPU数に比例させる")
    fmt.Println("• 作業単位を適切なサイズにする")
    fmt.Println("• 過度なgoroutine作成を避ける")
    fmt.Println("• チャネル操作のオーバーヘッドを考慮する")
}

実際のボトルネック分析

パフォーマンス分析とプロファイリング

func demonstratePerformanceAnalysis() {
    fmt.Println("パフォーマンス分析の実例:")
    
    // CPU プロファイリングの例
    cpuProfile := func() {
        f, err := os.Create("cpu.prof")
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()
        
        pprof.StartCPUProfile(f)
        defer pprof.StopCPUProfile()
        
        // CPU集約的な処理
        data := make([]float64, 1000000)
        for i := range data {
            data[i] = math.Sin(float64(i)) * math.Cos(float64(i))
        }
    }
    
    // メモリプロファイリングの例
    memProfile := func() {
        // 大量のメモリ使用
        var data [][]byte
        for i := 0; i < 1000; i++ {
            data = append(data, make([]byte, 1024))
        }
        
        f, err := os.Create("mem.prof")
        if err != nil {
            log.Fatal(err)
        }
        defer f.Close()
        
        runtime.GC()
        pprof.WriteHeapProfile(f)
        
        // データを使用(GCによる削除を防ぐ)
        _ = len(data)
    }
    
    // 並行性のボトルネック分析
    analyzeBottlenecks := func() {
        fmt.Println("ボトルネック分析:")
        
        // I/O バウンドなタスク
        ioIntensiveTask := func() time.Duration {
            start := time.Now()
            var wg sync.WaitGroup
            
            for i := 0; i < 100; i++ {
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    time.Sleep(time.Millisecond) // I/O のシミュレーション
                }()
            }
            
            wg.Wait()
            return time.Since(start)
        }
        
        // CPU バウンドなタスク
        cpuIntensiveTask := func() time.Duration {
            start := time.Now()
            var wg sync.WaitGroup
            
            for i := 0; i < runtime.NumCPU(); i++ {
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    sum := 0
                    for j := 0; j < 10000000; j++ {
                        sum += j
                    }
                }()
            }
            
            wg.Wait()
            return time.Since(start)
        }
        
        ioTime := ioIntensiveTask()
        cpuTime := cpuIntensiveTask()
        
        fmt.Printf("I/O集約的タスク: %v\n", ioTime)
        fmt.Printf("CPU集約的タスク: %v\n", cpuTime)
        
        if ioTime < cpuTime {
            fmt.Println("→ I/Oバウンドでは並行性が効果的")
        } else {
            fmt.Println("→ CPU制約がボトルネック")
        }
    }
    
    fmt.Println("プロファイリング実行中...")
    cpuProfile()
    memProfile()
    analyzeBottlenecks()
    
    fmt.Println("\nプロファイリング結果の確認方法:")
    fmt.Println("go tool pprof cpu.prof")
    fmt.Println("go tool pprof mem.prof")
    fmt.Println("(pprof) top    # ホットスポット表示")
    fmt.Println("(pprof) web    # ブラウザで可視化")
}

最適化のガイドライン

効果的な並列化の指針

func demonstrateOptimizationGuidelines() {
    fmt.Println("並列化最適化のガイドライン:")
    
    guidelines := []struct {
        category string
        points   []string
    }{
        {
            "問題の性質を理解する",
            []string{
                "CPU集約的 vs I/O集約的",
                "独立性のある処理 vs 依存関係のある処理",
                "分割可能性の評価",
                "通信コストの見積もり",
            },
        },
        {
            "適切な並列度の選択",
            []string{
                "ワーカー数 ≈ CPU数(CPU集約的)",
                "ワーカー数 > CPU数(I/O集約的)",
                "作業単位の適切なサイズ",
                "オーバーヘッドとのバランス",
            },
        },
        {
            "同期の最小化",
            []string{
                "共有状態を避ける",
                "バッチ処理で同期回数を減らす",
                "アトミック操作を活用",
                "lock-freeアルゴリズムの検討",
            },
        },
        {
            "測定と改善",
            []string{
                "ベンチマークの実装",
                "プロファイリングによる分析",
                "ボトルネックの特定",
                "段階的な最適化",
            },
        },
    }
    
    for _, guideline := range guidelines {
        fmt.Printf("\n%s:\n", guideline.category)
        for _, point := range guideline.points {
            fmt.Printf("  • %s\n", point)
        }
    }
    
    // Amdahlの法則の実演
    fmt.Println("\nAmdahlの法則による理論的限界:")
    
    parallelPortions := []float64{0.5, 0.75, 0.9, 0.95, 0.99}
    cpuCounts := []int{2, 4, 8, 16, 32}
    
    for _, parallel := range parallelPortions {
        fmt.Printf("\n並列化可能部分: %.0f%%\n", parallel*100)
        for _, cpus := range cpuCounts {
            // Amdahlの法則: S = 1 / ((1-P) + P/N)
            sequential := 1.0 - parallel
            speedup := 1.0 / (sequential + parallel/float64(cpus))
            fmt.Printf("  %2dCPU: %.2fx speedup\n", cpus, speedup)
        }
    }
    
    fmt.Println("\n結論:")
    fmt.Println("• 並列化可能部分が少ないと、CPU数を増やしても効果は限定的")
    fmt.Println("• 95%以上が並列化可能でないと、大幅な性能向上は期待できない")
    fmt.Println("• 同期オーバーヘッドにより、実際の性能向上はさらに小さくなる")
}

まとめ

プログラムがより多くのCPUで速くならない理由:

  1. 問題の性質: 本質的に順次的な問題は並列化できない
  2. 同期オーバーヘッド: 過度な同期により性能が低下
  3. コンテキストスイッチング: スレッド切り替えのコスト
  4. Amdahlの法則: 順次部分が全体の性能を制限
  5. 適切でない並列度: CPU数と作業の不適切な配分

効果的な並列化のためには、問題の性質を理解し、適切な設計と測定に基づく最適化が不可欠です。

おわりに 

本日は、Go言語のよくある質問について解説しました。

よっしー
よっしー

何か質問や相談があれば、コメントをお願いします。また、エンジニア案件の相談にも随時対応していますので、お気軽にお問い合わせください。

それでは、また明日お会いしましょう(^^)

コメント

タイトルとURLをコピーしました