非同期プログラムのイメージと Future (P)

プログラムの概略

今回紹介するプログラム(input_test.py)は、

  • 標準入力からユーザ入力を待ちながら、
  • ユーザー入力に応じて3秒かかるタスクを実行する
    • コルーチンで実行
    • タスク自身は、3秒 sleep してから与えられた文字を表示するだけ
  • ユーザー入力が q(uit) だった場合、すべてのタスクの終了確認をとってからプログラム終了

というものです。

なお、実行するためには aioconsole というライブラリを pip などでインストールする必要があります。

実行結果です。

a   // ユーザ入力
b   // ユーザ入力
time:  4.869091272354126 , text:  a  // 3秒後タスク出力
time:  5.617463111877441 , text:  b  // 3秒後タスク出力
c   // ユーザ入力
q   // ユーザ入力, 終了指示
Quit process starts.     // 終了処理開始
['a', 4.869091272354126] // a の動作結果確認
['b', 5.617463111877441] // b の動作結果確認
time:  10.3854660987854 , text:  c   // 3秒後タスク出力
['c', 10.3854660987854]  // c の動作結果確認
// プログラム終了

プログラムを実行してもらえば、入力処理とは別にタスクが処理されていくイメージがつかめるのではと思います。

asyncio のコルーチンの基本

まずは asyncio の使い方から。

コルーチンは、スレッドのようなものです。ただ、thread は任意のタイミングで切り替わりますが、 コルーチンは単一スレッド上で I/O などのタイミングでのみ実行コルーチンが切り替わりながら動いていきます。

  • 切り替わるタイミング: awaitcreate_task() など asyncio の操作をおこなったタイミング
    • なので、排他制御トラブルは起きにくい。
    • critical section の途中に await など入れないように
  • ただ、I/O などは標準 API ではなく コルーチン対応の API を使う必要。一般の blocking I/O をおこなうと止まる

また、await などのコルーチンの機能を使えるのは async def を用いて定義された関数に限定されます。

  • コルーチンとして呼び出せるもの
    • 自分で async def で定義したルーチン
    • asyncio などで提供されるコルーチン対応 async 関数群
  • コルーチン (cor) の呼出し方
    • task = asyncio.create_task(cor(..)): コルーチンを生成。即時実行されるようにスケジュールされる。終了は待たない
      • await task: 生成されたコルーチンの終了を待つ。
    • await cor(..): コルーチンを生成して、その終了を待つ。ある意味同期呼出しっぽいが、I/O 待ちでスイッチできる。
    • 注: cor(..) だけだとコルーチンを実行することにはならないので、awaitcreate_task() と併用しましょう。

では、プログラム例を通して、プログラムの書き方を見ていきましょう。

コルーチン定義

async def say_after(delay, what):
    await asyncio.sleep(delay)  # delay秒待つ
    t = time.time()-start
    print("time: ", t, ", text: ", what)
    return [what, t]

delay 秒待ってからwhat を表示するプログラム。返り値として、what と表示をおこなった時間の組を返す。

sleep する際も、asyncio.sleep() をつかうことで、別のコルーチンに制御がうつり、時間がくるとスケジュール可能となる。

入力待ちとタスク生成

入力待ちをしてはタスク生成をおこなうのは、async def main_loop() です。 await などをつかう、つまり切り替わって動作するこの関数もコルーチンとして定義されないといけません。

async def main_loop():
    while True:
        line = await aioconsole.ainput()
        if line == "quit" or line == "q":
            break
        task = asyncio.create_task(say_after(3, line))
        futures.append(task)
    # 終了処理	 
    print("Quit process starts.")
    for f in futures:
        print(await f)
  • 入力待ち: こちらもスイッチのタイミングにしたいので、コルーチン対応のaioconsole.ainput()を利用
    • aioconsole を pip で入れる必要
  • コルーチンを起動: create_task で、say_after(3, line)` を実行。
    • 返り値は Task ですが Future としての機能も持ちます。
    • ということで、最後に終了確認するために futures というリストに詰めます
  • 終了処理: 順番に task の終了確認
    • print(await f): await は待つだけでなくコルーチンの返り値も取得できるので、その表示をしている

コルーチンの稼働

コルーチンを使うには、最初はコルーチンのシステムを起動し、そのうえで指定したコルーチンを実行することになります。

asyncio.run(main_loop())

これで、コール―チンのシステム (event_loop と呼ばれれるスケジューラ) を準備したうえで、main_loop() を実行させてくれます。

ただ、終了処理でトラブル時は、明に event_loop を取得してすべてのコルーチンの終了を待つ以下のスタイルの方が良いようです。

loop = asyncio.get_event_loop()
loop.run_until_complete(main_loop())