Go言語入門:効果的なGo -Parallelization-

スポンサーリンク
Go言語入門:効果的なGo -Parallelization- ノウハウ
Go言語入門:効果的なGo -Parallelization-
この記事は約28分で読めます。
よっしー
よっしー

こんにちは。よっしーです(^^)

本日は、Go言語を効果的に使うためのガイドラインについて解説しています。

スポンサーリンク

背景

Go言語を学び始めて、より良いコードを書きたいと思い、Go言語の公式ドキュメント「Effective Go」を知りました。これは、いわば「Goらしいコードの書き方指南書」になります。単に動くコードではなく、効率的で保守性の高いコードを書くためのベストプラクティスが詰まっているので、これを読んだ時の内容を備忘として残しました。

並列化

これらのアイデアの別の応用は、複数のCPUコアにわたって計算を並列化することです。計算が独立して実行できる別々の部分に分割できる場合、それは並列化でき、各部分が完了したときにシグナルを送るチャンネルを使用します。

アイテムのベクトルに対して実行する高価な操作があり、各アイテムに対する操作の値が独立している場合を考えてみましょう。この理想化された例のように。

type Vector []float64

// v[i], v[i+1] ... v[n-1]まで操作を適用する。
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // この部分が完了したことをシグナル
}

ループ内で部分を独立して起動し、CPUごとに1つずつ実行します。それらは任意の順序で完了できますが、問題ありません。全てのゴルーチンを起動した後、チャンネルを空にして完了シグナルをカウントするだけです。

const numCPU = 4 // CPUコア数

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU)  // バッファリングはオプションですが賢明です。
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // チャンネルを空にする。
    for i := 0; i < numCPU; i++ {
        <-c    // 1つのタスクの完了を待つ
    }
    // 全て完了。
}

numCPUの定数値を作成する代わりに、適切な値をランタイムに尋ねることができます。関数runtime.NumCPUはマシン内のハードウェアCPUコア数を返すので、次のように書けます:

var numCPU = runtime.NumCPU()

また、runtime.GOMAXPROCSという関数もあり、これはGoプログラムが同時に実行できるユーザー指定のコア数を報告(または設定)します。デフォルトではruntime.NumCPUの値ですが、同様の名前のシェル環境変数を設定するか、正の数でこの関数を呼び出すことでオーバーライドできます。ゼロで呼び出すと値をクエリするだけです。したがって、ユーザーのリソース要求を尊重したい場合は、次のように書くべきです:

var numCPU = runtime.GOMAXPROCS(0)

並行性のアイデア—独立して実行するコンポーネントとしてプログラムを構造化すること—と並列性—効率のために複数のCPUで計算を並列実行すること—を混同しないよう注意してください。Goの並行性機能により一部の問題を並列計算として構造化することが容易になりますが、Goは並列言語ではなく並行言語であり、すべての並列化問題がGoのモデルに適合するわけではありません。この区別については、このブログ投稿で引用されている講演を参照してください。

並列化とは?

並列化は、1つの大きな計算を複数の小さな部分に分割し、複数のCPUコアで同時に実行することで処理速度を向上させる技術です。

基本的な並列化の例

package main

import (
    "fmt"
    "math"
    "runtime"
    "time"
)

type Vector []float64

// 重い計算をシミュレートする操作
func (u Vector) Op(x float64) float64 {
    // 計算集約的な操作をシミュレート
    result := x
    for i := 0; i < 1000; i++ {
        result = math.Sin(result) * math.Cos(result)
    }
    return result * 0.1
}

// 範囲を指定して部分的に処理
func (v Vector) DoSome(start, end int, u Vector, c chan int) {
    fmt.Printf("範囲 [%d:%d] 処理開始\n", start, end)
    
    for i := start; i < end; i++ {
        v[i] += u.Op(v[i])
    }
    
    fmt.Printf("範囲 [%d:%d] 処理完了\n", start, end)
    c <- 1 // 完了シグナル
}

// 全体を並列処理
func (v Vector) DoAll(u Vector) {
    numCPU := runtime.NumCPU()
    fmt.Printf("使用するCPUコア数: %d\n", numCPU)
    
    c := make(chan int, numCPU) // バッファ付きチャンネル
    chunkSize := len(v) / numCPU
    
    start := time.Now()
    
    // 各CPUコアにタスクを分散
    for i := 0; i < numCPU; i++ {
        startIdx := i * chunkSize
        endIdx := startIdx + chunkSize
        
        // 最後のチャンクは残りを全て処理
        if i == numCPU-1 {
            endIdx = len(v)
        }
        
        go v.DoSome(startIdx, endIdx, u, c)
    }
    
    // 全タスクの完了を待機
    for i := 0; i < numCPU; i++ {
        <-c
    }
    
    elapsed := time.Since(start)
    fmt.Printf("並列処理完了: %v\n", elapsed)
}

// 比較用:シーケンシャル処理
func (v Vector) DoSequential(u Vector) {
    fmt.Println("シーケンシャル処理開始")
    start := time.Now()
    
    for i := 0; i < len(v); i++ {
        v[i] += u.Op(v[i])
    }
    
    elapsed := time.Since(start)
    fmt.Printf("シーケンシャル処理完了: %v\n", elapsed)
}

func basicParallelizationExample() {
    fmt.Println("=== 基本的な並列化の例 ===")
    
    size := 10000
    v1 := make(Vector, size)
    v2 := make(Vector, size)
    u := make(Vector, size)
    
    // 初期データを設定
    for i := 0; i < size; i++ {
        v1[i] = float64(i) * 0.001
        v2[i] = float64(i) * 0.001
        u[i] = 1.0
    }
    
    // シーケンシャル処理
    v1.DoSequential(u)
    
    fmt.Println()
    
    // 並列処理
    v2.DoAll(u)
    
    // 結果の検証
    equal := true
    for i := 0; i < size; i++ {
        if math.Abs(v1[i]-v2[i]) > 1e-10 {
            equal = false
            break
        }
    }
    
    fmt.Printf("結果の一致: %v\n", equal)
}

func main() {
    basicParallelizationExample()
}

実用的な例:画像処理の並列化

package main

import (
    "fmt"
    "math"
    "runtime"
    "sync"
    "time"
)

// 画像を表現する構造体
type Image struct {
    Width  int
    Height int
    Data   [][]float64
}

func NewImage(width, height int) *Image {
    data := make([][]float64, height)
    for i := range data {
        data[i] = make([]float64, width)
    }
    return &Image{Width: width, Height: height, Data: data}
}

// ガウシアンフィルタを適用(重い処理)
func (img *Image) ApplyGaussianFilter(startRow, endRow int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("行 [%d:%d] フィルタ処理開始\n", startRow, endRow)
    
    // ガウシアンカーネル
    kernel := [][]float64{
        {1, 2, 1},
        {2, 4, 2},
        {1, 2, 1},
    }
    kernelSum := 16.0
    
    // 新しいデータ配列(この範囲のみ)
    newData := make([][]float64, endRow-startRow)
    for i := range newData {
        newData[i] = make([]float64, img.Width)
    }
    
    for y := startRow; y < endRow; y++ {
        for x := 0; x < img.Width; x++ {
            var sum float64
            
            // カーネルを適用
            for ky := -1; ky <= 1; ky++ {
                for kx := -1; kx <= 1; kx++ {
                    py := y + ky
                    px := x + kx
                    
                    // 境界チェック
                    if py >= 0 && py < img.Height && px >= 0 && px < img.Width {
                        sum += img.Data[py][px] * kernel[ky+1][kx+1]
                    }
                }
            }
            
            newData[y-startRow][x] = sum / kernelSum
        }
    }
    
    // 結果をコピーバック
    for y := startRow; y < endRow; y++ {
        copy(img.Data[y], newData[y-startRow])
    }
    
    fmt.Printf("行 [%d:%d] フィルタ処理完了\n", startRow, endRow)
}

func (img *Image) ProcessParallel() {
    fmt.Println("並列画像処理開始")
    
    numCPU := runtime.NumCPU()
    fmt.Printf("使用CPUコア数: %d\n", numCPU)
    
    var wg sync.WaitGroup
    chunkSize := img.Height / numCPU
    
    start := time.Now()
    
    for i := 0; i < numCPU; i++ {
        startRow := i * chunkSize
        endRow := startRow + chunkSize
        
        if i == numCPU-1 {
            endRow = img.Height
        }
        
        wg.Add(1)
        go img.ApplyGaussianFilter(startRow, endRow, &wg)
    }
    
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("並列処理完了: %v\n", elapsed)
}

func (img *Image) ProcessSequential() {
    fmt.Println("シーケンシャル画像処理開始")
    start := time.Now()
    
    var wg sync.WaitGroup
    wg.Add(1)
    img.ApplyGaussianFilter(0, img.Height, &wg)
    
    elapsed := time.Since(start)
    fmt.Printf("シーケンシャル処理完了: %v\n", elapsed)
}

func imageProcessingExample() {
    fmt.Println("=== 画像処理並列化の例 ===")
    
    // テスト用画像を作成
    img1 := NewImage(1000, 1000)
    img2 := NewImage(1000, 1000)
    
    // 初期データを設定(ランダムなパターン)
    for y := 0; y < img1.Height; y++ {
        for x := 0; x < img1.Width; x++ {
            value := math.Sin(float64(x)*0.01) * math.Cos(float64(y)*0.01)
            img1.Data[y][x] = value
            img2.Data[y][x] = value
        }
    }
    
    // シーケンシャル処理
    img1.ProcessSequential()
    
    fmt.Println()
    
    // 並列処理
    img2.ProcessParallel()
}

func main() {
    imageProcessingExample()
}

行列計算の並列化

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

type Matrix [][]float64

func NewMatrix(rows, cols int) Matrix {
    matrix := make(Matrix, rows)
    for i := range matrix {
        matrix[i] = make([]float64, cols)
    }
    return matrix
}

// 行列の一部を計算
func multiplyRows(a, b Matrix, result Matrix, startRow, endRow int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("行 [%d:%d] 計算開始\n", startRow, endRow)
    
    n := len(b[0]) // bの列数
    m := len(b)    // bの行数
    
    for i := startRow; i < endRow; i++ {
        for j := 0; j < n; j++ {
            sum := 0.0
            for k := 0; k < m; k++ {
                sum += a[i][k] * b[k][j]
            }
            result[i][j] = sum
        }
    }
    
    fmt.Printf("行 [%d:%d] 計算完了\n", startRow, endRow)
}

func (a Matrix) MultiplyParallel(b Matrix) Matrix {
    rows := len(a)
    cols := len(b[0])
    result := NewMatrix(rows, cols)
    
    numCPU := runtime.NumCPU()
    fmt.Printf("行列乗算(並列): %d CPUコア使用\n", numCPU)
    
    var wg sync.WaitGroup
    chunkSize := rows / numCPU
    
    start := time.Now()
    
    for i := 0; i < numCPU; i++ {
        startRow := i * chunkSize
        endRow := startRow + chunkSize
        
        if i == numCPU-1 {
            endRow = rows
        }
        
        wg.Add(1)
        go multiplyRows(a, b, result, startRow, endRow, &wg)
    }
    
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("並列行列乗算完了: %v\n", elapsed)
    
    return result
}

func (a Matrix) MultiplySequential(b Matrix) Matrix {
    rows := len(a)
    cols := len(b[0])
    common := len(b)
    result := NewMatrix(rows, cols)
    
    fmt.Println("行列乗算(シーケンシャル)開始")
    start := time.Now()
    
    for i := 0; i < rows; i++ {
        for j := 0; j < cols; j++ {
            sum := 0.0
            for k := 0; k < common; k++ {
                sum += a[i][k] * b[k][j]
            }
            result[i][j] = sum
        }
    }
    
    elapsed := time.Since(start)
    fmt.Printf("シーケンシャル行列乗算完了: %v\n", elapsed)
    
    return result
}

func matrixMultiplicationExample() {
    fmt.Println("=== 行列乗算並列化の例 ===")
    
    size := 500
    a := NewMatrix(size, size)
    b := NewMatrix(size, size)
    
    // 行列を初期化
    for i := 0; i < size; i++ {
        for j := 0; j < size; j++ {
            a[i][j] = float64(i + j)
            b[i][j] = float64(i * j)
        }
    }
    
    // シーケンシャル計算
    result1 := a.MultiplySequential(b)
    
    fmt.Println()
    
    // 並列計算
    result2 := a.MultiplyParallel(b)
    
    // 結果の検証
    equal := true
    for i := 0; i < size && equal; i++ {
        for j := 0; j < size && equal; j++ {
            if result1[i][j] != result2[i][j] {
                equal = false
            }
        }
    }
    
    fmt.Printf("結果の一致: %v\n", equal)
}

func main() {
    matrixMultiplicationExample()
}

CPU設定の動的調整

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func heavyComputation(id, iterations int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    start := time.Now()
    
    // 重い計算をシミュレート
    sum := 0.0
    for i := 0; i < iterations; i++ {
        sum += float64(i) * 0.001
    }
    
    elapsed := time.Since(start)
    fmt.Printf("ワーカー %d 完了: %v (結果: %.2f)\n", id, elapsed, sum)
}

func testWithDifferentCPUSettings() {
    fmt.Println("=== CPU設定の比較 ===")
    
    originalMaxProcs := runtime.GOMAXPROCS(0)
    fmt.Printf("元のGOMAXPROCS: %d\n", originalMaxProcs)
    fmt.Printf("利用可能なCPUコア数: %d\n", runtime.NumCPU())
    
    iterations := 50000000
    numWorkers := 8
    
    // 異なるGOMAXPROCS設定でテスト
    for _, maxProcs := range []int{1, 2, 4, runtime.NumCPU()} {
        fmt.Printf("\n--- GOMAXPROCS = %d ---\n", maxProcs)
        
        runtime.GOMAXPROCS(maxProcs)
        
        var wg sync.WaitGroup
        start := time.Now()
        
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go heavyComputation(i, iterations, &wg)
        }
        
        wg.Wait()
        totalTime := time.Since(start)
        
        fmt.Printf("総実行時間: %v\n", totalTime)
    }
    
    // 元の設定に戻す
    runtime.GOMAXPROCS(originalMaxProcs)
}

func demonstrateRuntimeFunctions() {
    fmt.Println("=== Runtime関数の例 ===")
    
    fmt.Printf("runtime.NumCPU(): %d\n", runtime.NumCPU())
    fmt.Printf("runtime.GOMAXPROCS(0): %d\n", runtime.GOMAXPROCS(0))
    fmt.Printf("runtime.NumGoroutine(): %d\n", runtime.NumGoroutine())
    
    // 推奨される設定方法
    numCPU := runtime.GOMAXPROCS(0) // ユーザー設定を尊重
    fmt.Printf("使用する並列度: %d\n", numCPU)
    
    // または、利用可能なCPU数を使用
    // numCPU := runtime.NumCPU()
}

func main() {
    demonstrateRuntimeFunctions()
    fmt.Println()
    testWithDifferentCPUSettings()
}

並行性 vs 並列性の理解

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 並行性の例:独立したタスクが協調
func concurrencyExample() {
    fmt.Println("=== 並行性の例 ===")
    fmt.Println("複数の独立したタスクが協調して動作")
    
    var wg sync.WaitGroup
    
    // タスク1: データを生成
    dataChan := make(chan int, 5)
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(dataChan)
        
        for i := 1; i <= 10; i++ {
            dataChan <- i
            fmt.Printf("生成: %d\n", i)
            time.Sleep(100 * time.Millisecond)
        }
    }()
    
    // タスク2: データを処理
    processedChan := make(chan int, 5)
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(processedChan)
        
        for data := range dataChan {
            processed := data * data
            processedChan <- processed
            fmt.Printf("処理: %d -> %d\n", data, processed)
            time.Sleep(150 * time.Millisecond)
        }
    }()
    
    // タスク3: 結果を保存
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        var results []int
        for processed := range processedChan {
            results = append(results, processed)
            fmt.Printf("保存: %d\n", processed)
        }
        fmt.Printf("最終結果: %v\n", results)
    }()
    
    wg.Wait()
}

// 並列性の例:同じタスクを複数のCPUで分散実行
func parallelismExample() {
    fmt.Println("\n=== 並列性の例 ===")
    fmt.Println("同じ計算を複数のCPUコアで分散実行")
    
    data := make([]int, 1000000)
    for i := range data {
        data[i] = i + 1
    }
    
    numCPU := runtime.NumCPU()
    fmt.Printf("使用CPUコア数: %d\n", numCPU)
    
    var wg sync.WaitGroup
    results := make([]int64, numCPU)
    chunkSize := len(data) / numCPU
    
    start := time.Now()
    
    for i := 0; i < numCPU; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            startIdx := workerID * chunkSize
            endIdx := startIdx + chunkSize
            if workerID == numCPU-1 {
                endIdx = len(data)
            }
            
            var sum int64
            for j := startIdx; j < endIdx; j++ {
                sum += int64(data[j])
            }
            
            results[workerID] = sum
            fmt.Printf("ワーカー %d: 範囲[%d:%d] 合計=%d\n", 
                workerID, startIdx, endIdx, sum)
        }(i)
    }
    
    wg.Wait()
    
    // 部分結果を合計
    var totalSum int64
    for _, partialSum := range results {
        totalSum += partialSum
    }
    
    elapsed := time.Since(start)
    fmt.Printf("並列計算完了: 総和=%d, 時間=%v\n", totalSum, elapsed)
}

func main() {
    concurrencyExample()
    parallelismExample()
    
    fmt.Println("\n=== 並行性 vs 並列性 ===")
    fmt.Println("並行性: プログラムの構造 - 独立したコンポーネントが協調")
    fmt.Println("並列性: 実行の方法 - 複数のCPUで同時実行")
    fmt.Println("Goは並行言語であり、並列実行もサポートします")
}

重要なポイント

1. 並列化の基本原則

  • 独立性: 各部分が独立して実行可能
  • 分割: 全体を均等な部分に分割
  • 同期: 全部分の完了を待機

2. CPU設定

runtime.NumCPU()        // ハードウェアCPUコア数
runtime.GOMAXPROCS(0)   // 使用可能なGoプロセス数(推奨)

3. 並列化パターン

  • 範囲分割: データを範囲で分割
  • チャンク処理: 固定サイズの塊で処理
  • 完了待ち: チャンネルやWaitGroupで同期

4. 適用例

  • 数値計算: ベクトル・行列演算
  • 画像処理: ピクセル単位の独立処理
  • データ処理: 大量データの変換・集計

5. 並行性 vs 並列性

  • 並行性(Concurrency): プログラムの構造化方法
  • 並列性(Parallelism): 実行の最適化方法
  • Goの位置づけ: 並行言語(並列実行もサポート)

6. 注意点

  • オーバーヘッド: 小さなタスクでは並列化の効果が薄い
  • メモリアクセス: キャッシュ効率を考慮
  • 負荷分散: 均等な分割が重要

7. パフォーマンス考慮事項

  • Amdahlの法則: 並列化可能な部分の割合が重要
  • スケーラビリティ: CPUコア数に応じた性能向上
  • メモリ帯域: CPUコア数が多いほどメモリがボトルネックに

並列化は、Goの並行性機能を活用した計算集約的タスクの最適化手法です。適切に使用することで、マルチコアCPUの性能を最大限に活用できます。

おわりに 

本日は、Go言語を効果的に使うためのガイドラインについて解説しました。

よっしー
よっしー

何か質問や相談があれば、コメントをお願いします。また、エンジニア案件の相談にも随時対応していますので、お気軽にお問い合わせください。

それでは、また明日お会いしましょう(^^)

コメント

タイトルとURLをコピーしました