Несколько раз в моей работе приходилось решать такую задачу.
Ходим куда-то по http, полученные ответы записываем в базу.
Ходить по http лучше в несколько потоков, потому что большая часть времени проводится в ожидании ответа.
Сохранять в базу лучше пачками, это существенно быстрее чем вносить записи по одной.
Сейчас я бы решил эту задачу так:
package ru . yamakarov . examples ;
import java.util.List ;
import java.util.concurrent.Callable ;
import java.util.concurrent.CompletionService ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorCompletionService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ThreadLocalRandom ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.stream.Stream ;
import org.jetbrains.annotations.NotNull ;
import one.util.streamex.StreamEx ;
public class ParallelProducerBatchWriter {
private static ThreadLocalRandom random = ThreadLocalRandom . current ();
public static void main ( String [] args ) {
ExecutorService executor = Executors . newFixedThreadPool ( 10 );
CompletionService < String > completionService = new ExecutorCompletionService <>( executor );
int requestCount = 100 ;
int batchSize = 10 ;
System . out . println ( "Отправляем запросы на выполнение" );
for ( int i = 0 ; i < requestCount ; i ++) {
completionService . submit ( createRequest ( i ));
}
System . out . println ( "Собираем результаты запросов в пачки" );
AtomicInteger counter = new AtomicInteger ( 0 );
// Пользуем StreamEx для groupRuns
StreamEx . of ( streamResults ( completionService , requestCount ))
. groupRuns (( prev , next ) -> counter . incrementAndGet () % batchSize != 0 )
. forEach ( ParallelProducerBatchWriter: : writeBatch );
executor . shutdown ();
}
private static void writeBatch ( List < String > result ) {
try {
Thread . sleep ( random . nextInt ( 100 ));
System . out . println ( result );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
}
@NotNull
private static Callable < String > createRequest ( int j ) {
return () -> {
int time = random . nextInt ( 100 );
try {
Thread . sleep ( time );
System . out . println ( j + " finished" );
} catch ( InterruptedException e ) {
e . printStackTrace ();
}
return j + ": " + time + "ms" ;
};
}
@NotNull
private static Stream < String > streamResults ( CompletionService < String > completionService , int requestCount ) {
try {
return Stream . iterate ( completionService . take (). get (), ( prev ) ->
{
try {
return completionService . take (). get ();
} catch ( InterruptedException | ExecutionException e ) {
e . printStackTrace ();
}
return null ;
}). limit ( requestCount );
} catch ( InterruptedException | ExecutionException e ) {
e . printStackTrace ();
return Stream . of ();
}
}
}
Запросы и сохранение в базу я эмулирую засыпанием потока.