非同期プログラムのイメージと Future (J)

プログラムの概略

今回紹介するプログラム(FutureSample)は、

  • 標準入力からユーザ入力を待ちながら、
  • ユーザー入力に応じて3秒かかるタスクを実行する
    • 別スレッドで実行
    • タスク自身は、3秒 sleep してから与えられた文字を表示するだけ
  • ユーザー入力が q(uit) だった場合、すべてのタスクの終了確認をとってからプログラム終了

というものです。

実行結果です。

a   // ユーザ入力
b   // ユーザ入力
text: a, time: 4.344 // 3秒後タスク出力
text: b, time: 5.11  // 3秒後タスク出力
c   // ユーザ入力
q   // ユーザ入力, 終了指示
Quit process starts. // 終了処理開始
a=4.344   // a の動作結果確認
b=5.11    // b の動作結果確認
text: c, time: 9.986 // 3秒後タスク出力
c=9.986   // c の動作結果確認
// プログラム終了

プログラムを実行してもらえば、入力処理とは別にタスクが処理されていくイメージがつかめるのではと思います。

タスクの生成・依頼

まずは一般論として Java の Thread 生成や、Future の使い方を説明しておきます。

まずは単にスレッドを作成して実行するだけ。 thread.start() で別スレッドが稼働し、// do task の部分が実行されます。 Web API の同期呼出しをしてもらっても構いません。

Thread thread = new Thread(()->{
  // do task
});
thread.start();

仕事を依頼して、自分は別の仕事をしておいて、後で結果を受け取るなら java.util.concurrent.Future を使いましょう。結果を待ち合わせるための「チケット」(=future)を返してくれます。

// Future をつかってタスクを実行
Future<Result> future = executor.submit(()->{
  // do task
  return result;
});
// 別スレッドでタスクを実行中、自分は他のことをしていい
...
// 値の取得
Result result = future.get();

java.util.concurrent.ExecutorService は、 Future<T> submit(Callable<T> task) というメソッドを提供し、Executor の準備したスレッドで task を実行します。 タスクを作る方は、Callable interface にそって、 call() メソッドを実現します(上記では lambda を使って call() の中身だけ書いてます)。

タスクの実行を依頼したら、別の仕事をしても別のタスクもさらに依頼しても大丈夫(複数タスクを実行する場合は、)。 結果は、future に格納されます。あとで結果が欲しくなったらget()で取り出しますが、まだだったら結果が来るまで待つ(wait()相当)ことになります。

最後に、executor の作り方について。 同時に大量にタスクを実行しないなら、1スレッドのみ利用するsingle thread executor (Executors.newSingleThreadExecutor();)で十分ですが、大量のリクエストを同時実行したい場合は実行用に複数スレッドを確保した(スレッドプールと呼ぶ)、thread pool executor を使うといいでしょう。各種 executor の生成用メソッドがは java.util.concurrent.Executorsに並んでいます。

今回のプログラムの解説

  • ExecutorService: 今回は複数スレッドをつかった ExecutorService 利用。
    • 複数タスクを並行に実行できるように(でないと待ち時間が累積する)
ExecutorService executor = Executors.newCachedThreadPool();
  • ArrayList futures: future を貯めておいて、最後に確認するため
ArrayList<Future<SimpleEntry<String, Double>>> futures = new ArrayList<>();
  • main loop
    • ユーザ入力 line をうけとる
    • executor.submit() でタスク実行依頼
    • Future ffutures に詰める
while(true) {
    String line = in.nextLine();
    if(line.equals("q") || line.equals("quit"))
        break;

    // 重い処理を executor に任す
    // 結果がいらないなら  execute() で OK
    // submit() なら future から「後で」結果を取得もできる。
    // retrofit の callback なら、retrofit の executor に任せることになる
    Future<SimpleEntry<String,Double>> f = executor.submit(()->sayAfter(3, line));
    futures.add(f);
}
  • 終了処理
    • executor が仕事受けないように shutdown() を呼んで、
    • futures の各要素 f に対して f.get() 呼ぶことで終了待ち&値取得。
System.out.println("Quit process starts.");
executor.shutdown();
for(Future<SimpleEntry<String, Double>> f: futures) {
    System.out.println(f.get());
}
  • 今回のダミータスク
    • delay (=3) 秒 sleep してから、与えられた文字を表示して、文字と時間のペア(SimpleEntry)を返すだけ。
    • sleep() を囲む try .. catch は、当該スレッドに割り込みが入った時用。
public SimpleEntry<String, Double> sayAfter(int delay, String what) {
    try {
        Thread.sleep((long)delay*1000);
    } catch (Exception e) { /* do nothing */ }
    double t = (System.currentTimeMillis()-start)/1000.0;
    System.out.println("text: " + what +", time: " + t);
    return new SimpleEntry<>(what, t);
}