CLOVER🍀 (original) (raw)
これは、なにをしたくて書いたもの?
Kubernetesでいろいろ試そうと思うとプライベートレジストリーが欲しくなったりするものですが、割と面倒な気がします。
MicroK8sだとregistryアドオンで多少簡単に導入できそうだったので試してみることにしました。
How to use the built-in registry
MicroK8sのregistryアドオン
registryアドオンを使ったレジストリーの使い方はこちら。
How to use the built-in registry
アドオンを使って導入するプライベートレジストリーは、組み込みレジストリーと呼んでいるみたいです。
流れとしては以下になります。
- MicroK8sのregistryアドオンを有効にする
- Dockerデーモンに対して
insecure-registries
の設定をする
レジストリーの用意が簡単なのがよいところでしょうか。試してみましょう。
ちなみに、組み込みではないプライベートレジストリーについてのドキュメントはこちらで、
How to work with a private registry
アドオンの一覧はこちらです。
環境
今回の環境はこちら。
$ microk8s version MicroK8s v1.31.2 revision 7394
$ microk8s kubectl version Client Version: v1.31.2 Kustomize Version: v5.4.2 Server Version: v1.31.2
Dockerのバージョン。
$ docker version Client: Docker Engine - Community Version: 27.3.1 API version: 1.47 Go version: go1.22.7 Git commit: ce12230 Built: Fri Sep 20 11:41:00 2024 OS/Arch: linux/amd64 Context: default
Server: Docker Engine - Community Engine: Version: 27.3.1 API version: 1.47 (minimum version 1.24) Go version: go1.22.7 Git commit: 41ca978 Built: Fri Sep 20 11:41:00 2024 OS/Arch: linux/amd64 Experimental: false containerd: Version: 1.7.23 GitCommit: 57f17b0a6295a39009d861b89e3b3b87b005ca27 runc: Version: 1.1.14 GitCommit: v1.1.14-0-g2c9f560 docker-init: Version: 0.19.0 GitCommit: de40ad0
MicroK8sのregistryアドオンを使う
それでは、MicroK8sのregistryアドオンを使ってみましょう。
まずはregistryアドオンを有効にします。
$ microk8s enable registry
ログ。
Infer repository core for addon registry Infer repository core for addon hostpath-storage Enabling default storage class. WARNING: Hostpath storage is not suitable for production environments. A hostpath volume can grow beyond the size limit set in the volume claim manifest.
deployment.apps/hostpath-provisioner created storageclass.storage.k8s.io/microk8s-hostpath created serviceaccount/microk8s-hostpath created clusterrole.rbac.authorization.k8s.io/microk8s-hostpath created clusterrolebinding.rbac.authorization.k8s.io/microk8s-hostpath created Storage will be available soon. The registry will be created with the size of 20Gi. Default storage class will be used. namespace/container-registry created persistentvolumeclaim/registry-claim created deployment.apps/registry created service/registry created configmap/local-registry-hosting configured
レジストリーの容量は、デフォルトで20Gです。
もっと大きなレジストリーとしたい場合は以下のように指定するみたいです。
$ microk8s enable registry:size=40Gi
microk8s status
で見てみると
$ microk8s status
registry
が現れます。32000ポートでリッスンしているようです。
addons: enabled: dns # (core) CoreDNS ha-cluster # (core) Configure high availability on the current node helm # (core) Helm - the package manager for Kubernetes helm3 # (core) Helm 3 - the package manager for Kubernetes hostpath-storage # (core) Storage class; allocates storage from host directory registry # (core) Private image registry exposed on localhost:32000 storage # (core) Alias to hostpath-storage add-on, deprecated
では、プライベートレジストリーにpushするイメージを作成します。nginxをベースにしましょう。
Dockerfile
FROM nginx:1.27.2-bookworm
といっても、イメージ名を変えたくらいですが。イメージ名のドメイン名の部分はlocalhost:32000
にします。
$ docker image build -t localhost:32000/nginx:1.27.2-bookworm .
作成できました。
$ docker image ls REPOSITORY TAG IMAGE ID CREATED SIZE localhost:32000/nginx 1.27.2-bookworm 60c8a892f36f 6 weeks ago 192MB
pushしてみましょう。
$ docker image push localhost:32000/nginx:1.27.2-bookworm
insecure-registries
の設定を入れていませんが、pushできてしまいました…。
The push refers to repository [localhost:32000/nginx] 61ef4e878aac: Pushed a0c145a29c8d: Pushed a1fe8b721bb1: Pushed 19b722697f76: Pushed ffe4285e2906: Pushed 7dca41ff1486: Pushed c3548211b826: Pushed 1.27.2-bookworm: digest: sha256:2ebf3d369d813bcc6a531ba43e1859bd91ad5c8217ae33b821f5ffada06a6cd4 size: 1778
ドキュメントでは、一部のDockerのバージョンでは失敗する可能性があるくらいに書かれていましたが…。
Pushing to this insecure registry may fail in some versions of Docker unless the daemon is explicitly configured to trust this registry.
今回は気にせず進めてみましょう。
pushしたイメージを使って、Deploymentを作成。
$ microk8s kubectl create deployment nginx --image=localhost:32000/nginx:1.27.2-bookworm deployment.apps/nginx created
Podが動いていることを確認。
$ microk8s kubectl get all NAME READY STATUS RESTARTS AGE pod/nginx-75fd58947c-fhtnj 1/1 Running 0 17s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 72m
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/nginx 1/1 1 1 17s
NAME DESIRED CURRENT READY AGE replicaset.apps/nginx-75fd58947c 1 1 1 17s
Serviceも作成しましょう。
$ microk8s kubectl expose deployment nginx --port 80 service/nginx exposed
確認。
$ microk8s kubectl get all NAME READY STATUS RESTARTS AGE pod/nginx-75fd58947c-fhtnj 1/1 Running 0 72s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 73m service/nginx ClusterIP 10.152.183.208 80/TCP 29s
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/nginx 1/1 1 1 72s
NAME DESIRED CURRENT READY AGE replicaset.apps/nginx-75fd58947c 1 1 1 72s
アクセスしてみます。
$ curl 10.152.183.208
Welcome to nginx!Welcome to nginx!
If you see this page, the nginx web server is successfully installed and working. Further configuration is required.
For online documentation and support please refer to
nginx.org.
Commercial support is available at
nginx.com.
Thank you for using nginx.
OKですね。
後片付け。
$ microk8s kubectl delete service nginx $ microk8s kubectl delete deployment nginx
$ microk8s kubectl get all NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 84m
オマケ
今回は不要でしたが、insecure-registries
を設定する場合は/etc/docker/daemon.json
に以下のように設定して
/etc/docker/daemon.json
{ "insecure-registries" : ["localhost:32000"] }
Dockerデーモンを再起動します。
$ sudo systemctl restart docker
おわりに
MicroK8sのregistryアドオンを使って、組み込みレジストリー(プライベートレジストリー)を使ってみました。
かなり簡単に導入できたので驚きました。Kubernetesを使ってちょっと確認をしたい時や開発用途などで便利なのではないでしょうか。
これは、なにをしたくて書いたもの?
Pythonで、マルチスレッドに関する標準ライブラリーを知っておきたいなと思いまして。
ちなみにスレッド自体は過去にも扱っています。
Pythonのスレッドは、ネイティブスレッドなのか? - CLOVER🍀
PythonのTCPServer/HTTPServerをマルチスレッドで使う - CLOVER🍀
今回はロックやセマフォといったスレッド間に関する標準ライブラリーを見ていきます。
threadingライブラリー
threadingライブラリーのページはこちら。
threading --- スレッドベースの並列処理 — Python 3.10.15 ドキュメント
こちらには以下のAPIやクラスが含まれています。
- スレッドローカルデータ … スレッドごとに固有の値を設定する
- Lock … ロック、アンロックが可能で、特定のスレッドがロックを獲得している時に他のスレッドがロックを獲得しようとすると、先に獲得されたロックがアンロックされるまで待機する
- RLock … 再入可能ロック(Reentrant Lock)。Lockと異なり同じスレッドが再帰的にロックを獲得可能
- Condition … ロックに関連付けられたうえで、
wait
/notify
(notify_all
)でスレッドの待機/起動を操作できるオブジェクト - Semaphore … いわゆるセマフォで、ある範囲に対して同時に実行できるスレッド数を制限する仕組み
- Event … あるスレッドがイベントを発信し、他のスレッドはイベントの発信を待つというスレッド間通信を行う仕組み
- Timer … 一定時間後にスレッドを実行する仕組み
- Barrier … 複数のスレッドの待ち合わせを行う仕組み
なお、Lock
、RLock
、Condition
、Semaphore
はコンテキストマネージャーとして使えます。
具体的にはロックの獲得をacquire
で行い、解放にrelease
を使うものはコンテキストマネージャーとして使えるようになっていて、with
ブロックに入る時にacquire
が呼び出されwith
ブロックを抜ける時にrelease
が呼び出されます。
ところで、PythonにおけるスレッドはGILがあるので1プロセス内で同時に実行できるスレッドはひとつだけです。Pythonでマルチスレッドが
有効なのはIOバウンドな処理を並列して実行したい時ですね。
CPython 実装の詳細: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。
ちなみにスレッド自体を直接扱うのではなく、concurrent.futures
のThreadPoolExecutor
を使うのがよいと思います。
concurrent.futures -- 並列タスク実行 — Python 3.10.15 ドキュメント
今回はこのあたりを試してみたいと思います。
環境
今回の環境はこちら。
$ python3 --version Python 3.10.12
$ pip3 --version pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)
準備
確認はpytestで行いたいと思います。型チェックにmypyも入れておきます。
$ pip3 install pytest mypy
インストールされたライブラリーの一覧。
$ pip3 list Package Version
exceptiongroup 1.2.2 iniconfig 2.0.0 mypy 1.13.0 mypy-extensions 1.0.0 packaging 24.2 pip 22.0.2 pluggy 1.5.0 pytest 8.3.3 setuptools 59.6.0 tomli 2.1.0 typing_extensions 4.12.2
動作確認はpytestを使ったテストコードで行いますが、雛形はこちらです。
tests/test_threading.py
from concurrent.futures import ThreadPoolExecutor import datetime import threading import time
def log(message: str) -> None: print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")
またテスト中に標準出力への書き出しを行うので、pytestは--capture=no
オプションを指定して実行します。
$ pytest --capture=no
では、試していってみましょう。
スレッドローカルデータ
最初はスレッドローカルデータから。
スレッドローカルデータは、そのスレッド固有のデータを持たせる仕組みです。あたかも単一スレッド前提のような使い方をするコードで
複数スレッドで実行しても、それぞれのデータが独立して扱えるので便利です。
サンプルコード。
def test_thread_local_data() -> None: results = {}
localdata = threading.local()
def thread1() -> None:
time.sleep(3)
localdata.mydata = "Hello from thread1"
time.sleep(2)
log(f"thread1 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread1"
results[threading.current_thread().name] = "done"
def thread2() -> None:
time.sleep(2)
localdata.mydata = "Hello from thread2"
time.sleep(3)
log(f"thread2 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread2"
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(thread1))
futures.append(executor.submit(thread2))
[f.result() for f in futures]
assert len(results) == 2
スレッドローカルデータは、threading.local
で取得したオブジェクトで表現されます。
localdata = threading.local()
スレッドローカルデータは辞書のように扱えます。
同じオブジェクトに各スレッドが同じ属性に書き込んでいますが、それぞれのスレッドが設定した値がしっかり残っています。
def thread1() -> None:
time.sleep(3)
localdata.mydata = "Hello from thread1"
time.sleep(2)
log(f"thread1 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread1"
results[threading.current_thread().name] = "done"
def thread2() -> None:
time.sleep(2)
localdata.mydata = "Hello from thread2"
time.sleep(3)
log(f"thread2 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread2"
results[threading.current_thread().name] = "done"
標準出力の結果。
[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1 [2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2
このように、スレッドごとに固有の値を管理できる仕組みです。
ちなみにスレッドローカルデータはlocal
を継承することで独自のスレッドローカルデータを作れたりするのですが、ドキュメントにほとんど
説明がありません。APIの説明もないですね。
詳しくはソースコードを見ること、だそうです。
詳細と例題については、 _threading_local モジュールのドキュメンテーション文字列を参照してください。
https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py
Lock、RLock
次はLock
とRLock
です。
まずはLock
から。
def test_lock() -> None: lock = threading.Lock()
results = {}
def with_lock() -> None:
log("try lock")
lock.acquire()
try:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
finally:
lock.release()
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
Lock
を作成して
lock = threading.Lock()
Lock#acquire
でロックを獲得できます。ロックを獲得できるスレッドはひとつだけで、他のスレッドがLock#acquire
を呼び出した場合はLock#release
でロックが解放されるまで待たされることになります。
def with_lock() -> None:
log("try lock")
lock.acquire()
try:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
finally:
lock.release()
なのでfinally
で確実にロックを解放する必要があります。
標準出力に書き出された結果を見ると、最初にロックを取得したスレッドがロックを開放するまで2つ目のスレッドが待たされているのが
確認できます。
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock [2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start [2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock [2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end [2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start [2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end
なお、Lock
はコンテキストマネージャーに対応しているのでwith
を使ってシンプルに書くことができます。
with lock:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
こちらの方がLock#release
の呼び出し忘れなどがなくてよいでしょう。以降はwith
でロックを扱います。
なお、Lock
を使ったロックの場合、ロックを獲得したスレッドであってもロック解放前にLock#acquire
を呼び出した場合はロックを取得できず
待たされることになります。
つまり、以下のようなコードを書いてしまうとロックを取得したスレッドが止まってしまいます。
def test_lock_reentrant() -> None: lock = threading.Lock()
results = {}
def with_lock() -> None:
log("try lock")
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
実行した場合は、標準出力がここで停止します。
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock [2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start [2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock [2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock
つまり、Lock
は最入可能ではありません。
最入可能なロックが必要な場合はRLock
を使います。
先程のコードをRLock
を使って書き直したものがこちらです。
def test_rlock() -> None: lock = threading.RLock()
results = {}
def with_lock() -> None:
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
Lock
でインスタンスを作成していたところをRLock
にするだけで、あとはLock
と使い方は同じですね。
lock = threading.RLock()
ただしRLock
は最入可能なので、先ほどはLock
で動作しなかったひとつのスレッドが同じロックインスタンスに対して2回acquire
を
呼び出すようなコードであっても
def with_lock() -> None:
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
このように動くようになります。
[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end [2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end
Condition
Conditionはロックに関連付けられるオボジェクトで、スレッドを待機させたり起こしたりできます。
サンプルはこちら。
def test_condition() -> None: condition = threading.Condition()
results = {}
def wait_task() -> None:
with condition:
log("waiting...")
condition.wait()
log("wakeup")
results[threading.current_thread().name] = "done"
def notify_task() -> None:
with condition:
log("notify")
condition.notify_all()
log("done")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(wait_task))
futures.append(executor.submit(wait_task))
time.sleep(3)
futures.append(executor.submit(notify_task))
[f.result() for f in futures]
assert len(results) == 3
Condition
はコンストラクターでインスタンスを取得しますが、引数を指定しない場合は内部的にRLock
のインスタンスを作成します。
condition = threading.Condition()
引数を指定する場合は、Lock
またはRLock
のインスタンスを渡す必要があります。
Condition#wait
でスレッドを待機させます。Condition
に対する操作は、ロックを獲得したうえで行う必要があります。
def wait_task() -> None:
with condition:
log("waiting...")
condition.wait()
log("wakeup")
results[threading.current_thread().name] = "done"
そしてCondition#nofity
またはCondition#notify_all
で待機しているスレッドを起こすことができます。
def notify_task() -> None:
with condition:
log("notify")
condition.notify_all()
log("done")
results[threading.current_thread().name] = "done"
Condition#nofity
ではひとつまたは指定した数のスレッドを、Condition#notify_all
では待機しているスレッドすべてを起こすことができます。
標準出力の結果はこちら。
[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting... [2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting... [2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify [2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done [2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup [2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup
ちなみに、Condition#wait_for
という引数に指定した関数の戻り値がTrue
になるとスレッドが起きるようにするAPIもあるようです。
Semaphore
Semaphore
は、ある範囲を同時に実行できるスレッドの数を制限する仕組みです。
サンプルコードはこちら。
def test_semaphore() -> None: semaphore = threading.Semaphore(2)
results = {}
def with_semaphore() -> None:
log("acquire semaphore")
with semaphore:
log("enter semaphore")
time.sleep(2)
log("leave semaphore")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
[f.result() for f in futures]
assert len(results) == 4
Semaphore
は、コンストラクターに同時に実行できるスレッド数を指定してインスタンスを生成します。ここでは2を指定しています。
semaphore = threading.Semaphore(2)
あとはLock
やRLock
のようにロックしたい範囲を指定して使います。
def with_semaphore() -> None:
log("acquire semaphore")
with semaphore:
log("enter semaphore")
time.sleep(2)
log("leave semaphore")
results[threading.current_thread().name] = "done"
今回は4つのスレッドを実行しているのですが、最初に入った2つのスレッドのどちらかが抜けるまでは3つ目、4つ目のスレッドはロックを
獲得できず待機しています。
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore [2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore [2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore
Event
Event
は、Event
というオブジェクトを仲介してあるスレッドがイベントを発信した時に、Event
からの通知を待っているスレッドを起動する
しくみです。
サンプルコードはこちら。
def test_event() -> None: event = threading.Event()
results = {}
def wait_event() -> None:
log("wait...")
event.wait()
log("wake up")
results[threading.current_thread().name] = "done"
def set_event() -> None:
log("before set event")
time.sleep(2)
event.set()
log("after set event")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(wait_event))
futures.append(executor.submit(wait_event))
time.sleep(2)
futures.append(executor.submit(set_event))
[f.result() for f in futures]
assert len(results) == 3
Event
の作成。
event = threading.Event()
待機するスレッドは、Event#wait
で通知を待ちます。
def wait_event() -> None:
log("wait...")
event.wait()
log("wake up")
results[threading.current_thread().name] = "done"
そしてイベントを送るスレッドは、Event#set
で待機しているスレッドをEvent#wait
から抜けさせることができます。
def set_event() -> None:
log("before set event")
time.sleep(2)
event.set()
log("after set event")
results[threading.current_thread().name] = "done"
実行結果。
[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait... [2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait... [2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event [2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event [2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up [2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up
2つのスレッドがEvent#set
を待っていることがわかります。
Barrier
Barrier
を使うと、複数のスレッドの待ち合わせができるようになります。
サンプルコードはこちら。
def test_barrier() -> None: barrier = threading.Barrier(3)
results = {}
def thread1() -> None:
log("thread1 waiting 3sec...")
time.sleep(3)
barrier.wait()
log("thread1 wakeup")
results["thread1"] = "done"
def thread2() -> None:
log("thread2 waiting 2sec...")
time.sleep(2)
barrier.wait()
log("thread2 wakeup")
results["thread2"] = "done"
def thread3() -> None:
log("thread3 waiting 5sec...")
time.sleep(5)
barrier.wait()
log("thread3 wakeup")
results["thread3"] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(thread1))
futures.append(executor.submit(thread2))
futures.append(executor.submit(thread3))
[f.result() for f in futures]
assert results["thread1"] == "done"
assert results["thread2"] == "done"
assert results["thread3"] == "done"
Barrier
は、コンストラクターに待ち合わせるスレッドの数を指定してインスタンスを生成します。
barrier = threading.Barrier(3)
あとはBarrier#wait
を呼び出すとそこで待機し、コンストラクターに指定した数のスレッドがBarrier#wait
の呼び出しに到達すると
動き始めます。
def thread1() -> None:
log("thread1 waiting 3sec...")
time.sleep(3)
barrier.wait()
log("thread1 wakeup")
results["thread1"] = "done"
def thread2() -> None:
log("thread2 waiting 2sec...")
time.sleep(2)
barrier.wait()
log("thread2 wakeup")
results["thread2"] = "done"
def thread3() -> None:
log("thread3 waiting 5sec...")
time.sleep(5)
barrier.wait()
log("thread3 wakeup")
results["thread3"] = "done"
つまり、こういう動作結果になります。
[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec... [2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec... [2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec... [2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup [2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup [2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup
最後のスレッドがBarrier#wait
を呼び出すまで他のスレッドが待機し、最後のスレッドがBarrier#wait
を呼び出したところで待機していた
スレッドすべてが一気に動き出します。
Timer
最後はTimer
です。これは、指定した時間の後にタスクを実行する仕組みですね。
サンプルコードはこちら。スレッドの待ち合わせにはBarrier
を使いました。
def test_timer() -> None: results = {}
barrier = threading.Barrier(2)
def task() -> None:
log("execute task")
results[threading.current_thread().name] = "done"
barrier.wait()
log("register task")
timer = threading.Timer(3, task)
timer.start()
barrier.wait()
assert len(results) == 1
Timer
は、コンストラクターにタスクを起動するまでの秒数と起動するタスクを関数として指定します。
timer = threading.Timer(3, task)
実行結果はこちら。3秒後にタスクが実行されています。
[2024-11-16 21🔞39] MainThread - register task [2024-11-16 21🔞42] Thread-1 - execute task
こんなところでしょうか。
おわりに
Pythonでスレッドに関する標準ライブラリーをいろいろ試してみました。
使う頻度はそう多くないと思いますが、マルチスレッドを扱う時には押さえておいた方がよさそうなものばかりなので覚えておきましょう。