Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

[インデックス 1277] ファイルの概要

このコミットは、Go言語の標準ライブラリにsync.Mutex(ミューテックス)の初期実装を追加するものです。これにより、複数のゴルーチン(Goの軽量スレッド)が共有リソースに安全にアクセスするための基本的な同期プリミティブが提供されます。また、このミューテックスの実装を支える低レベルのセマフォ(semacquiresemrelease)がランタイムに追加されています。

コミット

commit bf3dd3f0efe5b45947a991e22660c62d4ce6b671
Author: Russ Cox <rsc@golang.org>
Date:   Thu Dec 4 12:51:36 2008 -0800

    add mutex.Mutex
    
    R=r
    DELTA=349  (348 added, 0 deleted, 1 changed)
    OCL=20380
    CL=20472

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/bf3dd3f0efe5b45947a991e22660c62d4ce6b671

元コミット内容

add mutex.Mutex

R=r
DELTA=349  (348 added, 0 deleted, 1 changed)
OCL=20380
CL=20472

変更の背景

Go言語は並行処理を言語レベルでサポートすることを特徴としていますが、その初期段階では、ゴルーチン間の安全なデータ共有のための基本的な同期メカニズムが不足していました。このコミットは、そのギャップを埋めるために、最も基本的な同期プリミティブの一つであるミューテックス(相互排他ロック)を導入することを目的としています。

ミューテックスは、複数のゴルーチンが同時に共有データにアクセスする際に発生する競合状態(race condition)を防ぐために不可欠です。このコミットにより、開発者はsync.Mutexを使用してクリティカルセクションを保護し、データの整合性を保証できるようになります。

また、このミューテックスの実装は、Goランタイムが提供する低レベルのセマフォプリミティブ(semacquiresemrelease)の上に構築されています。これは、Goの並行処理モデルにおいて、より高レベルの同期メカニズムがどのように低レベルのシステムコールやアトミック操作に依存しているかを示す良い例です。

前提知識の解説

ミューテックス (Mutex)

ミューテックス(Mutual Exclusionの略)は、並行プログラミングにおいて共有リソースへのアクセスを制御するための同期プリミティブです。ミューテックスは「ロック」と「アンロック」の2つの状態を持ちます。あるゴルーチンが共有リソースにアクセスする前にミューテックスをロックし、アクセスが完了した後にアンロックします。ミューテックスがロックされている間は、他のゴルーチンはそのミューテックスをロックしようとするとブロックされ、ロックが解除されるまで待機します。これにより、一度に一つのゴルーチンだけが共有リソースにアクセスすることを保証し、競合状態を防ぎます。

セマフォ (Semaphore)

セマフォは、複数のプロセスやスレッドが共有リソースにアクセスする際の同期を制御するための抽象データ型です。セマフォは、非負の整数値を持ち、主に以下の2つの操作で制御されます。

  • P操作 (wait/acquire): セマフォの値を1減らします。値が0の場合、操作はブロックされ、セマフォの値が正になるまで待機します。
  • V操作 (signal/release): セマフォの値を1増やします。待機しているプロセスやスレッドがあれば、そのうちの一つを解放します。

このコミットでは、Goランタイムが提供する低レベルのセマフォ(semacquiresemrelease)がミューテックスの実装に利用されています。これは、ミューテックスが競合した場合にゴルーチンをスリープさせ、ロックが解放されたときにウェイクアップさせるためのメカニズムとして機能します。

アトミック操作 (Atomic Operations)

アトミック操作とは、複数の操作が不可分(分割不可能)であると保証される操作のことです。つまり、その操作が実行されている間は、他のスレッドやプロセスからその操作の中間状態が見えたり、割り込まれたりすることはありません。並行プログラミングにおいて、共有データの一貫性を保つために非常に重要です。

このコミットでは、特にcas(Compare-And-Swap)というアトミック操作が使用されています。casは、メモリ上の特定のアドレスにある値が期待する値と一致する場合にのみ、その値を新しい値に更新するという操作です。この操作はハードウェアレベルでアトミックに実行されるため、ロックなしで共有データを安全に更新するために利用されます。

CMPXCHGL (Compare and Exchange Long)

CMPXCHGLはx86アーキテクチャのCPU命令で、アトミックな比較交換操作を実行します。具体的には、指定されたメモリ位置(デスティネーションオペランド)の現在の値と、レジスタ(通常はEAXまたはRAX)の値を比較します。もし両者が一致すれば、メモリ位置の値を別のレジスタ(ソースオペランド)の値で更新します。一致しなかった場合、メモリ位置の現在の値がEAX(またはRAX)にロードされます。この命令は、マルチプロセッサ環境でのロックフリーなデータ構造の実装に不可欠です。

このコミットのsrc/lib/sync/asm_amd64.sファイルでは、cas関数がCMPXCHGL命令を使用して実装されており、Goのミューテックスが低レベルのアトミック操作に依存していることがわかります。

futex (Fast Userspace Mutex)

futexはLinuxカーネルが提供するシステムコールで、ユーザー空間での高速な同期プリミティブ(ミューテックス、セマフォなど)の実装を可能にします。競合がない場合はカーネルモードへの切り替えなしにユーザー空間で処理を完結させ、競合が発生した場合のみカーネルの助けを借りてスリープ/ウェイクアップを行います。これにより、同期操作のオーバーヘッドを大幅に削減できます。

src/runtime/sema.cのコメントには、「Linuxのfutexと同じ目標をターゲットにしているが、セマンティクスははるかに単純」と記載されており、Goのセマフォ実装がfutexのような効率的な同期メカニズムを目指していることが示唆されています。

技術的詳細

このコミットは、Go言語の同期プリミティブの基盤を築く重要な変更を含んでいます。

  1. sync.Mutexの導入:

    • src/lib/sync/mutex.goMutex構造体が定義され、Lock()Unlock()メソッドが追加されました。
    • Mutexkeysemaという2つのint32フィールドを持ちます。keyはミューテックスの状態(ロックされているか、競合しているか)を示すために使用され、semaは低レベルのセマフォ操作のためのアドレスとして機能します。
    • Lock()Unlock()は、xadd(アトミックな加算)とcas(比較交換)操作を組み合わせて、ロックの取得と解放を行います。競合が発生した場合は、sys.semacquiresys.semreleaseを呼び出してゴルーチンをスリープ/ウェイクアップさせます。
  2. 低レベルセマフォの実装 (src/runtime/sema.c):

    • Goランタイムにsema.cが追加され、sys.semacquiresys.semreleaseという2つの新しいシステムコールが実装されました。
    • これらの関数は、共有のuint32アドレスをセマフォとして使用し、ゴルーチンをブロックしたり、ウェイクアップしたりする機能を提供します。
    • セマフォの実装は、待機中のゴルーチンを管理するためのリンクリスト(semfirst, semlast)と、それらを保護するためのロック(semlock)を使用しています。
    • semacquireは、まずアトミックにセマフォを減らそうと試み(cansemacquire)、成功すればすぐにリターンします。失敗した場合は、ゴルーチンをキューに入れ、スリープさせます。
    • semreleaseは、アトミックにセマフォを増やし、待機中のゴルーチンをウェイクアップします。
    • sema.cのコメントには、このセマフォが「他の同期プリミティブの競合ケースで使用できるスリープとウェイクアップのプリミティブ」として意図されていることが明記されており、Linuxのfutexと同様の目標を持つことが示されています。
  3. アトミック操作 (src/lib/sync/asm_amd64.s):

    • asm_amd64.scas(Compare-And-Swap)関数のアセンブリ実装が追加されました。これは、CMPXCHGL命令を使用して、指定されたメモリ位置の値をアトミックに比較し、条件付きで交換します。
    • このcas関数は、sync.Mutexxadd関数内で使用され、ロックの状態を安全に更新するために不可欠です。
  4. システムコールとしての公開:

    • src/cmd/gc/sys.gosrc/cmd/gc/sysimport.cが更新され、sys.semacquiresys.semreleaseがGoコンパイラに認識されるシステムコールとしてエクスポートされました。これにより、Goコードからこれらの低レベルセマフォ操作を呼び出すことが可能になります。
  5. ビルドシステムの更新:

    • src/lib/Makefileが更新され、syncパッケージがビルド対象に追加されました。
    • src/lib/sync/Makefileが新規作成され、syncパッケージのビルド方法(Goソース、アセンブリソースのコンパイル、アーカイブの作成)が定義されました。
    • src/runtime/Makefileが更新され、sema.cがランタイムのビルドに含められるようになりました。
    • src/run.bashが更新され、syncパッケージのテストが実行されるようになりました。

これらの変更により、Go言語は基本的な同期プリミティブであるミューテックスを手に入れ、より複雑な並行プログラムを安全に構築するための基盤が確立されました。

コアとなるコードの変更箇所

src/lib/sync/mutex.go (新規ファイル)

package sync

package func cas(val *int32, old, new int32) bool

export type Mutex struct {
	key int32;
	sema int32;
}

func xadd(val *int32, delta int32) (new int32) {
	for {
		v := *val;
		if cas(val, v, v+delta) {
			return v+delta;
		}
	}
	panic("unreached")
}

func (m *Mutex) Lock() {
	if xadd(&m.key, 1) == 1 {
		// changed from 0 to 1; we hold lock
		return;
	}
	sys.semacquire(&m.sema);
}

func (m *Mutex) Unlock() {
	if xadd(&m.key, -1) == 0 {
		// changed from 1 to 0; no contention
		return;
	}
	sys.semrelease(&m.sema);
}

src/runtime/sema.c (新規ファイル)

// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// http://swtch.com/semaphore.pdf

#include "runtime.h"

typedef struct Sema Sema;
struct Sema
{
	uint32 *addr;
	G *g;
	Sema *prev;
	Sema *next;
};

// TODO: For now, a linked list; maybe a hash table of linked lists later.
static Sema *semfirst, *semlast;
static Lock semlock;

static void
semqueue(uint32 *addr, Sema *s)
{
	s->addr = addr;
	s->g = nil;

	lock(&semlock);
	s->prev = semlast;
	s->next = nil;
	if(semlast)
		semlast->next = s;
	else
		semfirst = s;
	semlast = s;
	unlock(&semlock);
}

static void
semdequeue(Sema *s)
{
	lock(&semlock);
	if(s->next)
		s->next->prev = s->prev;
	else
		semlast = s->prev;
	if(s->prev)
		s->prev->next = s->next;
	else
		semfirst = s->next;
	s->prev = nil;
	s->next = nil;
	unlock(&semlock);
}

static void
semwakeup(uint32 *addr)
{
	Sema *s;

	lock(&semlock);
	for(s=semfirst; s; s=s->next) {
		if(s->addr == addr && s->g) {
			ready(s->g);
			s->g = nil;
			break;
		}
	}
	unlock(&semlock);
}

// Step 1 of sleep: make ourselves available for wakeup.
// TODO(rsc): Maybe we can write a version without
// locks by using cas on s->g.  Maybe not: I need to
// think more about whether it would be correct.
static void
semsleep1(Sema *s)
{
	lock(&semlock);
	s->g = g;
	unlock(&semlock);
}

// Decided not to go through with it: undo step 1.
static void
semsleepundo1(Sema *s)
{
	lock(&semlock);
	if(s->g != nil) {
		s->g = nil;	// back ourselves out
	} else {
		// If s->g == nil already, semwakeup
		// already readied us.  Since we never stopped
		// running, readying us just set g->readyonstop.
		// Clear it.
		if(g->readyonstop == 0)
			*(int32*)0x555 = 555;
		g->readyonstop = 0;
	}
	unlock(&semlock);
}

// Step 2: wait for the wakeup.
static void
semsleep2(Sema *s)
{
	USED(s);
	g->status = Gwaiting;
	sys·gosched();
}

static int32
cansemacquire(uint32 *addr)
{
	uint32 v;

	while((v = *addr) > 0)
		if(cas(addr, v, v-1))
			return 1;
	return 0;
}

// func sys.semacquire(addr *uint32)
// For now has no return value.
// Might return an ok (not interrupted) bool in the future?
void
sys·semacquire(uint32 *addr)
{
	Sema s;

	// Easy case.
	if(cansemacquire(addr))
		return;

	// Harder case:
	//	queue
	//	try semacquire one more time, sleep if failed
	//	dequeue
	//	wake up one more guy to avoid races (TODO(rsc): maybe unnecessary?)
	semqueue(addr, &s);
	for(;;) {
		semsleep1(&s);
		if(cansemacquire(addr)) {
			semsleepundo1(&s);
			break;
		}
		semsleep2(&s);
	}
	semdequeue(&s);
	semwakeup(addr);
}

// func sys.semrelease(addr *uint32)
void
sys·semrelease(uint32 *addr)
{
	uint32 v;

	for(;;) {
		v = *addr;
		if(cas(addr, v, v+1))
			break;
	}
	semwakeup(addr);
}

src/lib/sync/asm_amd64.s (新規ファイル)

// func cas(val *int32, old, new int32) bool
// Atomically:
//	if *val == old {
//		*val = new;
//		return true;
//	}else
//		return false;
TEXT sync·cas(SB), 7, $0
	MOVQ	8(SP), BX
	MOVL	16(SP), AX
	MOVL	20(SP), CX
	LOCK
	CMPXCHGL	CX, 0(BX)
	JZ ok
	MOVL	$0, 24(SP)
	RET
ok:
	MOVL	$1, 24(SP)
	RET

コアとなるコードの解説

src/lib/sync/mutex.go

  • Mutex構造体:

    • key int32: ミューテックスの状態を表す整数。
      • 0: ロックされていない状態。
      • 1: ロックされているが競合なし(単一のゴルーチンがロックを保持)。
      • >1: ロックされており、かつ複数のゴルーチンがロックを待機している状態。
    • sema int32: 低レベルのセマフォ操作で使用されるアドレス。このアドレスを介して、ゴルーチンはスリープしたりウェイクアップしたりします。
  • xadd(val *int32, delta int32) (new int32)関数:

    • 指定されたint32ポインタvalが指す値にdeltaをアトミックに加算し、新しい値を返します。
    • 内部ではcas(Compare-And-Swap)ループを使用しており、*valの現在の値vを読み取り、v+deltaに更新しようと試みます。この操作は、他のゴルーチンによる同時変更があっても安全です。
  • Lock()メソッド:

    1. xadd(&m.key, 1)を呼び出し、m.keyの値を1増やします。
    2. もしxaddの戻り値が1であれば、これはm.key0から1に変化したことを意味します。つまり、ミューテックスはロックされておらず、現在のゴルーチンが最初にロックを取得したことになります。この場合、ロックは成功し、すぐにリターンします。
    3. もしxaddの戻り値が1でなければ、これはミューテックスが既にロックされているか、他のゴルーチンが既にロックを待機していることを意味します。この場合、現在のゴルーチンはsys.semacquire(&m.sema)を呼び出して、m.semaが指すセマフォでスリープします。
  • Unlock()メソッド:

    1. xadd(&m.key, -1)を呼び出し、m.keyの値を1減らします。
    2. もしxaddの戻り値が0であれば、これはm.key1から0に変化したことを意味します。つまり、ミューテックスはロックされておらず、競合もなかったことになります。この場合、アンロックは成功し、すぐにリターンします。
    3. もしxaddの戻り値が0でなければ、これはミューテックスがロックされており、かつ他のゴルーチンがロックを待機していることを意味します。この場合、現在のゴルーチンはsys.semrelease(&m.sema)を呼び出して、m.semaが指すセマフォで待機しているゴルーチンをウェイクアップします。

src/runtime/sema.c

このファイルは、Goランタイムにおける低レベルのセマフォの実装を提供します。

  • Sema構造体:

    • addr *uint32: このセマフォが関連付けられているuint32アドレス。
    • G *g: このセマフォで待機しているゴルーチンへのポインタ。
    • prev, next: 待機中のゴルーチンを管理するためのリンクリストのポインタ。
  • semqueue(uint32 *addr, Sema *s):

    • 指定されたSema構造体sを、addrに関連付けられた待機キュー(リンクリスト)の末尾に追加します。
  • semdequeue(Sema *s):

    • 指定されたSema構造体sを待機キューから削除します。
  • semwakeup(uint32 *addr):

    • 指定されたaddrに関連付けられた待機キューから、最初のゴルーチンを見つけてready(s->g)を呼び出し、実行可能状態にします。
  • semsleep1(Sema *s):

    • スリープの最初のステップ。現在のゴルーチンgs->gに設定し、ウェイクアップ可能であることを示します。
  • semsleepundo1(Sema *s):

    • スリープを取り消す際に使用されます。s->gnilに戻します。
  • semsleep2(Sema *s):

    • スリープの第2ステップ。現在のゴルーチンのステータスをGwaitingに設定し、sys·gosched()を呼び出してスケジューラに制御を渡し、ゴルーチンをスリープさせます。
  • cansemacquire(uint32 *addr):

    • 指定されたaddrが指すセマフォの値をアトミックに1減らそうと試みます。値が正の場合にのみ成功し、1を返します。これは、競合がない場合の高速パスです。
  • sys·semacquire(uint32 *addr):

    • Goコードから呼び出されるセマフォ取得関数。
    1. まずcansemacquire(addr)を試み、成功すればすぐにリターンします(競合なしの高速パス)。
    2. 失敗した場合(競合がある場合)、現在のゴルーチンをsemqueueで待機キューに追加します。
    3. ループ内でsemsleep1を呼び出し、再度cansemacquireを試みます。
    4. もしcansemacquireが成功すれば、semsleepundo1でスリープ状態を解除し、ループを抜けます。
    5. 失敗すればsemsleep2でゴルーチンをスリープさせます。
    6. スリープからウェイクアップされた後、semdequeueでキューから削除し、semwakeupを呼び出して他の待機ゴルーチンをウェイクアップする可能性があります(競合を避けるため)。
  • sys·semrelease(uint32 *addr):

    • Goコードから呼び出されるセマフォ解放関数。
    1. addrが指すセマフォの値をアトミックに1増やします。
    2. semwakeup(addr)を呼び出し、このセマフォで待機しているゴルーチンをウェイクアップします。

src/lib/sync/asm_amd64.s

このファイルは、syncパッケージが使用するアトミックなcas(Compare-And-Swap)関数のAMD64アセンブリ実装です。

  • TEXT sync·cas(SB), 7, $0:

    • syncパッケージのcas関数を定義します。
    • 引数: val *int32, old int32, new int32
    • 戻り値: bool
  • MOVQ 8(SP), BX: val(ポインタ)をBXレジスタにロードします。

  • MOVL 16(SP), AX: old(期待する値)をAXレジスタにロードします。CMPXCHGL命令は比較対象の値をAX(またはEAX/RAX)レジスタから取得します。

  • MOVL 20(SP), CX: new(新しい値)をCXレジスタにロードします。

  • LOCK: 次の命令(CMPXCHGL)がアトミックに実行されることを保証するためのプレフィックスです。マルチプロセッサ環境で重要です。

  • CMPXCHGL CX, 0(BX):

    • 0(BX)valが指すメモリ位置)の現在の値とAXレジスタの値(old)を比較します。
    • もし両者が一致すれば、0(BX)の値をCXレジスタの値(new)で更新します。
    • 比較が一致したかどうかは、CPUのフラグレジスタ(特にゼロフラグZF)に設定されます。
  • JZ ok: CMPXCHGL命令の結果、ゼロフラグZFがセットされていれば(つまり、比較が一致していれば)、okラベルにジャンプします。これはcas操作が成功したことを意味します。

  • MOVL $0, 24(SP): cas操作が失敗した場合、戻り値(スタック上の24バイトオフセット)に0(false)を設定します。

  • RET: 関数からリターンします。

  • ok:: cas操作が成功した場合の処理。

  • MOVL $1, 24(SP): 戻り値に1(true)を設定します。

  • RET: 関数からリターンします。

このアセンブリコードは、Goのsync.Mutexが、ハードウェアが提供する低レベルのアトミック操作を直接利用して、効率的かつ安全な同期を実現していることを示しています。

関連リンク

参考にした情報源リンク