Go言語入門:よくある質問 -Concurrency Vol.1-

スポンサーリンク
Go言語入門:よくある質問 -Concurrency Vol.1- ノウハウ
Go言語入門:よくある質問 -Concurrency Vol.1-
この記事は約29分で読めます。
よっしー
よっしー

こんにちは。よっしーです(^^)

本日は、Go言語のよくある質問 について解説しています。

スポンサーリンク

背景

Go言語を学んでいると「なんでこんな仕様になっているんだろう?」「他の言語と違うのはなぜ?」といった疑問が湧いてきませんか。Go言語の公式サイトにあるFAQページには、そんな疑問に対する開発チームからの丁寧な回答がたくさん載っているんです。ただ、英語で書かれているため読むのに少しハードルがあるのも事実で、今回はこのFAQを日本語に翻訳して、Go言語への理解を深めていけたらと思い、これを読んだ時の内容を備忘として残しました。

Concurrency

どの操作がアトミックですか?mutexについてはどうですか?

Goでの操作のアトミック性についての説明は、Go Memory Model文書で見つけることができます。

低レベルの同期とアトミックプリミティブは、syncとsync/atomicパッケージで利用できます。これらのパッケージは、参照カウントのインクリメントや小規模な相互排他の保証などの単純なタスクに適しています。

並行サーバー間の調整などの高レベル操作では、高レベル技術がより良いプログラムにつながることがあり、Goはgoroutineとチャネルを通してこのアプローチをサポートしています。例えば、特定のデータ片に対して常に一度に一つのgoroutineだけが責任を持つようにプログラムを構造化できます。そのアプローチは、元のGoの格言によって要約されています:

メモリを共有することで通信するな。代わりに、通信することでメモリを共有せよ。

この概念の詳細な議論については、Share Memory By Communicatingコードウォークとその関連記事を参照してください。

大規模な並行プログラムは、これら両方のツールキットから借用する可能性があります。

解説

この節では、Go言語における並行プログラミングでのアトミック操作とミューテックスについて説明されています。Goの並行プログラミングには、低レベルな同期プリミティブと高レベルなチャネルベースのアプローチという2つの主要な手法があります。

アトミック操作の基本

sync/atomic パッケージの使用

func demonstrateAtomicOperations() {
    fmt.Println("アトミック操作の例:")
    
    // アトミックカウンタの例
    var counter int64
    
    // 複数のgoroutineでアトミックに操作
    var wg sync.WaitGroup
    numGoroutines := 1000
    
    // 非アトミック操作(危険な例)
    var unsafeCounter int64
    
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            // アトミック操作(安全)
            atomic.AddInt64(&counter, 1)
            
            // 非アトミック操作(レースコンディション)
            unsafeCounter++
        }()
    }
    
    wg.Wait()
    
    fmt.Printf("アトミックカウンタ: %d\n", atomic.LoadInt64(&counter))
    fmt.Printf("非アトミックカウンタ: %d(期待値: %d)\n", unsafeCounter, numGoroutines)
    
    // 基本的なアトミック操作
    demonstrateBasicAtomicOps()
}

func demonstrateBasicAtomicOps() {
    fmt.Println("\n基本的なアトミック操作:")
    
    var val int64 = 10
    
    // Load - アトミックな読み取り
    loadedVal := atomic.LoadInt64(&val)
    fmt.Printf("Load: %d\n", loadedVal)
    
    // Store - アトミックな書き込み
    atomic.StoreInt64(&val, 20)
    fmt.Printf("Store後: %d\n", atomic.LoadInt64(&val))
    
    // Add - アトミックな加算
    newVal := atomic.AddInt64(&val, 5)
    fmt.Printf("Add(5)後: %d\n", newVal)
    
    // Swap - アトミックな交換
    oldVal := atomic.SwapInt64(&val, 100)
    fmt.Printf("Swap(100): 古い値=%d, 新しい値=%d\n", oldVal, atomic.LoadInt64(&val))
    
    // CompareAndSwap - 条件付き交換
    swapped := atomic.CompareAndSwapInt64(&val, 100, 200)
    fmt.Printf("CompareAndSwap(100->200): 成功=%t, 値=%d\n", swapped, atomic.LoadInt64(&val))
    
    swapped = atomic.CompareAndSwapInt64(&val, 100, 300)
    fmt.Printf("CompareAndSwap(100->300): 成功=%t, 値=%d\n", swapped, atomic.LoadInt64(&val))
}

アトミック操作の対応型

func demonstrateAtomicTypes() {
    fmt.Println("アトミック操作対応の型:")
    
    // 整数型
    var int32Val int32
    var int64Val int64
    var uint32Val uint32
    var uint64Val uint64
    var uintptrVal uintptr
    
    // ポインタ型
    var ptrVal unsafe.Pointer
    
    // アトミック操作の例
    atomic.StoreInt32(&int32Val, 42)
    atomic.StoreInt64(&int64Val, 1234567890)
    atomic.StoreUint32(&uint32Val, 99)
    atomic.StoreUint64(&uint64Val, 9876543210)
    atomic.StoreUintptr(&uintptrVal, 0xDEADBEEF)
    
    // ポインタのアトミック操作
    data := "Hello, World!"
    atomic.StorePointer(&ptrVal, unsafe.Pointer(&data))
    
    fmt.Printf("int32: %d\n", atomic.LoadInt32(&int32Val))
    fmt.Printf("int64: %d\n", atomic.LoadInt64(&int64Val))
    fmt.Printf("uint32: %d\n", atomic.LoadUint32(&uint32Val))
    fmt.Printf("uint64: %d\n", atomic.LoadUint64(&uint64Val))
    fmt.Printf("uintptr: 0x%X\n", atomic.LoadUintptr(&uintptrVal))
    
    // ポインタから値を取得
    loadedPtr := atomic.LoadPointer(&ptrVal)
    if loadedPtr != nil {
        str := *(*string)(loadedPtr)
        fmt.Printf("pointer: %s\n", str)
    }
    
    // atomic.Valueの使用(任意の型)
    var atomicValue atomic.Value
    
    atomicValue.Store("文字列値")
    if val := atomicValue.Load(); val != nil {
        fmt.Printf("atomic.Value: %s\n", val.(string))
    }
    
    atomicValue.Store(42)
    if val := atomicValue.Load(); val != nil {
        fmt.Printf("atomic.Value: %d\n", val.(int))
    }
}

Mutexによる同期

sync.Mutex の基本使用

type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func demonstrateMutex() {
    fmt.Println("Mutexによる同期:")
    
    counter := &SafeCounter{}
    var wg sync.WaitGroup
    
    // 複数のgoroutineで安全にカウンタを操作
    numGoroutines := 1000
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Printf("Mutexカウンタ: %d\n", counter.Value())
    
    // RWMutexの例
    demonstrateRWMutex()
}

type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func NewSafeMap() *SafeMap {
    return &SafeMap{
        data: make(map[string]int),
    }
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()  // 読み取り専用ロック
    defer sm.mu.RUnlock()
    value, exists := sm.data[key]
    return value, exists
}

func (sm *SafeMap) Delete(key string) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    delete(sm.data, key)
}

func demonstrateRWMutex() {
    fmt.Println("\nRWMutexによる読み書き分離:")
    
    safeMap := NewSafeMap()
    var wg sync.WaitGroup
    
    // 書き込み処理
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", i)
            safeMap.Set(key, i*10)
            fmt.Printf("書き込み: %s = %d\n", key, i*10)
        }(i)
    }
    
    // 読み取り処理(より多くのgoroutine)
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", i%10)
            if value, exists := safeMap.Get(key); exists {
                fmt.Printf("読み取り: %s = %d\n", key, value)
            }
        }(i)
    }
    
    wg.Wait()
}

チャネルベースの並行プログラミング

“通信による共有”の実装

func demonstrateChannelCommunication() {
    fmt.Println("チャネルによる通信ベースの並行処理:")
    
    // ワーカープールパターン
    type Job struct {
        ID     int
        Data   string
        Result chan string
    }
    
    // ジョブを処理するワーカー
    worker := func(id int, jobs <-chan Job) {
        for job := range jobs {
            // 処理のシミュレーション
            processed := fmt.Sprintf("Worker%d processed: %s", id, job.Data)
            job.Result <- processed
        }
    }
    
    // ジョブチャネルを作成
    jobs := make(chan Job, 10)
    
    // 3つのワーカーを起動
    for i := 1; i <= 3; i++ {
        go worker(i, jobs)
    }
    
    // ジョブを送信
    var results []chan string
    for i := 1; i <= 5; i++ {
        result := make(chan string, 1)
        job := Job{
            ID:     i,
            Data:   fmt.Sprintf("data-%d", i),
            Result: result,
        }
        jobs <- job
        results = append(results, result)
    }
    
    // 結果を収集
    for i, result := range results {
        select {
        case res := <-result:
            fmt.Printf("Job %d result: %s\n", i+1, res)
        case <-time.After(time.Second):
            fmt.Printf("Job %d timeout\n", i+1)
        }
    }
    
    close(jobs)
}

データ所有権による並行制御

func demonstrateDataOwnership() {
    fmt.Println("データ所有権による並行制御:")
    
    // アクターパターンの実装
    type BankAccount struct {
        balance int
        ops     chan operation
        done    chan bool
    }
    
    type operation struct {
        kind   string
        amount int
        result chan int
    }
    
    // 銀行口座アクター
    func NewBankAccount(initialBalance int) *BankAccount {
        acc := &BankAccount{
            balance: initialBalance,
            ops:     make(chan operation, 10),
            done:    make(chan bool),
        }
        
        go acc.actor() // アクターを起動
        return acc
    }
    
    func (acc *BankAccount) actor() {
        for {
            select {
            case op := <-acc.ops:
                switch op.kind {
                case "deposit":
                    acc.balance += op.amount
                    op.result <- acc.balance
                case "withdraw":
                    if acc.balance >= op.amount {
                        acc.balance -= op.amount
                        op.result <- acc.balance
                    } else {
                        op.result <- -1 // 残高不足
                    }
                case "balance":
                    op.result <- acc.balance
                }
            case <-acc.done:
                return
            }
        }
    }
    
    func (acc *BankAccount) Deposit(amount int) int {
        result := make(chan int)
        acc.ops <- operation{"deposit", amount, result}
        return <-result
    }
    
    func (acc *BankAccount) Withdraw(amount int) int {
        result := make(chan int)
        acc.ops <- operation{"withdraw", amount, result}
        return <-result
    }
    
    func (acc *BankAccount) Balance() int {
        result := make(chan int)
        acc.ops <- operation{"balance", 0, result}
        return <-result
    }
    
    func (acc *BankAccount) Close() {
        acc.done <- true
    }
    
    // 使用例
    account := NewBankAccount(1000)
    defer account.Close()
    
    var wg sync.WaitGroup
    
    // 複数のgoroutineから並行アクセス
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            
            // 入金
            newBalance := account.Deposit(100)
            fmt.Printf("Goroutine %d: 入金後残高 = %d\n", i, newBalance)
            
            // 出金
            newBalance = account.Withdraw(50)
            if newBalance >= 0 {
                fmt.Printf("Goroutine %d: 出金後残高 = %d\n", i, newBalance)
            } else {
                fmt.Printf("Goroutine %d: 残高不足\n", i)
            }
        }(i)
    }
    
    wg.Wait()
    
    finalBalance := account.Balance()
    fmt.Printf("最終残高: %d\n", finalBalance)
}

パフォーマンス比較

atomic vs mutex vs channel の比較

func demonstratePerformanceComparison() {
    fmt.Println("パフォーマンス比較:")
    
    const iterations = 1000000
    
    // 1. Atomic操作
    benchmarkAtomic := func() time.Duration {
        var counter int64
        var wg sync.WaitGroup
        numGoroutines := 10
        
        start := time.Now()
        for i := 0; i < numGoroutines; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < iterations/numGoroutines; j++ {
                    atomic.AddInt64(&counter, 1)
                }
            }()
        }
        wg.Wait()
        return time.Since(start)
    }
    
    // 2. Mutex操作
    benchmarkMutex := func() time.Duration {
        var counter int64
        var mu sync.Mutex
        var wg sync.WaitGroup
        numGoroutines := 10
        
        start := time.Now()
        for i := 0; i < numGoroutines; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < iterations/numGoroutines; j++ {
                    mu.Lock()
                    counter++
                    mu.Unlock()
                }
            }()
        }
        wg.Wait()
        return time.Since(start)
    }
    
    // 3. チャネル操作
    benchmarkChannel := func() time.Duration {
        counter := make(chan int, 1)
        counter <- 0
        var wg sync.WaitGroup
        numGoroutines := 10
        
        start := time.Now()
        for i := 0; i < numGoroutines; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < iterations/numGoroutines; j++ {
                    count := <-counter
                    count++
                    counter <- count
                }
            }()
        }
        wg.Wait()
        return time.Since(start)
    }
    
    atomicTime := benchmarkAtomic()
    mutexTime := benchmarkMutex()
    channelTime := benchmarkChannel()
    
    fmt.Printf("Atomic操作: %v\n", atomicTime)
    fmt.Printf("Mutex操作: %v (%.1fx slower)\n", mutexTime, 
               float64(mutexTime)/float64(atomicTime))
    fmt.Printf("Channel操作: %v (%.1fx slower)\n", channelTime, 
               float64(channelTime)/float64(atomicTime))
    
    fmt.Println("\n使い分けのガイドライン:")
    guidelines := []string{
        "• 単純なカウンタ、フラグ → atomic",
        "• 複雑な状態の保護 → mutex",
        "• goroutine間の協調 → channel",
        "• 最高のパフォーマンス → atomic",
        "• 複雑な同期ロジック → channel",
    }
    
    for _, guideline := range guidelines {
        fmt.Println(guideline)
    }
}

実践的な応用例

複合的な並行プログラム

func demonstratePracticalConcurrency() {
    fmt.Println("実践的な並行プログラミング例:")
    
    // Webサーバーの接続カウンタ
    type ConnectionManager struct {
        // アトミック操作で管理
        activeConnections int64
        totalConnections  int64
        
        // Mutexで保護された複雑な状態
        mu        sync.RWMutex
        connInfo  map[string]time.Time
        
        // チャネルによる通知
        events    chan string
        shutdown  chan struct{}
    }
    
    func NewConnectionManager() *ConnectionManager {
        cm := &ConnectionManager{
            connInfo: make(map[string]time.Time),
            events:   make(chan string, 100),
            shutdown: make(chan struct{}),
        }
        
        // イベント処理用goroutine
        go cm.eventProcessor()
        
        return cm
    }
    
    func (cm *ConnectionManager) Connect(clientID string) {
        // アトミック操作でカウンタ更新
        atomic.AddInt64(&cm.activeConnections, 1)
        atomic.AddInt64(&cm.totalConnections, 1)
        
        // Mutexで保護された状態更新
        cm.mu.Lock()
        cm.connInfo[clientID] = time.Now()
        cm.mu.Unlock()
        
        // チャネルでイベント通知
        select {
        case cm.events <- fmt.Sprintf("CONNECT: %s", clientID):
        default:
            // イベントバッファが満杯の場合はスキップ
        }
    }
    
    func (cm *ConnectionManager) Disconnect(clientID string) {
        atomic.AddInt64(&cm.activeConnections, -1)
        
        cm.mu.Lock()
        delete(cm.connInfo, clientID)
        cm.mu.Unlock()
        
        select {
        case cm.events <- fmt.Sprintf("DISCONNECT: %s", clientID):
        default:
        }
    }
    
    func (cm *ConnectionManager) GetStats() (int64, int64, int) {
        active := atomic.LoadInt64(&cm.activeConnections)
        total := atomic.LoadInt64(&cm.totalConnections)
        
        cm.mu.RLock()
        infoCount := len(cm.connInfo)
        cm.mu.RUnlock()
        
        return active, total, infoCount
    }
    
    func (cm *ConnectionManager) eventProcessor() {
        for {
            select {
            case event := <-cm.events:
                fmt.Printf("Event: %s\n", event)
            case <-cm.shutdown:
                return
            }
        }
    }
    
    func (cm *ConnectionManager) Shutdown() {
        close(cm.shutdown)
    }
    
    // 使用例
    cm := NewConnectionManager()
    defer cm.Shutdown()
    
    var wg sync.WaitGroup
    
    // 複数のクライアントによる接続シミュレーション
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            clientID := fmt.Sprintf("client-%d", i)
            
            cm.Connect(clientID)
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            cm.Disconnect(clientID)
        }(i)
    }
    
    // 統計情報の監視
    wg.Add(1)
    go func() {
        defer wg.Done()
        ticker := time.NewTicker(50 * time.Millisecond)
        defer ticker.Stop()
        
        for i := 0; i < 10; i++ {
            <-ticker.C
            active, total, infoCount := cm.GetStats()
            fmt.Printf("Stats: Active=%d, Total=%d, Info=%d\n", 
                       active, total, infoCount)
        }
    }()
    
    wg.Wait()
    
    // 最終統計
    active, total, infoCount := cm.GetStats()
    fmt.Printf("Final Stats: Active=%d, Total=%d, Info=%d\n", 
               active, total, infoCount)
}

Go言語の並行プログラミングでは、用途に応じて最適な同期手法を選択することが重要です:

  • アトミック操作: 単純なプリミティブ値の操作に最適
  • Mutex: 複雑な状態の保護や複数の変数の整合性確保
  • チャネル: goroutine間の調整や「通信による共有」の実現

“Don’t communicate by sharing memory; share memory by communicating” の原則に従い、可能な限りチャネルベースの設計を採用し、パフォーマンスが重要な箇所でのみ低レベルプリミティブを使用することが推奨されます。

おわりに 

本日は、Go言語のよくある質問について解説しました。

よっしー
よっしー

何か質問や相談があれば、コメントをお願いします。また、エンジニア案件の相談にも随時対応していますので、お気軽にお問い合わせください。

それでは、また明日お会いしましょう(^^)

コメント

タイトルとURLをコピーしました