CLOVER🍀 (original) (raw)

これは、なにをしたくて書いたもの?

Kubernetesでいろいろ試そうと思うとプライベートレジストリーが欲しくなったりするものですが、割と面倒な気がします。

MicroK8sだとregistryアドオンで多少簡単に導入できそうだったので試してみることにしました。

How to use the built-in registry

MicroK8sのregistryアドオン

registryアドオンを使ったレジストリーの使い方はこちら。

How to use the built-in registry

アドオンを使って導入するプライベートレジストリーは、組み込みレジストリーと呼んでいるみたいです。

流れとしては以下になります。

レジストリーの用意が簡単なのがよいところでしょうか。試してみましょう。

ちなみに、組み込みではないプライベートレジストリーについてのドキュメントはこちらで、

How to work with a private registry

アドオンの一覧はこちらです。

MicroK8s Addons

環境

今回の環境はこちら。

$ microk8s version MicroK8s v1.31.2 revision 7394

$ microk8s kubectl version Client Version: v1.31.2 Kustomize Version: v5.4.2 Server Version: v1.31.2

Dockerのバージョン。

$ docker version Client: Docker Engine - Community Version: 27.3.1 API version: 1.47 Go version: go1.22.7 Git commit: ce12230 Built: Fri Sep 20 11:41:00 2024 OS/Arch: linux/amd64 Context: default

Server: Docker Engine - Community Engine: Version: 27.3.1 API version: 1.47 (minimum version 1.24) Go version: go1.22.7 Git commit: 41ca978 Built: Fri Sep 20 11:41:00 2024 OS/Arch: linux/amd64 Experimental: false containerd: Version: 1.7.23 GitCommit: 57f17b0a6295a39009d861b89e3b3b87b005ca27 runc: Version: 1.1.14 GitCommit: v1.1.14-0-g2c9f560 docker-init: Version: 0.19.0 GitCommit: de40ad0

MicroK8sのregistryアドオンを使う

それでは、MicroK8sのregistryアドオンを使ってみましょう。

まずはregistryアドオンを有効にします。

$ microk8s enable registry

ログ。

Infer repository core for addon registry Infer repository core for addon hostpath-storage Enabling default storage class. WARNING: Hostpath storage is not suitable for production environments. A hostpath volume can grow beyond the size limit set in the volume claim manifest.

deployment.apps/hostpath-provisioner created storageclass.storage.k8s.io/microk8s-hostpath created serviceaccount/microk8s-hostpath created clusterrole.rbac.authorization.k8s.io/microk8s-hostpath created clusterrolebinding.rbac.authorization.k8s.io/microk8s-hostpath created Storage will be available soon. The registry will be created with the size of 20Gi. Default storage class will be used. namespace/container-registry created persistentvolumeclaim/registry-claim created deployment.apps/registry created service/registry created configmap/local-registry-hosting configured

レジストリーの容量は、デフォルトで20Gです。

もっと大きなレジストリーとしたい場合は以下のように指定するみたいです。

$ microk8s enable registry:size=40Gi

microk8s statusで見てみると

$ microk8s status

registryが現れます。32000ポートでリッスンしているようです。

addons: enabled: dns # (core) CoreDNS ha-cluster # (core) Configure high availability on the current node helm # (core) Helm - the package manager for Kubernetes helm3 # (core) Helm 3 - the package manager for Kubernetes hostpath-storage # (core) Storage class; allocates storage from host directory registry # (core) Private image registry exposed on localhost:32000 storage # (core) Alias to hostpath-storage add-on, deprecated

では、プライベートレジストリーにpushするイメージを作成します。nginxをベースにしましょう。

Dockerfile

FROM nginx:1.27.2-bookworm

といっても、イメージ名を変えたくらいですが。イメージ名のドメイン名の部分はlocalhost:32000にします。

$ docker image build -t localhost:32000/nginx:1.27.2-bookworm .

作成できました。

$ docker image ls REPOSITORY TAG IMAGE ID CREATED SIZE localhost:32000/nginx 1.27.2-bookworm 60c8a892f36f 6 weeks ago 192MB

pushしてみましょう。

$ docker image push localhost:32000/nginx:1.27.2-bookworm

insecure-registriesの設定を入れていませんが、pushできてしまいました…。

The push refers to repository [localhost:32000/nginx] 61ef4e878aac: Pushed a0c145a29c8d: Pushed a1fe8b721bb1: Pushed 19b722697f76: Pushed ffe4285e2906: Pushed 7dca41ff1486: Pushed c3548211b826: Pushed 1.27.2-bookworm: digest: sha256:2ebf3d369d813bcc6a531ba43e1859bd91ad5c8217ae33b821f5ffada06a6cd4 size: 1778

ドキュメントでは、一部のDockerのバージョンでは失敗する可能性があるくらいに書かれていましたが…。

Pushing to this insecure registry may fail in some versions of Docker unless the daemon is explicitly configured to trust this registry.

今回は気にせず進めてみましょう。

pushしたイメージを使って、Deploymentを作成。

$ microk8s kubectl create deployment nginx --image=localhost:32000/nginx:1.27.2-bookworm deployment.apps/nginx created

Podが動いていることを確認。

$ microk8s kubectl get all NAME READY STATUS RESTARTS AGE pod/nginx-75fd58947c-fhtnj 1/1 Running 0 17s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 72m

NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/nginx 1/1 1 1 17s

NAME DESIRED CURRENT READY AGE replicaset.apps/nginx-75fd58947c 1 1 1 17s

Serviceも作成しましょう。

$ microk8s kubectl expose deployment nginx --port 80 service/nginx exposed

確認。

$ microk8s kubectl get all NAME READY STATUS RESTARTS AGE pod/nginx-75fd58947c-fhtnj 1/1 Running 0 72s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 73m service/nginx ClusterIP 10.152.183.208 80/TCP 29s

NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/nginx 1/1 1 1 72s

NAME DESIRED CURRENT READY AGE replicaset.apps/nginx-75fd58947c 1 1 1 72s

アクセスしてみます。

$ curl 10.152.183.208

Welcome to nginx!

Welcome to nginx!

If you see this page, the nginx web server is successfully installed and working. Further configuration is required.

For online documentation and support please refer to nginx.org.
Commercial support is available at nginx.com.

Thank you for using nginx.

OKですね。

後片付け。

$ microk8s kubectl delete service nginx $ microk8s kubectl delete deployment nginx

$ microk8s kubectl get all NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kubernetes ClusterIP 10.152.183.1 443/TCP 84m

オマケ

今回は不要でしたが、insecure-registriesを設定する場合は/etc/docker/daemon.jsonに以下のように設定して

/etc/docker/daemon.json

{ "insecure-registries" : ["localhost:32000"] }

Dockerデーモンを再起動します。

$ sudo systemctl restart docker

おわりに

MicroK8sのregistryアドオンを使って、組み込みレジストリー(プライベートレジストリー)を使ってみました。

かなり簡単に導入できたので驚きました。Kubernetesを使ってちょっと確認をしたい時や開発用途などで便利なのではないでしょうか。

これは、なにをしたくて書いたもの?

Pythonで、マルチスレッドに関する標準ライブラリーを知っておきたいなと思いまして。

ちなみにスレッド自体は過去にも扱っています。

Pythonのスレッドは、ネイティブスレッドなのか? - CLOVER🍀

PythonのTCPServer/HTTPServerをマルチスレッドで使う - CLOVER🍀

今回はロックやセマフォといったスレッド間に関する標準ライブラリーを見ていきます。

threadingライブラリー

threadingライブラリーのページはこちら。

threading --- スレッドベースの並列処理 — Python 3.10.15 ドキュメント

こちらには以下のAPIやクラスが含まれています。

なお、LockRLockConditionSemaphoreはコンテキストマネージャーとして使えます。

with 文でのロック・条件変数・セマフォの使い方

具体的にはロックの獲得をacquireで行い、解放にreleaseを使うものはコンテキストマネージャーとして使えるようになっていて、
withブロックに入る時にacquireが呼び出されwithブロックを抜ける時にreleaseが呼び出されます。

ところで、PythonにおけるスレッドはGILがあるので1プロセス内で同時に実行できるスレッドはひとつだけです。Pythonでマルチスレッドが
有効なのはIOバウンドな処理を並列して実行したい時ですね。

CPython 実装の詳細: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。

ちなみにスレッド自体を直接扱うのではなく、concurrent.futuresThreadPoolExecutorを使うのがよいと思います。

concurrent.futures -- 並列タスク実行 — Python 3.10.15 ドキュメント

今回はこのあたりを試してみたいと思います。

環境

今回の環境はこちら。

$ python3 --version Python 3.10.12

$ pip3 --version pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)

準備

確認はpytestで行いたいと思います。型チェックにmypyも入れておきます。

$ pip3 install pytest mypy

インストールされたライブラリーの一覧。

$ pip3 list Package Version


exceptiongroup 1.2.2 iniconfig 2.0.0 mypy 1.13.0 mypy-extensions 1.0.0 packaging 24.2 pip 22.0.2 pluggy 1.5.0 pytest 8.3.3 setuptools 59.6.0 tomli 2.1.0 typing_extensions 4.12.2

動作確認はpytestを使ったテストコードで行いますが、雛形はこちらです。

tests/test_threading.py

from concurrent.futures import ThreadPoolExecutor import datetime import threading import time

def log(message: str) -> None: print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")

またテスト中に標準出力への書き出しを行うので、pytestは--capture=noオプションを指定して実行します。

$ pytest --capture=no

では、試していってみましょう。

スレッドローカルデータ

最初はスレッドローカルデータから。

スレッドローカルデータ

スレッドローカルデータは、そのスレッド固有のデータを持たせる仕組みです。あたかも単一スレッド前提のような使い方をするコードで
複数スレッドで実行しても、それぞれのデータが独立して扱えるので便利です。

サンプルコード。

def test_thread_local_data() -> None: results = {}

localdata = threading.local()

def thread1() -> None:
    time.sleep(3)

    localdata.mydata = "Hello from thread1"

    time.sleep(2)

    log(f"thread1 data = {localdata.mydata}")

    assert localdata.mydata == "Hello from thread1"

    results[threading.current_thread().name] = "done"

def thread2() -> None:
    time.sleep(2)

    localdata.mydata = "Hello from thread2"

    time.sleep(3)

    log(f"thread2 data = {localdata.mydata}")

    assert localdata.mydata == "Hello from thread2"

    results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(thread1))
    futures.append(executor.submit(thread2))

    [f.result() for f in futures]

    assert len(results) == 2

スレッドローカルデータは、threading.localで取得したオブジェクトで表現されます。

localdata = threading.local()

スレッドローカルデータは辞書のように扱えます。

同じオブジェクトに各スレッドが同じ属性に書き込んでいますが、それぞれのスレッドが設定した値がしっかり残っています。

def thread1() -> None:
    time.sleep(3)

    localdata.mydata = "Hello from thread1"

    time.sleep(2)

    log(f"thread1 data = {localdata.mydata}")

    assert localdata.mydata == "Hello from thread1"

    results[threading.current_thread().name] = "done"

def thread2() -> None:
    time.sleep(2)

    localdata.mydata = "Hello from thread2"

    time.sleep(3)

    log(f"thread2 data = {localdata.mydata}")

    assert localdata.mydata == "Hello from thread2"

    results[threading.current_thread().name] = "done"

標準出力の結果。

[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1 [2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2

このように、スレッドごとに固有の値を管理できる仕組みです。

ちなみにスレッドローカルデータはlocalを継承することで独自のスレッドローカルデータを作れたりするのですが、ドキュメントにほとんど
説明がありません。APIの説明もないですね。

詳しくはソースコードを見ること、だそうです。

詳細と例題については、 _threading_local モジュールのドキュメンテーション文字列を参照してください。

https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py

Lock、RLock

次はLockRLockです。

まずはLockから。

def test_lock() -> None: lock = threading.Lock()

results = {}

def with_lock() -> None:
    log("try lock")

    lock.acquire()

    try:
        log("start")

        time.sleep(2)

        log("end")

        results[threading.current_thread().name] = "done"
    finally:
        lock.release()

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(with_lock))
    futures.append(executor.submit(with_lock))

    [f.result() for f in futures]

    assert len(results) == 2

Lockを作成して

lock = threading.Lock()

Lock#acquireでロックを獲得できます。ロックを獲得できるスレッドはひとつだけで、他のスレッドがLock#acquireを呼び出した場合は
Lock#releaseでロックが解放されるまで待たされることになります。

def with_lock() -> None:
    log("try lock")

    lock.acquire()

    try:
        log("start")

        time.sleep(2)

        log("end")

        results[threading.current_thread().name] = "done"
    finally:
        lock.release()

なのでfinallyで確実にロックを解放する必要があります。

標準出力に書き出された結果を見ると、最初にロックを取得したスレッドがロックを開放するまで2つ目のスレッドが待たされているのが
確認できます。

[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock [2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start [2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock [2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end [2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start [2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end

なお、Lockはコンテキストマネージャーに対応しているのでwithを使ってシンプルに書くことができます。

    with lock:
        log("start")

        time.sleep(2)

        log("end")

        results[threading.current_thread().name] = "done"

こちらの方がLock#releaseの呼び出し忘れなどがなくてよいでしょう。以降はwithでロックを扱います。

なお、Lockを使ったロックの場合、ロックを獲得したスレッドであってもロック解放前にLock#acquireを呼び出した場合はロックを取得できず
待たされることになります。

つまり、以下のようなコードを書いてしまうとロックを取得したスレッドが止まってしまいます。

def test_lock_reentrant() -> None: lock = threading.Lock()

results = {}

def with_lock() -> None:
    log("try lock")

    with lock:
        log("start")

        time.sleep(2)

        log("reentrant lock")

        with lock:  
            log("do something")

        log("release reentrant lock")

        log("end")

        results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(with_lock))
    futures.append(executor.submit(with_lock))

    [f.result() for f in futures]

    assert len(results) == 2

実行した場合は、標準出力がここで停止します。

[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock [2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start [2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock [2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock

つまり、Lockは最入可能ではありません。

最入可能なロックが必要な場合はRLockを使います。

先程のコードをRLockを使って書き直したものがこちらです。

def test_rlock() -> None: lock = threading.RLock()

results = {}

def with_lock() -> None:
    with lock:
        log("start")

        time.sleep(2)

        log("reentrant lock")

        with lock:
            log("do something")

        log("release reentrant lock")

        log("end")

        results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(with_lock))
    futures.append(executor.submit(with_lock))

    [f.result() for f in futures]

    assert len(results) == 2

Lockインスタンスを作成していたところをRLockにするだけで、あとはLockと使い方は同じですね。

lock = threading.RLock()

ただしRLockは最入可能なので、先ほどはLockで動作しなかったひとつのスレッドが同じロックインスタンスに対して2回acquire
呼び出すようなコードであっても

def with_lock() -> None:
    with lock:
        log("start")

        time.sleep(2)

        log("reentrant lock")

        with lock:
            log("do something")

        log("release reentrant lock")

        log("end")

        results[threading.current_thread().name] = "done"

このように動くようになります。

[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock [2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end [2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock [2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end

Condition

Conditionはロックに関連付けられるオボジェクトで、スレッドを待機させたり起こしたりできます。

サンプルはこちら。

def test_condition() -> None: condition = threading.Condition()

results = {}

def wait_task() -> None:
    with condition:
        log("waiting...")

        condition.wait()

        log("wakeup")

        results[threading.current_thread().name] = "done"

def notify_task() -> None:
    with condition:
        log("notify")

        condition.notify_all()

        log("done")

        results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(wait_task))
    futures.append(executor.submit(wait_task))

    time.sleep(3)

    futures.append(executor.submit(notify_task))

    [f.result() for f in futures]

    assert len(results) == 3

Conditionコンストラクターインスタンスを取得しますが、引数を指定しない場合は内部的にRLockインスタンスを作成します。

condition = threading.Condition()

引数を指定する場合は、LockまたはRLockインスタンスを渡す必要があります。

Condition#waitでスレッドを待機させます。Conditionに対する操作は、ロックを獲得したうえで行う必要があります。

def wait_task() -> None:
    with condition:
        log("waiting...")

        condition.wait()

        log("wakeup")

        results[threading.current_thread().name] = "done"

そしてCondition#nofityまたはCondition#notify_allで待機しているスレッドを起こすことができます。

def notify_task() -> None:
    with condition:
        log("notify")

        condition.notify_all()

        log("done")

        results[threading.current_thread().name] = "done"

Condition#nofityではひとつまたは指定した数のスレッドを、Condition#notify_allでは待機しているスレッドすべてを起こすことができます。

標準出力の結果はこちら。

[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting... [2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting... [2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify [2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done [2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup [2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup

ちなみに、Condition#wait_forという引数に指定した関数の戻り値がTrueになるとスレッドが起きるようにするAPIもあるようです。

Semaphore

Semaphoreは、ある範囲を同時に実行できるスレッドの数を制限する仕組みです。

Semaphore

サンプルコードはこちら。

def test_semaphore() -> None: semaphore = threading.Semaphore(2)

results = {}

def with_semaphore() -> None:
    log("acquire semaphore")

    with semaphore:
        log("enter semaphore")

        time.sleep(2)

        log("leave semaphore")

        results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(with_semaphore))
    futures.append(executor.submit(with_semaphore))
    futures.append(executor.submit(with_semaphore))
    futures.append(executor.submit(with_semaphore))

    [f.result() for f in futures]

    assert len(results) == 4

Semaphoreは、コンストラクターに同時に実行できるスレッド数を指定してインスタンスを生成します。ここでは2を指定しています。

semaphore = threading.Semaphore(2)

あとはLockRLockのようにロックしたい範囲を指定して使います。

def with_semaphore() -> None:
    log("acquire semaphore")

    with semaphore:
        log("enter semaphore")

        time.sleep(2)

        log("leave semaphore")

        results[threading.current_thread().name] = "done"

今回は4つのスレッドを実行しているのですが、最初に入った2つのスレッドのどちらかが抜けるまでは3つ目、4つ目のスレッドはロックを
獲得できず待機しています。

[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore [2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore [2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore [2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore [2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore

Event

Eventは、Eventというオブジェクトを仲介してあるスレッドがイベントを発信した時に、Eventからの通知を待っているスレッドを起動する
しくみです。

Event

サンプルコードはこちら。

def test_event() -> None: event = threading.Event()

results = {}

def wait_event() -> None:
    log("wait...")

    event.wait()

    log("wake up")

    results[threading.current_thread().name] = "done"

def set_event() -> None:
    log("before set event")

    time.sleep(2)
    
    event.set()

    log("after set event")

    results[threading.current_thread().name] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(wait_event))
    futures.append(executor.submit(wait_event))

    time.sleep(2)

    futures.append(executor.submit(set_event))

    [f.result() for f in futures]

    assert len(results) == 3

Eventの作成。

event = threading.Event()

待機するスレッドは、Event#waitで通知を待ちます。

def wait_event() -> None:
    log("wait...")

    event.wait()

    log("wake up")

    results[threading.current_thread().name] = "done"

そしてイベントを送るスレッドは、Event#setで待機しているスレッドをEvent#waitから抜けさせることができます。

def set_event() -> None:
    log("before set event")

    time.sleep(2)
    
    event.set()

    log("after set event")

    results[threading.current_thread().name] = "done"

実行結果。

[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait... [2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait... [2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event [2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event [2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up [2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up

2つのスレッドがEvent#setを待っていることがわかります。

Barrier

Barrierを使うと、複数のスレッドの待ち合わせができるようになります。

Barrier

サンプルコードはこちら。

def test_barrier() -> None: barrier = threading.Barrier(3)

results = {}

def thread1() -> None:
    log("thread1 waiting 3sec...")
    time.sleep(3)

    barrier.wait()
    log("thread1 wakeup")

    results["thread1"] = "done"

def thread2() -> None:
    log("thread2 waiting 2sec...")
    time.sleep(2)

    barrier.wait()
    log("thread2 wakeup")

    results["thread2"] = "done"

def thread3() -> None:
    log("thread3 waiting 5sec...")
    time.sleep(5)

    barrier.wait()
    log("thread3 wakeup")

    results["thread3"] = "done"

with ThreadPoolExecutor() as executor:
    futures = []

    futures.append(executor.submit(thread1))
    futures.append(executor.submit(thread2))
    futures.append(executor.submit(thread3))

    [f.result() for f in futures]

    assert results["thread1"] == "done"
    assert results["thread2"] == "done"
    assert results["thread3"] == "done"

Barrierは、コンストラクターに待ち合わせるスレッドの数を指定してインスタンスを生成します。

barrier = threading.Barrier(3)

あとはBarrier#waitを呼び出すとそこで待機し、コンストラクターに指定した数のスレッドがBarrier#waitの呼び出しに到達すると
動き始めます。

def thread1() -> None:
    log("thread1 waiting 3sec...")
    time.sleep(3)

    barrier.wait()
    log("thread1 wakeup")

    results["thread1"] = "done"

def thread2() -> None:
    log("thread2 waiting 2sec...")
    time.sleep(2)

    barrier.wait()
    log("thread2 wakeup")

    results["thread2"] = "done"

def thread3() -> None:
    log("thread3 waiting 5sec...")
    time.sleep(5)

    barrier.wait()
    log("thread3 wakeup")

    results["thread3"] = "done"

つまり、こういう動作結果になります。

[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec... [2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec... [2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec... [2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup [2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup [2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup

最後のスレッドがBarrier#waitを呼び出すまで他のスレッドが待機し、最後のスレッドがBarrier#waitを呼び出したところで待機していた
スレッドすべてが一気に動き出します。

Timer

最後はTimerです。これは、指定した時間の後にタスクを実行する仕組みですね。

Timer

サンプルコードはこちら。スレッドの待ち合わせにはBarrierを使いました。

def test_timer() -> None: results = {}

barrier = threading.Barrier(2)

def task() -> None:
    log("execute task")
    
    results[threading.current_thread().name] = "done"

    barrier.wait()
    
log("register task")

timer = threading.Timer(3, task)
timer.start()

barrier.wait()

assert len(results) == 1

Timerは、コンストラクターにタスクを起動するまでの秒数と起動するタスクを関数として指定します。

timer = threading.Timer(3, task)

実行結果はこちら。3秒後にタスクが実行されています。

[2024-11-16 21🔞39] MainThread - register task [2024-11-16 21🔞42] Thread-1 - execute task

こんなところでしょうか。

おわりに

Pythonでスレッドに関する標準ライブラリーをいろいろ試してみました。

使う頻度はそう多くないと思いますが、マルチスレッドを扱う時には押さえておいた方がよさそうなものばかりなので覚えておきましょう。