Распараллеливание задач в Java через invokeAll

Прикладное программирование Программирование на Java Распараллеливание задач в Java через invokeAll

Просмотр 0 веток ответов
  • Автор
    Сообщения
    • #5842
      @admin
      StudLance.ru

      Тут не будет подробного описания java concurrent API. Будет рассмотрена задача, которая может возникнуть в жизни каждого программиста — он замечает некоторые независимые операции и у него появляется нестерпимое желание их распределить по нескольким потокам. Грубо говоря, у вас есть какой-то метод, который можно было бы безболезненно запихнуть в Runnable-ы, стартануть и подождать когда все закончат свою работу.

      Понятно, что для этого может подойти invokeAll т.к. в описании про него явно сказано: «Executes the given tasks, returning a list of Futures holding their status and results when all complete.»

      Так вот, некоторые программисты не дочитывают последние слова и думают, что нужно самому контролировать процесс, ждать и «жать на тормоз», проверять все ли потоки завершились, периодически дергать в цикле future.get(), join-ить или делать другие различные телодвижения. Конечно так поступают не все, но тем не менее такое бывает. Для того, чтобы разобраться в том, что происходит на самом деле, достаточно взглянуть в базовую реализацию этого метода (AbstractExecutorService.java). Пример, поставляемый с JDK:

          public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
              throws InterruptedException {
              if (tasks == null)
                  throw new NullPointerException();
              List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
              boolean done = false;
              try {
                  for (Callable<T> t : tasks) {
                      RunnableFuture<T> f = newTaskFor(t);
                      futures.add(f);
                      execute(f);
                  }
                  for (Future<T> f : futures) {
                      if (!f.isDone()) {
                          try {
                              f.get();
                          } catch (CancellationException ignore) {
                          } catch (ExecutionException ignore) {
                          }
                      }
                  }
                  done = true;
                  return futures;
              } finally {
                  if (!done)
                      for (Future<T> f : futures)
                          f.cancel(true);
              }
          }

      Легко видеть, что действительно как сказано в документации:

      • будем так или иначе ждать пока не выполнятся все задачи (т.к. дергаем f.get)
      • разработчики API не слишком бережно относятся к обработке исключений (о чем тоже сказано в документации).

      Суть метода — «притормозить» в текущей нитке, пока все задачи из коллекции которую мы передали не закончат свою работу или не рухнут (по крайней мере в базовой реализации этого метода).

      Пример использования InvokeAll

      Приведенный ниже исходный код может быть использован лишь в ознакомительных целях, в «боевых» задачах его использовать конечно же нельзя, но возможно пример исходного кода поможет как-то лучше понять происходящее. Еще раз, пожалуйста не используйте его «как есть» в своей работе.

      Допустим есть вымышленная задача.

      Дано:
      1. Массив URL-ов (String).
      2. Пустая директория.
      Задание:
      Скачать содержимое URL-ов и поместить его в файлы указанной директории.

      Решение:

      import java.io.IOException;
      import java.io.InputStream;
      import java.net.URI;
      import java.nio.file.Files;
      import java.nio.file.Path;
      import java.nio.file.Paths;
      import java.nio.file.StandardCopyOption;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.Callable;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
       
      public class Main {
          public static void copy(String[] urls, Path dst) {
              int THREADS = 4; // кол-во потоков
              ExecutorService pool = Executors.newFixedThreadPool(THREADS);
              List<Callable<Object>> tasks = new ArrayList<>();
              try {
                  for (int i = 0; i < urls.length; ++i) {
                      final String url = urls[i];
                      final Path dstPath = dst.resolve(i + ".html");
                      // добавляем задачки
                      tasks.add(new Callable<Object>() {
                          public Object call() throws Exception {
                              download(url, dstPath);
                              return null;
                          }
                      });
                  }
                  // Запускаем пул потоков и ДОЖИДАЕМСЯ! 
                  List<Future<Object>> invokeAll = pool.invokeAll(tasks);
              } catch (InterruptedException e) {
                  e.printStackTrace(); // так лучше не делать в продакшене
              } finally {
                  pool.shutdown();
              }
          }
          // копируем содержимое url в dst (и так тоже лучше не делать в продакшене)
          private static long download(String url, Path dst) {
              try {
                  URI u = URI.create(url);
                  try (InputStream in = u.toURL().openStream()) {
                      Files.copy(in, dst, StandardCopyOption.REPLACE_EXISTING);
                  }
                  System.out.printf("%s -> %s %n", url, dst);
                  return dst.toFile().length();
              } catch (IOException e) {
                  e.printStackTrace();
                  return -1;
              }
          }
       
         public static void main(String[] args) throws Exception {
              String urls[] = {"http://oracle.com", "http://google.com", "http://apple.com",
                  "http://ibm.com", "http://sap.com"};
              copy(urls, Paths.get("")); 
          }
      }

      Конечно в Java 8 аналогичный код с содержимым цикла будет смотреться более элегантно:

                  for (int i = 0; i < urls.length; ++i) {
                      String url = urls[i];
                      Path dstPath = dst.resolve(i + ".html");
                      // Да здравствуют лямбды!
                      tasks.add(() -> download(url, dstPath));
                  }

      Применимость и граничные условия.

      Из выше описанного думаю не сложно сделать вывод о том, что к использованию invokeAll следует подходить с пониманием происходящего и осторожностью.

      Лично я использую его не слишком часто, как правило для написания «утилитных» программок, запускаемых из командной строки в обычной JVM. Например эпизодическая выгрузка данных, утилита для конвертации/загрузки чего-либо куда-либо и т.д., в условиях когда можно быстро в ручную проверить, что выполнение прошло успешно.

      Также нужно учитывать, что при работе в Java EE на Application Server-ах так плодить потоки, мягко говоря, не рекомендуется, а спецификация Java EE прямо запрещает такие вольности с созданием и управлением потоков.

      Известный шуточный принцип «Quod licet bovi, non licet Iovi», как всегда проявляется на Java EE.

      Думаю для Java EE программистов более привычна схема работы с асинхронностью/многопоточностью в ентерпрайзе:
      — либо по старинке через JMS/MDB
      — через managed thread (см. ManagedExecutorService)
      — или выкручиваться на месте, используя умения и особенности конкретного Application Server-а.

      StudLance.ru

Просмотр 0 веток ответов
  • Для ответа в этой теме необходимо авторизоваться.
×