Ходим по http, сохраняем в базу
Несколько раз в моей работе приходилось решать такую задачу. Ходим куда-то по http, полученные ответы записываем в базу. Ходить по http лучше в несколько потоков, потому что большая часть времени проводится в ожидании ответа. Сохранять в базу лучше пачками, это существенно быстрее чем вносить записи по одной. Сейчас я бы решил эту задачу так:
package ru.yamakarov.examples;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import one.util.streamex.StreamEx;
public class ParallelProducerBatchWriter {
private static ThreadLocalRandom random = ThreadLocalRandom.current();
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
int requestCount = 100;
int batchSize = 10;
System.out.println("Отправляем запросы на выполнение");
for (int i = 0; i < requestCount; i++) {
completionService.submit(createRequest(i));
}
System.out.println("Собираем результаты запросов в пачки");
AtomicInteger counter = new AtomicInteger(0);
// Пользуем StreamEx для groupRuns
StreamEx.of(streamResults(completionService, requestCount))
.groupRuns((prev, next) -> counter.incrementAndGet() % batchSize != 0)
.forEach(ParallelProducerBatchWriter::writeBatch);
executor.shutdown();
}
private static void writeBatch(List<String> result) {
try {
Thread.sleep(random.nextInt(100));
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@NotNull
private static Callable<String> createRequest(int j) {
return () -> {
int time = random.nextInt(100);
try {
Thread.sleep(time);
System.out.println(j + " finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
return j + ": " + time + "ms";
};
}
@NotNull
private static Stream<String> streamResults(CompletionService<String> completionService, int requestCount) {
try {
return Stream.iterate(completionService.take().get(), (prev) ->
{
try {
return completionService.take().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}).limit(requestCount);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return Stream.of();
}
}
}
Запросы и сохранение в базу я эмулирую засыпанием потока.