[インデックス 18264] ファイルの概要
このコミットは、Goランタイムのスケジューラにおけるワークキューの実装を、従来のミューテックスで保護された無制限の配列から、ロックフリーの固定サイズリングバッファへと変更するものです。これにより、特にゴルーチンの生成やスケジューリングのパフォーマンスが向上しています。
コミット
commit 4722b1cbd3c734b67c0e3c1cd4458cdbd51e5844
Author: Dmitriy Vyukov <dvyukov@google.com>
Date: Thu Jan 16 12:17:00 2014 +0400
runtime: use lock-free ring for work queues
Use lock-free fixed-size ring for work queues
instead of an unbounded mutex-protected array.
The ring has single producer and multiple consumers.
If the ring overflows, work is put onto global queue.
benchmark old ns/op new ns/op delta
BenchmarkMatmult 7 5 -18.12%
BenchmarkMatmult-4 2 2 -18.98%
BenchmarkMatmult-16 1 0 -12.84%
BenchmarkCreateGoroutines 105 88 -16.10%
BenchmarkCreateGoroutines-4 376 219 -41.76%
BenchmarkCreateGoroutines-16 241 174 -27.80%
BenchmarkCreateGoroutinesParallel 103 87 -14.66%
BenchmarkCreateGoroutinesParallel-4 169 143 -15.38%
BenchmarkCreateGoroutinesParallel-16 158 151 -4.43%
R=golang-codereviews, rsc
CC=ddetlefs, devon.odell, golang-codereviews
https://golang.org/cl/46170044
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/4722b1cbd3c734b67c0e3c1cd4458cdbd51e5844
元コミット内容
Goランタイムのワークキューに、無制限のミューテックス保護された配列の代わりに、ロックフリーの固定サイズリングバッファを使用する。このリングバッファは単一のプロデューサーと複数のコンシューマーを持つ。リングバッファがオーバーフローした場合は、グローバルキューにワークが配置される。
ベンチマーク結果は以下の通り。
BenchmarkMatmult
: 18%程度の改善BenchmarkCreateGoroutines
: 16%から41%程度の改善BenchmarkCreateGoroutinesParallel
: 4%から15%程度の改善
変更の背景
Goランタイムのスケジューラは、ゴルーチン(軽量スレッド)の効率的な管理と実行を担当しています。各P(Processor、論理プロセッサ)は、実行可能なゴルーチンを保持するためのローカルなワークキューを持っています。従来のワークキューは、ミューテックスによって保護された動的にサイズが変更される配列(スライス)として実装されていました。
この実装にはいくつかの課題がありました。
- ロックの競合: ゴルーチンの追加(
runqput
)や取得(runqget
)のたびにミューテックスのロックとアンロックが必要となり、特にマルチコア環境下でP間のゴルーチン奪い合い(ワークスチール)が発生する際に、ロックの競合がパフォーマンスのボトルネックとなる可能性がありました。 - 動的なサイズ変更: 配列のサイズが動的に変更されるため、メモリの再割り当てやコピーが発生し、オーバーヘッドが生じる可能性がありました。
- キャッシュ効率: 動的な配列はメモリ上で連続性が保証されにくく、キャッシュの局所性が低下する可能性がありました。
これらの課題を解決し、スケジューラのパフォーマンス、特にゴルーチンの生成とスケジューリングの効率を向上させるために、ロックフリーのデータ構造であるリングバッファの導入が検討されました。ロックフリーのデータ構造は、ミューテックスを使用せずに並行アクセスを可能にし、競合を減らすことでスケーラビリティを向上させます。
前提知識の解説
Goスケジューラ (M, P, G)
Goランタイムのスケジューラは、M(Machine)、P(Processor)、G(Goroutine)という3つの主要な要素で構成されるGPMモデルを採用しています。
- G (Goroutine): Goにおける軽量スレッドの単位です。関数呼び出しごとに生成され、非常に小さいスタックサイズで開始します。
- M (Machine): OSのスレッドに相当します。Goランタイムは、OSスレッド上でゴルーチンを実行します。MはPと関連付けられ、Pからゴルーチンを取得して実行します。
- P (Processor): 論理プロセッサを表します。実行可能なゴルーチンを保持するローカルなワークキューを持ち、Mにゴルーチンを提供します。
GOMAXPROCS
環境変数によってPの数が制御され、通常はCPUのコア数に設定されます。Pは、Mがゴルーチンを実行するためのコンテキストを提供し、Mがブロックされたり、システムコールを実行したりする際に、他のMにPを渡すことができます。
ワークキュー
各Pは、実行可能なゴルーチンを格納するためのローカルなワークキューを持っています。Mは、自身のPのローカルキューからゴルーチンを取得して実行します。ローカルキューが空になった場合、Mは以下の順序でゴルーチンを探します。
- グローバルな実行可能キュー
- 他のPのローカルキュー(ワークスチール)
- ネットワークポーラー
ロックフリーデータ構造
ロックフリーデータ構造は、ミューテックスやセマフォなどのロックメカニズムを使用せずに、複数のスレッドが同時にアクセスできるデータ構造です。これにより、ロックの競合によるパフォーマンスの低下を防ぎ、スケーラビリティを向上させます。ロックフリーデータ構造は、通常、アトミック操作(Compare-And-Swap: CASなど)を使用して、データの整合性を保証します。
リングバッファ (Ring Buffer / Circular Buffer)
リングバッファは、固定サイズの配列を円環状に利用するデータ構造です。先頭と末尾のポインタ(またはインデックス)を管理することで、データの追加と削除を行います。バッファが満杯になった場合は、最も古いデータが上書きされるか、追加が拒否されます。固定サイズであるため、メモリの再割り当てが不要で、キャッシュ効率が良いという特徴があります。
単一プロデューサー・複数コンシューマー (SPMC)
このコミットで採用されている「単一プロデューサー・複数コンシューマー」モデルは、データ構造に対して、データを追加するスレッド(プロデューサー)が1つ、データを消費するスレッド(コンシューマー)が複数存在することを指します。Goのローカルワークキューの場合、P自身がゴルーチンをローカルキューに入れる唯一のプロデューサーであり、P自身がゴルーチンを取り出すコンシューマーであると同時に、他のP(M)がワークスチールによってゴルーチンを奪う際のコンシューマーにもなります。SPMCモデルでは、プロデューサーとコンシューマーの間の同期を効率的に行うための特別なアルゴリズムが必要になります。
アトミック操作
アトミック操作は、不可分な操作であり、その操作が完了するまで他のスレッドから割り込まれることがありません。マルチスレッド環境で共有データに安全にアクセスするために使用されます。Goランタイムでは、runtime·atomicload
、runtime·atomicstore
、runtime·cas
などのアトミック操作が使用されます。
runtime·atomicload
: 指定されたアドレスから値をアトミックに読み込みます。runtime·atomicstore
: 指定されたアドレスに値をアトミックに書き込みます。runtime·cas
(Compare-And-Swap): 指定されたアドレスの現在の値が期待値と一致する場合にのみ、新しい値を書き込みます。これはロックフリープログラミングの基本的な構成要素です。
技術的詳細
このコミットの主要な変更点は、GoランタイムのP(Processor)構造体内のローカルワークキューの実装を、ミューテックス保護された動的配列から、ロックフリーの固定サイズリングバッファへと変更したことです。
P構造体の変更 (src/pkg/runtime/runtime.h
)
従来のP構造体では、ローカルワークキューはG** runq
(ゴルーチンポインタの配列)、int32 runqhead
、int32 runqtail
、int32 runqsize
で表現されていました。
変更後、runq
は固定サイズの配列G* runq[256]
となり、runqsize
は削除され、runqhead
とrunqtail
はuint32
型に変更されました。
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -373,10 +373,9 @@ struct P
MCache* mcache;
// Queue of runnable goroutines.
- G** runq;
- int32 runqhead;
- int32 runqtail;
- int32 runqsize;
+ uint32 runqhead;
+ uint32 runqtail;
+ G* runq[256];
// Available G's (status == Gdead)
G* gfree;
この変更により、runq
はコンパイル時にサイズが決定される固定配列となり、動的なメモリ割り当てが不要になります。runqhead
とrunqtail
がuint32
になったのは、リングバッファのインデックスがオーバーフローしても、モジュロ演算(%nelem(p->runq)
)によって正しく循環するためです。
proc.c
におけるワークキュー操作の変更
src/pkg/runtime/proc.c
では、主に以下の関数が変更されました。
-
runqput(P *p, G *gp)
: Pのローカルキューにゴルーチンを追加する関数。- 従来のミューテックスロックが削除され、アトミック操作が導入されました。
p->runqhead
とp->runqtail
をアトミックに読み込み、リングバッファの空き容量を確認します。- 空きがある場合は、
p->runq[t%nelem(p->runq)] = gp;
でゴルーチンを配置し、runtime·atomicstore(&p->runqtail, t+1);
でrunqtail
をアトミックに更新します。 - キューが満杯の場合は、
runqputslow
関数を呼び出し、ゴルーチンとローカルキューの一部をグローバルキューに移動させます。
-
runqputslow(P *p, G *gp, uint32 h, uint32 t)
: ローカルキューが満杯になった際に、ゴルーチンとローカルキューの半分をグローバルキューに移動させる関数。- ローカルキューから半分程度のゴルーチンをバッチとして取得し、
runtime·cas(&p->runqhead, h, h+n)
でrunqhead
をアトミックに更新します。 - 取得したゴルーチンと新しく追加するゴルーチンを連結し、
globrunqputbatch
を使ってグローバルキューに一括で追加します。
- ローカルキューから半分程度のゴルーチンをバッチとして取得し、
-
runqget(P *p)
: Pのローカルキューからゴルーチンを取得する関数。- 従来のミューテックスロックが削除され、アトミック操作が導入されました。
p->runqhead
とp->runqtail
をアトミックに読み込み、キューが空でないことを確認します。gp = p->runq[h%nelem(p->runq)];
でゴルーチンを取得し、runtime·cas(&p->runqhead, h, h+1);
でrunqhead
をアトミックに更新します。CASが成功した場合のみゴルーチンを返します。これにより、複数のコンシューマー(ワークスチールなど)からの安全なアクセスを保証します。
-
runqgrow(P *p)
の削除: 従来の動的配列のサイズを拡張する関数が不要になったため、削除されました。 -
runqsteal(P *p, P *p2)
: 他のP(p2
)のローカルキューからゴルーチンを奪う(スチールする)関数。runqgrab
関数を呼び出して、p2
のローカルキューからゴルーチンのバッチを取得します。- 取得したゴルーチンを自身のローカルキューに追加します。ここでもアトミック操作が使用されます。
-
runqgrab(P *p, G **batch)
: ローカルキューからゴルーチンのバッチを取得する関数。ワークスチール時に使用されます。p->runqhead
とp->runqtail
をアトミックに読み込み、利用可能なゴルーチンの数を計算します。- キューの半分程度のゴルーチンをバッチとしてコピーし、
runtime·cas(&p->runqhead, h, h+n)
でrunqhead
をアトミックに更新します。
アトミック操作による同期
この変更の核心は、ミューテックスの代わりにアトミック操作(runtime·atomicload
, runtime·atomicstore
, runtime·cas
)を使用することで、ロックの競合を排除し、並行性を高めている点です。
runqput
では、runtime·atomicload(&p->runqhead)
でload-acquire
セマンティクス、runtime·atomicstore(&p->runqtail, t+1)
でstore-release
セマンティクスを使用しています。これにより、プロデューサーがアイテムを書き込んだ後、そのアイテムがコンシューマーから可視になることが保証されます。runqget
やrunqgrab
では、runtime·atomicload(&p->runqhead)
でload-acquire
セマンティクス、runtime·cas(&p->runqhead, h, h+1)
でcas-release
セマンティクスを使用しています。これにより、コンシューマーがアイテムを読み取る際に、そのアイテムがプロデューサーによって正しく書き込まれたことを確認し、他のコンシューマーとの競合を避けます。
パフォーマンスの向上
コミットメッセージに記載されているベンチマーク結果は、この変更がゴルーチンの生成とスケジューリングのパフォーマンスに顕著な改善をもたらしたことを示しています。特にBenchmarkCreateGoroutines
では最大41%以上の改善が見られ、これはゴルーチン生成時のワークキュー操作のオーバーヘッドが大幅に削減されたことを示唆しています。BenchmarkMatmult
のような計算集約型のベンチマークでも改善が見られるのは、スケジューリングの効率化が全体的なアプリケーションのパフォーマンスに寄与しているためと考えられます。
コアとなるコードの変更箇所
このコミットのコアとなる変更は、src/pkg/runtime/runtime.h
におけるP
構造体のrunq
メンバーの定義変更と、src/pkg/runtime/proc.c
におけるrunqput
、runqget
、runqsteal
、runqputslow
、runqgrab
といったワークキュー操作関数の実装変更です。
特に重要なのは、以下の部分です。
-
P
構造体のrunq
定義変更 (src/pkg/runtime/runtime.h
):--- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -373,10 +373,9 @@ struct P MCache* mcache; // Queue of runnable goroutines. - G** runq; - int32 runqhead; - int32 runqtail; - int32 runqsize; + uint32 runqhead; + uint32 runqtail; + G* runq[256]; // Available G's (status == Gdead) G* gfree;
-
runqput
関数 (src/pkg/runtime/proc.c
): ミューテックスロックの削除と、アトミック操作によるリングバッファへの追加ロジック。--- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -2699,78 +2711,98 @@ pidleget(void) // Try to put g on local runnable queue. // If it's full, put onto global queue. // Executed only by the owner P. static void runqput(P *p, G *gp) { - int32 h, t, s; + uint32 h, t; - runtime·lock(p); retry: - h = p->runqhead; + h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers t = p->runqtail; - s = p->runqsize; - if(t == h-1 || (h == 0 && t == s-1)) { - runqgrow(p); - goto retry; + if(t - h < nelem(p->runq)) { + p->runq[t%nelem(p->runq)] = gp; + runtime·atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption + return; } - p->runq[t++] = gp; - if(t == s) - t = 0; - p->runqtail = t; - runtime·unlock(p); + if(runqputslow(p, gp, h, t)) + return; + // the queue is not full, now the put above must suceed + goto retry; }
-
runqget
関数 (src/pkg/runtime/proc.c
): ミューテックスロックの削除と、アトミック操作によるリングバッファからの取得ロジック。--- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -2779,57 +2811,24 @@ runqgrow(P *p) static G* runqsteal(P *p, P *p2) { - G *gp, *gp1; - int32 t, h, s, t2, h2, s2, c, i; + G *gp; + G *batch[nelem(p->runq)/2]; + uint32 t, h, n, i; - if(p2->runqhead == p2->runqtail) + n = runqgrab(p2, batch); + if(n == 0) \treturn nil; - // sort locks to prevent deadlocks - if(p < p2) - \truntime·lock(p); - runtime·lock(p2); - if(p2->runqhead == p2->runqtail) { - \truntime·unlock(p2); - \tif(p < p2) - \t\truntime·unlock(p); - \treturn nil; - } - if(p >= p2) - \truntime·lock(p); - // now we've locked both queues and know the victim is not empty - h = p->runqhead; + n--; + gp = batch[n]; + if(n == 0) + return gp; + h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers t = p->runqtail; - s = p->runqsize; - h2 = p2->runqhead; - t2 = p2->runqtail; - s2 = p2->runqsize; - gp = p2->runq[h2++]; // return value - if(h2 == s2) - \th2 = 0; - // steal roughly half - if(t2 > h2) - \tc = (t2 - h2) / 2; - else - \tc = (s2 - h2 + t2) / 2; - // copy - for(i = 0; i != c; i++) { - \t// the target queue is full? - \tif(t == h-1 || (h == 0 && t == s-1)) - \t\tbreak; - \t// the victim queue is empty? - \tif(t2 == h2) - \t\tbreak; - \tgp1 = p2->runq[h2++]; - \tif(h2 == s2) - \t\th2 = 0; - \tp->runq[t++] = gp1; - \tif(t == s) - \t\tt = 0; - } - p->runqtail = t; - p2->runqhead = h2; - runtime·unlock(p2); - runtime·unlock(p); + if(t - h + n >= nelem(p->runq)) + runtime·throw("runqsteal: runq overflow"); + for(i=0; i<n; i++, t++) + p->runq[t%nelem(p->runq)] = batch[i]; + runtime·atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption return gp; }
コアとなるコードの解説
P
構造体のrunq
定義変更
G** runq;
からG* runq[256];
へ:- これは、ローカルワークキューが動的にサイズ変更されるポインタの配列から、固定サイズ256のゴルーチンポインタの配列になったことを意味します。これにより、
mallocgc
による動的なメモリ割り当てやrunqgrow
による再割り当てが不要になり、メモリ管理のオーバーヘッドが削減されます。また、固定サイズであるため、キャッシュの局所性が向上し、アクセス速度が改善される可能性があります。
- これは、ローカルワークキューが動的にサイズ変更されるポインタの配列から、固定サイズ256のゴルーチンポインタの配列になったことを意味します。これにより、
int32 runqhead; int32 runqtail; int32 runqsize;
からuint32 runqhead; uint32 runqtail;
へ:runqsize
が削除されたのは、配列が固定サイズになったため、そのサイズはnelem(p->runq)
(配列の要素数、この場合は256)で常に既知だからです。runqhead
とrunqtail
がuint32
型になったのは、リングバッファのインデックスがuint32
の最大値を超えても、モジュロ演算(%nelem(p->runq)
)によって正しく循環し、インデックスのオーバーフローを気にせずに使用できるためです。これにより、インデックスが負になる可能性がなくなり、コードが簡潔になります。
runqput(P *p, G *gp)
関数
この関数は、Pのローカルキューにゴルーチンgp
を追加します。
static void
runqput(P *p, G *gp)
{
uint32 h, t;
retry:
h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers
t = p->runqtail;
if(t - h < nelem(p->runq)) { // キューに空きがあるかチェック
p->runq[t%nelem(p->runq)] = gp; // リングバッファにゴルーチンを配置
runtime·atomicstore(&p->runqtail, t+1); // store-release, makes the item available for consumption
return;
}
// キューが満杯の場合
if(runqputslow(p, gp, h, t)) // runqputslowを呼び出し、グローバルキューに移動
return;
// runqputslowが失敗した場合(他のコンシューマーがアイテムを消費したためキューが満杯でなくなった場合など)
goto retry; // 再試行
}
- ロックフリー化: 従来のミューテックスロック(
runtime·lock(p)
とruntime·unlock(p)
)が削除されました。 - アトミック操作:
h = runtime·atomicload(&p->runqhead);
:runqhead
をアトミックに読み込みます。load-acquire
セマンティクスは、この読み込みより前のメモリ操作が、この読み込みより後のメモリ操作よりも先に完了することを保証します。これにより、他のコンシューマー(ワークスチールなど)が既に消費したアイテムの古い状態を読み込むことを防ぎます。runtime·atomicstore(&p->runqtail, t+1);
:runqtail
をアトミックに更新します。store-release
セマンティクスは、この書き込みより前のメモリ操作が、この書き込みより後のメモリ操作よりも先に完了することを保証します。これにより、プロデューサーがアイテムを書き込んだ後、そのアイテムが他のコンシューマーから可視になることが保証されます。
- リングバッファの利用:
t%nelem(p->runq)
というモジュロ演算を使って、リングバッファのインデックスを計算しています。 - オーバーフロー処理:
t - h < nelem(p->runq)
でキューに空きがあるかチェックし、空きがない場合はrunqputslow
を呼び出して、ゴルーチンとローカルキューの一部をグローバルキューに移動させます。これにより、ローカルキューが無限に大きくなることを防ぎ、固定サイズを維持します。
runqputslow(P *p, G *gp, uint32 h, uint32 t)
関数
この関数は、runqput
から呼び出され、ローカルキューが満杯になった場合に、ゴルーチンとローカルキューの半分をグローバルキューに移動させます。
static bool
runqputslow(P *p, G *gp, uint32 h, uint32 t)
{
G *batch[nelem(p->runq)/2+1]; // ローカルキューの半分+1のバッチ
uint32 n, i;
// First, grab a batch from local queue.
n = t-h;
n = n/2; // 半分のゴルーチンを取得
if(n != nelem(p->runq)/2)
runtime·throw("runqputslow: queue is not full"); // キューが満杯でない場合はエラー(デバッグ用)
for(i=0; i<n; i++)
batch[i] = p->runq[(h+i)%nelem(p->runq)]; // バッチにゴルーチンをコピー
if(!runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume
return false; // CAS失敗(他のコンシューマーがheadを更新した)
batch[n] = gp; // 新しいゴルーチンをバッチの最後に追加
// Link the goroutines.
for(i=0; i<n; i++)
batch[i]->schedlink = batch[i+1]; // バッチ内のゴルーチンを連結
// Now put the batch on global queue.
runtime·lock(&runtime·sched); // グローバルキューへのアクセスはロックで保護
globrunqputbatch(batch[0], batch[n], n+1); // バッチをグローバルキューに一括追加
runtime·unlock(&runtime·sched);
return true;
}
- バッチ処理: ローカルキューの半分程度のゴルーチンをバッチとして取得し、新しいゴルーチン
gp
もそのバッチに含めます。 runtime·cas(&p->runqhead, h, h+n)
:runqhead
をアトミックに更新します。これはCompare-And-Swap
操作であり、現在のrunqhead
の値が期待値h
と一致する場合にのみ、h+n
に更新します。これにより、複数のコンシューマーが同時にrunqhead
を更新しようとした場合の競合を安全に解決します。CASが失敗した場合はfalse
を返し、runqput
で再試行されます。- グローバルキューへの移動: 取得したバッチは、
globrunqputbatch
関数を使ってグローバルキューに一括で追加されます。グローバルキューへのアクセスは、runtime·lock(&runtime·sched)
とruntime·unlock(&runtime·sched)
で保護されています。これは、グローバルキューが複数のPからアクセスされるため、ロックフリー化が難しい、または複雑になるためと考えられます。
runqget(P *p)
関数
この関数は、Pのローカルキューからゴルーチンを取得します。
static G*
runqget(P *p)
{
G *gp;
uint32 t, h;
for(;;) { // ロックフリーな取得のためループ
h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
t = p->runqtail;
if(t == h) // キューが空の場合
return nil;
gp = p->runq[h%nelem(p->runq)]; // ゴルーチンを取得
if(runtime·cas(&p->runqhead, h, h+1)) // cas-release, commits consume
return gp; // CAS成功、ゴルーチンを返す
}
}
- ロックフリー化: 従来のミューテックスロックが削除されました。
- アトミック操作とCAS:
h = runtime·atomicload(&p->runqhead);
:runqhead
をアトミックに読み込みます。t = p->runqtail;
:runqtail
を読み込みます。if(t == h)
: キューが空かどうかをチェックします。gp = p->runq[h%nelem(p->runq)];
: リングバッファからゴルーチンを取得します。if(runtime·cas(&p->runqhead, h, h+1))
:runqhead
をアトミックにインクリメントします。このCAS操作が成功した場合のみ、取得したゴルーチンを返します。これにより、複数のコンシューマー(P自身やワークスチールを行う他のP)が同時に同じゴルーチンを取得しようとした場合の競合を安全に解決し、各ゴルーチンが一度だけ取得されることを保証します。CASが失敗した場合はループを続行し、再試行します。
runqsteal(P *p, P *p2)
関数
この関数は、他のP(p2
)のローカルキューからゴルーチンを奪います。
static G*
runqsteal(P *p, P *p2)
{
G *gp;
G *batch[nelem(p->runq)/2]; // バッチ配列
uint32 t, h, n, i;
n = runqgrab(p2, batch); // p2からゴルーチンのバッチを取得
if(n == 0)
return nil; // 取得できなかった場合
n--; // 1つは自身が実行するため
gp = batch[n]; // 自身が実行するゴルーチン
if(n == 0)
return gp; // 1つしか取得できなかった場合
h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with consumers
t = p->runqtail;
if(t - h + n >= nelem(p->runq)) // 自身のキューがオーバーフローするかチェック
runtime·throw("runqsteal: runq overflow");
for(i=0; i<n; i++, t++)
p->runq[t%nelem(p->runq)] = batch[i]; // 取得したバッチを自身のキューに追加
runtime·atomicstore(&p->runqtail, t); // store-release, makes the item available for consumption
return gp;
}
runqgrab
の利用: ワークスチールは、runqgrab
関数を呼び出して、ターゲットP(p2
)のローカルキューからゴルーチンのバッチをアトミックに取得します。- バッチの追加: 取得したバッチのうち1つは自身が実行し、残りを自身のローカルキューに追加します。ここでも
runtime·atomicstore(&p->runqtail, t);
を使ってアトミックにrunqtail
を更新します。
runqgrab(P *p, G **batch)
関数
この関数は、ローカルキューからゴルーチンのバッチを取得します。主にワークスチール時に使用されます。
static uint32
runqgrab(P *p, G **batch)
{
uint32 t, h, n, i;
for(;;) { // ロックフリーな取得のためループ
h = runtime·atomicload(&p->runqhead); // load-acquire, synchronize with other consumers
t = runtime·atomicload(&p->runqtail); // load-acquire, synchronize with the producer
n = t-h;
n = n - n/2; // 残りの半分を取得
if(n == 0)
break; // 取得できるゴルーチンがない場合
if(n > nelem(p->runq)/2) // read inconsistent h and t
continue; // 不整合なhとtを読み込んだ場合、再試行
for(i=0; i<n; i++)
batch[i] = p->runq[(h+i)%nelem(p->runq)]; // バッチにゴルーチンをコピー
if(runtime·cas(&p->runqhead, h, h+n)) // cas-release, commits consume
break; // CAS成功、ループを抜ける
}
return n; // 取得したゴルーチンの数を返す
}
- アトミックな読み込み:
runqhead
とrunqtail
をアトミックに読み込みます。 - バッチサイズの決定: キューに残っているゴルーチンの半分をバッチとして取得しようとします。
runtime·cas(&p->runqhead, h, h+n)
:runqhead
をアトミックに更新します。これにより、複数のコンシューマーが同時にバッチを取得しようとした場合の競合を安全に解決します。CASが失敗した場合はループを続行し、再試行します。
これらの変更により、Goランタイムのスケジューラは、ローカルワークキューの操作においてミューテックスのオーバーヘッドを排除し、アトミック操作と固定サイズリングバッファの利用によって、より高い並行性と効率を実現しています。特にゴルーチンの生成とスケジューリングの頻度が高いワークロードにおいて、そのパフォーマンス向上が期待されます。
関連リンク
- Go Scheduler: M, P, G - https://go.dev/doc/articles/go_scheduler.html (これは一般的なGoスケジューラの解説であり、このコミットに直接関連するものではありませんが、前提知識として有用です。)
- Goの並行処理とスケジューラについて - https://zenn.dev/link/articles/go-concurrency-scheduler (日本語の解説記事)
参考にした情報源リンク
- Goのソースコード (特に
src/runtime/proc.go
やsrc/runtime/runtime.h
の関連部分) - ロックフリーデータ構造に関する一般的な情報源 (例: Wikipedia, 並行プログラミングの書籍)
- アトミック操作に関する情報源 (例: CPUアーキテクチャのドキュメント、並行プログラミングの書籍)
- リングバッファに関する情報源 (例: データ構造の教科書)
- Goのコミット履歴と関連するコードレビュー (golang.org/cl/46170044)
- Dmitriy Vyukov氏のGoランタイムに関する他の貢献や論文 (Goのスケジューラや並行処理に関する彼の研究は多岐にわたります)
- Goのベンチマーク結果の解釈に関する情報