非同期プログラムのイメージと Future (J)
プログラムの概略
今回紹介するプログラム(FutureSample)は、
- 標準入力からユーザ入力を待ちながら、
- ユーザー入力に応じて3秒かかるタスクを実行する
- 別スレッドで実行
- タスク自身は、3秒 sleep してから与えられた文字を表示するだけ
- ユーザー入力が
q(uit)
だった場合、すべてのタスクの終了確認をとってからプログラム終了
というものです。
実行結果です。
a // ユーザ入力
b // ユーザ入力
text: a, time: 4.344 // 3秒後タスク出力
text: b, time: 5.11 // 3秒後タスク出力
c // ユーザ入力
q // ユーザ入力, 終了指示
Quit process starts. // 終了処理開始
a=4.344 // a の動作結果確認
b=5.11 // b の動作結果確認
text: c, time: 9.986 // 3秒後タスク出力
c=9.986 // c の動作結果確認
// プログラム終了
プログラムを実行してもらえば、入力処理とは別にタスクが処理されていくイメージがつかめるのではと思います。
タスクの生成・依頼
まずは一般論として Java の Thread 生成や、Future の使い方を説明しておきます。
まずは単にスレッドを作成して実行するだけ。
thread.start()
で別スレッドが稼働し、// do task
の部分が実行されます。
Web API の同期呼出しをしてもらっても構いません。
Thread thread = new Thread(()->{
// do task
});
thread.start();
仕事を依頼して、自分は別の仕事をしておいて、後で結果を受け取るなら java.util.concurrent.Future を使いましょう。結果を待ち合わせるための「チケット」(=future)を返してくれます。
// Future をつかってタスクを実行
Future<Result> future = executor.submit(()->{
// do task
return result;
});
// 別スレッドでタスクを実行中、自分は他のことをしていい
...
// 値の取得
Result result = future.get();
java.util.concurrent.ExecutorService は、
Future<T> submit(Callable<T> task)
というメソッドを提供し、Executor の準備したスレッドで task
を実行します。
タスクを作る方は、Callable interface にそって、 call()
メソッドを実現します(上記では lambda を使って call()
の中身だけ書いてます)。
タスクの実行を依頼したら、別の仕事をしても別のタスクもさらに依頼しても大丈夫(複数タスクを実行する場合は、)。
結果は、future
に格納されます。あとで結果が欲しくなったらget()
で取り出しますが、まだだったら結果が来るまで待つ(wait()
相当)ことになります。
最後に、executor の作り方について。
同時に大量にタスクを実行しないなら、1スレッドのみ利用するsingle thread executor (Executors.newSingleThreadExecutor();
)で十分ですが、大量のリクエストを同時実行したい場合は実行用に複数スレッドを確保した(スレッドプールと呼ぶ)、thread pool executor を使うといいでしょう。各種 executor の生成用メソッドがは java.util.concurrent.Executorsに並んでいます。
今回のプログラムの解説
ExecutorService
: 今回は複数スレッドをつかった ExecutorService 利用。- 複数タスクを並行に実行できるように(でないと待ち時間が累積する)
ExecutorService executor = Executors.newCachedThreadPool();
ArrayList futures
: future を貯めておいて、最後に確認するため
ArrayList<Future<SimpleEntry<String, Double>>> futures = new ArrayList<>();
- main loop
- ユーザ入力
line
をうけとる executor.submit()
でタスク実行依頼Future f
はfutures
に詰める
- ユーザ入力
while(true) {
String line = in.nextLine();
if(line.equals("q") || line.equals("quit"))
break;
// 重い処理を executor に任す
// 結果がいらないなら execute() で OK
// submit() なら future から「後で」結果を取得もできる。
// retrofit の callback なら、retrofit の executor に任せることになる
Future<SimpleEntry<String,Double>> f = executor.submit(()->sayAfter(3, line));
futures.add(f);
}
- 終了処理
executor
が仕事受けないようにshutdown()
を呼んで、futures
の各要素f
に対してf.get()
呼ぶことで終了待ち&値取得。
System.out.println("Quit process starts.");
executor.shutdown();
for(Future<SimpleEntry<String, Double>> f: futures) {
System.out.println(f.get());
}
- 今回のダミータスク
delay
(=3) 秒 sleep してから、与えられた文字を表示して、文字と時間のペア(SimpleEntry
)を返すだけ。sleep()
を囲むtry .. catch
は、当該スレッドに割り込みが入った時用。
public SimpleEntry<String, Double> sayAfter(int delay, String what) {
try {
Thread.sleep((long)delay*1000);
} catch (Exception e) { /* do nothing */ }
double t = (System.currentTimeMillis()-start)/1000.0;
System.out.println("text: " + what +", time: " + t);
return new SimpleEntry<>(what, t);
}