Multithreading in practice

    Нашел как-то на stack overflow вопрос (link).
    Need to create java CLI programm that searchs for specific files matched some pattern. Need to use multi-threading approach without using util.concurrent package and to provide good performance on parallel controllers.
    Перевод
    Нужно написать консольную программу, которая ищет файлы по какому-то паттерну. Программа должна быть многопоточная, но нельзя использовать пакет util.concurrent. Требуется добиться максимальной производительности.


    В общем идея в принципе была не сложная. Т.к. по условию нельзя использовать util.concurrent, то надо реализовать свой пул потоков, плюс написать какие-то таски, которые в этом пуле потоков будут крутиться.
    Так же я не был уверен в том, что при многопоточном использовании IO будет увеличение производительности.

    Сразу скажу, что целью было не выполнение задания, а исследование проблемы, поэтому не весь код будет красивый.

    В принципе алгоритм похож на рекурсивный обход дерева, вот к примеру простая его реализация link
    	import java.io.File;
    	public class MainEntry {
    	    public static void main(String[] args) {
    	        walkin(new File("/home/user")); //Replace this with a suitable directory
    	    }
    	 
    	    /**
    	     * Recursive function to descend into the directory tree and find all the files 
    	     * that end with ".mp3"
    	     * @param dir A file object defining the top directory
    	     **/
    	    public static void walkin(File dir) {
    	        String pattern = ".mp3";
    	 
    	        File listFile[] = dir.listFiles();
    	        if(listFile != null) {
    	            for(int i=0; i<listFile.length; i++) {
    	                if(listFile[i].isDirectory()) {
    	                    walkin(listFile[i]);
    	                } else {
    	                    if(listFile[i].getName().endsWith(pattern)) {
    	                        System.out.println(listFile[i].getPath());
    	                    }
    	                }
    	            }
    	        }
    	    }
    	}
    	


    Для начало проверим как быстро работает однопоточная реализация, код однопоточной реализации я приводить не буду, он есть в архиве.
    Результаты следующие:
    154531 мс

    Теперь попробуем сделать то же самое, но будем использовать многопоточную реализацию алгоритма.

    Для этого вместо рекурсивного вызова, мы будем создавать некий Таск, который будем отдавать на выполнение пулу потоков. Так же надо, что бы по результату выполнения таска, была возможность как-то сообщить о результатах. Плюс надо добавить новый таск в пул потоков (вместо рекурсии).
    Тут надо сразу остановиться, почему я выбрал именно небольшие таски и пул тредов, вместо создания новых тредов. Представте, что у нас куча директорий, и на каждую директорию мы будем создавать новый тред? мы можем просто упать по OOM (OutOfMemory) или просто все начнет сильно тормозить из-за переключений между тредами ОС (особенно если одноядерная система). Так же мы будем тратить время на старт нового треда каждый раз при его создании.

    Для начала надо создать класс, который будет выполнять какие-то действие в будущем тред пуле.
    Основные требование к классу:
    — класс должен наследоваться от Thread (в принципе можно только интерфейс Runnable, но так проще)
    — класс должен принимать на выполнения объекты Runnable
    — класс не должен падать при возникновении каких-то исключений в результате выполнения таска.
    — если нету никаких задач, Thread не должен работать в пустую, а должен уходить в ожидание
    — процедура добавления новых объектов Runnable должна быть очень быстрая, иначе при большом количестве мелких задач, кто-нибудь будет либо блокировать работу треда, либо ждать возможности добавить новую задачу в тред
    — ну и он должен быть ThreadSafe

    Вот код:
    import java.util.ArrayList;
    	import java.util.List;
    
    	class BacklogWorker extends Thread {
    	/* здесь хранятся все таски на выполнение*/
    	    private final LinkedList<Runnable> backlog = new LinkedList<Runnable>();
    	    private static final int INITIAL_CAPACITY = 100;
    	/* здесь хранятся таски, которые будут выполнятся */
    	    private final List<Runnable> workQueue = new ArrayList<Runnable>(INITIAL_CAPACITY);
    
    	    BacklogWorker(String name) {
    	        super(name);
    	    }
    
    	/* добавить  новый таск*/
    	    synchronized void enque(Runnable work) {
    	        if (work != null) {
    	            backlog.add(work);
    	        }
    	        notify();
    	    }
    
    	    public void run() {
    	        while (!isInterrupted()) {
    	/* добавляем все в очередь на выполнения, и отпускаем лок*/
    	             synchronized (this) {
    	                if (backlog.isEmpty()) {
    	                    try {
    	                        wait();
    	                    } catch (InterruptedException e) {
    	                        interrupt();
    	                    }
    	                }
    	                int size = backlog.size();
    	                for (int i = 0; i < INITIAL_CAPACITY && i < size; i++) {
    	                    workQueue.add(backlog.poll());
    	                }
    	                backlog.clear();
    	            }
    	            for (Runnable task : workQueue) {
    	                try {
    	                    task.run();
    	                } catch (RuntimeException e) {
    	                    e.printStackTrace();
    	                }
    	            }
    	            workQueue.clear();
    	        }
    	    }
    	}


    Теперь надо создать ThreadPool, который будет распределять работу между его тредами.
    Требование к классу такие:
    — ThreadSafe
    — масштабируемый
    — равномерное распределение между рабочими тредами тасков
    — не блокируемый

    код получился такой:
    	import java.util.concurrent.Executor;
    
    	public class BacklogThreadPool implements Executor/*i don't use anything from concurrent, just only one interface*/ {
    
    	    private final BacklogWorker workers[];
    	    private final int mask;
    	    private static volatile int sequence;
    
    	    public BacklogThreadPool(int threadCount, String id) {
    	        int tc;
    	        for (tc = 1; tc < threadCount; tc <<= 1) ;
    	        mask = tc - 1;
    
    	        if (id == null) {
    	            id = Integer.toString(getSequence());
    	        }
    	        workers = new BacklogWorker[tc];
    
    	        for (int i = 0; i < tc; i++) {
    	            workers[i] = new BacklogWorker((new StringBuilder()).append("thead-pool-worker-").append(id).append(":").append(i).toString());
    	            workers[i].start();
    	        }
    	    }
    
    	    private synchronized int getSequence() {
    	        return sequence++;
    	    }
    
    	    public void shutdown() {
    	        for (int i = 0; i < workers.length; i++)
    	            workers[i].interrupt();
    
    	    }
    
    	    @Override
    	    public void execute(Runnable command) {
    	        workers[command.hashCode() & mask].enque(command);
    	    }
    	}
    	

    В принципе тут все понятно, и наверное комментировать не чего.

    Теперь надо написать таск, который будет выполняться в ThreadPool'е.
    К сожалению первая версия у меня потерялась, поэтому привожу быстро написанную заново версию.
    import java.io.File;
    	import java.util.ArrayList;
    	import java.util.List;
    	import java.util.regex.Matcher;
    	import java.util.regex.Pattern;
    
    	public class WalkinTask1 implements BacklogTask {
    
    	    private List<File> dirs;
    	    private ParseHandler parseHandler;
    
    	    public WalkinTask1(List<File> dirs, ParseHandler parseHandler) {
    	        this.parseHandler = parseHandler;
    	        //this.parseHandler.taskStart();
    	        this.parseHandler.taskStartUnblock();
    	        this.dirs = dirs;
    	    }
    
    	    @Override
    	    public void run() {
    	        try {
    	            List<String> filePaths = new ArrayList<String>();
    	            List<File> dirPaths = new ArrayList<File>();
    	            for (File dir : dirs) {
    	                if (!dirPaths.isEmpty()) {
    	                    dirPaths = new ArrayList<File>();
    	                }
    	                if (!filePaths.isEmpty()) {
    	                    filePaths = new ArrayList<String>();
    	                }
    	                File listFile[] = dir.listFiles();
    
    	                if (listFile != null) {
    	                    for (File file : listFile) {
    	                        if (file.isDirectory()) {
    	                            dirPaths.add(file);
    	                        } else {
    	                            filePaths.add(file.getPath());
    	                        }
    	                    }
    	                }
    	                if (!dirPaths.isEmpty()) {
    	                    parseHandler.schedule(TaskFactory.createWalking1Task(parseHandler, dirPaths));
    	                }
    	                if (!filePaths.isEmpty()) {
    	                    Pattern pattern = parseHandler.getPattern();
    	                    List<String> result = new ArrayList<String>();
    	                    for (String path : filePaths) {
    	                        Matcher matcher = pattern.matcher(path);
    	                        while (matcher.find()) {
    	                            String str = matcher.group();
    	                            if (!"".equals(str)) {
    	                                result.add(str);
    	                            }
    	                        }
    	                    }
    	                    parseHandler.taskComplete(result);
    	                }
    	            }
    	        } finally {
    	            //parseHandler.taskFinish();
    	            parseHandler.taskFinishUnblock();
    	        }
    	    }
    
    	    @Override
    	    public int getTaskType() {
    	        return 1;  //TODO
    	    }
    	}


    Теперь поговорим немного о профайлере. Для чего он нужен я не буду описывать, вы можете сами поискать, если ещё ничего не слышали о таком зверьке. При профайлинге многопоточных приложений наибольшое внимание надо уделять Monitor Usage (в каждом профайлере есть такая возможность). Обычно этот тип профайлинга надо запускать вручную. Интерес представляет сколько времени те или иные треды висят в ожидании локов. К примеру вы можете насоздовать кучу тредов, но они все будут упираться в какой-нибудь лок, и производительность системы будет сильно падать. Так же стоит обратить внимание на использование CPU, к примеру если CPU использует на 10-20%, то это тоже может означать, что треды больше ожидают локов, чем выполняют какие-нибудь вычисления (хотя это не всегда так).

    Теперь посмотрим результат в профайлере:
    время выполнения программы:
    total task: 78687
    55188ms

    В результате скорость работы увеличилась где-то в 3 раза.


    Тут мы видим, что все треды в тредпуле были заняты работой почти на всем протяжении времени. Блокировки тредов почти не наблюдается.

    На второй картинке мы видим, что основное время CPU тратится на IO.

    Тут мы видим, что использование CPU > 80%.

    Тут мы видим только одну блокировку треда, которая заняла меньше 1мс, при 78 тыс тасков весьма хороший результат.

    Как мы видим, в принципе мы нагружаем CPU, и у нас нету простоя, так как все треды почти полностью загружены работой. Блокировок по локам нету.

    Интересно будет посмотреть на картинку номер 2. Как мы видим, самая «дорогая» операция — java.io.File.isDirectory(), она занимает примерно 46% общего времени. Погуглив по поводу этой проблемы, я так ничего и не нашел, кроме возможности использовать Java7, ну или dependency OS фичи. Поэтому возможность оптимизации этой части как я вижу больше нету. Дальше идет уже парсер — java.util.regex.Matcher.find(), а вот тут уже можно ускорить. Можно создать еще один таск, который будет заниматься только парсингом. Т.е. мы разделим две самые тяжелые операции:
    1) работа с файловой системой
    2) парсинг имен
    Третья операция опять IO, и это тоже сложно ускорить.

    Итак модифицируем немного первый таск, и добавим новый:
    	import java.io.File;
    	import java.util.ArrayList;
    	import java.util.List;
    
    	public class WalkinTask implements BacklogTask {
    
    	    private List<File> dirs;
    	    private ParseHandler parseHandler;
    
    	    public WalkinTask(List<File> dirs, ParseHandler parseHandler) {
    	        this.parseHandler = parseHandler;
    	        //this.parseHandler.taskStart();
    	        this.parseHandler.taskStartUnblock();
    	        this.dirs = dirs;
    	    }
    
    	    @Override
    	    public void run() {
    	        try {
    	            List<String> filePaths = new ArrayList<String>();
    	            List<File> dirPaths = new ArrayList<File>();
    	            for (File dir : dirs) {
    	                if (!dirPaths.isEmpty()) {
    	                    dirPaths = new ArrayList<File>();
    	                }
    	                if (!filePaths.isEmpty()) {
    	                    filePaths = new ArrayList<String>();
    	                }
    	                File listFile[] = dir.listFiles();
    
    	                if (listFile != null) {
    	                    for (File file : listFile) {
    	                        if (file.isDirectory()) {
    	                            dirPaths.add(file);
    	                        } else {
    	                            filePaths.add(file.getPath());
    	                        }
    	                    }
    	                }
    	                if (!dirPaths.isEmpty()) {
    	                    parseHandler.schedule(TaskFactory.createWalkingTask(parseHandler, dirPaths));
    	                }
    	                if (!filePaths.isEmpty()) {
    	                    parseHandler.schedule(TaskFactory.createParseTask(parseHandler, filePaths));
    	                }
    	            }
    	        } finally {
    	            //parseHandler.taskFinish();
    	            parseHandler.taskFinishUnblock();
    	        }
    
    	    }
    
    	    @Override
    	    public int getTaskType() {
    	        return 1;  //TODO
    	    }
    	}

    import java.util.ArrayList;
    	import java.util.List;
    	import java.util.regex.Matcher;
    	import java.util.regex.Pattern;
    
    	public class ParseTask implements BacklogTask {
    
    	    private ParseHandler handler;
    	    private List<String> paths;
    
    	    public ParseTask(ParseHandler hander, List<String> paths) {
    	        this.handler = hander;
    	        this.paths = paths;
    	        handler.taskStartUnblock();
    	    }
    
    	    @Override
    	    public void run() {
    	        try {
    	            Pattern pattern = handler.getPattern();
    	            List<String> result = new ArrayList<String>();
    	            for (String path : paths) {
    	                Matcher matcher = pattern.matcher(path);
    	                while (matcher.find()) {
    	                    String str = matcher.group();
    	                    if (!"".equals(str)) {
    	                        result.add(str);
    	                    }
    	                }
    	            }
    	            handler.taskComplete(result);
    	        } finally {
    	            handler.taskFinishUnblock();
    	        }
    	    }
    
    	    @Override
    	    public int getTaskType() {
    	        return 0;  
    	    }
    	}


    И запустим еще раз:
    total task: 221560
    52328


    Как мы видим, результат не сильно отличился от первого запуска, но все-таки немного быстрее, особенно выигрыш будет расти, если будут директории с большим количеством файлов. Но при таком подходе мы увеличили количество тасков почти в 3 раза, что к примеру может сказаться на Garbage Collector'е. Так что тут надо уже выбирать, что мы хотим — максимальную производительность или экономию памяти и ресурсов.

    Теперь надо подумать о том, как выходить из программы, и как возвращать результат. Мы наплодили кучу тасков, и мы не знаем когда они все выполнятся. Я не придумал ни чего лучшего, как просто считать общее количество тасков, и ждать пока счетчик станет равен нулю. Для этого нам понадобится переменная, в которой мы будем накапливать значени. Но тут тоже все не так просто. К примеру если мы возьмем обычную переменную, и будем инкрементировать её когда таск создался, и декрементировать её когда таск закончился. Но при таком подходе результат будет плачевный, т.к. в java операция i++ не является атомарной, даже если мы поставит заветный модификатор volatile. Идеально было бы взять AtomicIteger, но нам по условию нельзя использовать пакет util.concurrent. Поэтому нам придется сделать свой Atomic. Если покопаться как работает Atomic в java, то мы наткнемся на native метод. Сама по себе атомарность изменения переменной реализовано в виде команды процессора, поэтому java вызывает нативную команду ОС, которая уже вызывает команду процессора.
    В принципе мы может использовать обычный synchronized. Но при большом количестве тасков начнется Lock race, и производительность уменьшится (хотя конечно и не критично). Вот пример кода реализующий CAS алгоритм(код был найдем на сайте ibm):
    	public class SimulatedCAS {
    	    private volatile int value;
    
    	    public synchronized int getValue() {
    	        return value;
    	    }
    
    	    public synchronized int compareAndSwap(int expectedValue, int newValue) {
    	        int oldValue = value;
    	        if (value == expectedValue)
    	            value = newValue;
    	        return oldValue;
    	    }
    	}

    	public class CasCounter {
    	    private SimulatedCAS value = new SimulatedCAS();
    
    	    public int getValue() {
    	        return value.getValue();
    	    }
    
    	    public int increment() {
    	        int oldValue = value.getValue();
    	        while (value.compareAndSwap(oldValue, oldValue + 1) != oldValue)
    	            oldValue = value.getValue();
    	        return oldValue + 1;
    	    }
    	    
    	    public int decrement() {
    	        int oldValue = value.getValue();
    	        while (value.compareAndSwap(oldValue, oldValue - 1) != oldValue)
    	            oldValue = value.getValue();
    	        return oldValue - 1;
    	    }
    	}
    	


    Вот вобощем-то и все.
    Архив с исходниками:link

    P.S. Я тестировал это на линуксе, и на виндоусе на 4-х ядерном процессоре. Оптимальное количество тредов в пуле было вычислено эксперементально — 16, т.е. количество ядер * 4, когда-то находил в интернете уже такую формулу, но не помню где. В Windows есть особенность, когда запускаешь первый раз, то работает все очень долго, и часто все виснет на IO, но уже при втором запуске все работает значительно быстрее, думаю это особенность OS кэшировать файловую систему. Я тестировал все со вторым запуском и дальше, потом смотрел в профайлере загруженность CPU, если был где-то провал использования CPU, то считал этот тест неточным и не использовал этот тест в статистике. Тестировал все на папке с проектами (много больших проектов вместе с CVS файлами).

    P.S.S. Это мой первый большой топик на хабре, так что прошу не сильно критиковать по оформлению, по возможности буду исправлять.
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 17

      +5
      По коду:
      1. BacklogThreadPool.sequence инкрементится непотокобезопасно, лучше убрать volatile, а getSequence сделать static synchronized. Хотя здесь это, конечно, совершенно некритично.
      2. Я на 100% уверен, что CasCounter будет работать медленнее, поскольку CAS всё равно эмулируется через synchronized, а операция сложения явно проще чем сравнение и условное присваивание.

      Ну и, естественно, вертящийся на языке вопрос: зачем в реальной жизни нужно условие не использовать java.util.concurrent кроме как для поддержки чего-нибудь на Java 1.4
        0
        В лабораторных работах, в тестовых заданиях бывает просят не пользоваться частью библиотек.
          0
          1. поправил
          2. я хотел показать как работает Atomic, CasCounter действительно работает медленее, но не сильно.

          Почему такое условие было поставленно не знаю, думаю это задание либо на собеседование, либо по учебе, и хотят посмотреть как человек понимает многопоточность.
            0
            Да, согласен, условие жестокое :)
            +4
            Насколько я вижу у Вас задача «работа с файловой системой» и задача «парсинг имен» обрабатываются однотипно. Очевидно (если отбросить файловый кеш операционной системы), что чтение списка файлов будет являться бутылочным горлышком. Поэтому лучше будет читать список файлов последовательно в отдельном потоке, а парсинг имен производить в пуле потоков.
              0
              Да, в принципе этот подход возможен, но процент времени, которое тратит процессор на IO и на парсер не соизмерими IO >> parse, Тем более это сильно зависит от OS, к примеру win7 в принципе держит кэш файловой системы, поэтому производительность однопоточного решения будет менее выгодно.
                0
                Ну это зависит от того в каких условиях чаще всего будет применяться алгоритм. Попробуйте запустить Ваше решение на компакт-диске с большим количеством файлов/папок ;)
                Вообще думаю лучше всего было-бы использовать средства фильтрации ОС если условие совпадения endswith.
                  0
                  Все правильно, тут надо решение подводить под конкретную ситуацию, к примеру если это запустить на SSD, то мне кажется, что однопоточное решение на IO будет медленней, чем многопоточное, но если запускать на CD/DVD, то однопоточное решение будет быстрее.
            • НЛО прилетело и опубликовало эту надпись здесь
                0
                была описана как работает в принципе Atomic, как работает JIT уже не так важно. К примеру на одном собеседовании у меня однажды спросили именно как работает Atomic, ответ — JIT в момент компиляции разворачивает вызовы Unsafe в конкретные наборы команд на целевой архитектуре врят ли бы устроил :)
                • НЛО прилетело и опубликовало эту надпись здесь
                0
                Не совсем понял, как вы изначально добавляете таски — берете первую директорию и примерно делите её список поддиректорий на N частей, где N = кол-во ядер процессора * 4?
                По поводу проблемы аггрегации результатов с выполненных тасков — тут вроде бы подходит способ, пропагандируемый fork-join framework.
                  0
                  Удивился что не используется Semaphore в worker'ах, потом вспомнил про запрет на util.concurrent.
                  Хотя он достаточно легко реализуется через wait-notify, и отлично ложится на задачу,
                  реализация ThreadPool'а получилась бы намного короче и понятнее.
                    0
                    public class WalkinTask1 implements BacklogTask
                    а где взять интерфейс BacklogTask?
                      0
                      нашел… линк в конце статьи.
                      0
                      почему-то все потоки весят в wait после того как все поддиректории обработаны.
                        0
                        выяснил, что проблема в последней строке этого куска кода (класс BacklogWorker):
                        for (int i = 0; i < INITIAL_CAPACITY && i < size; i++) {
                        workQueue.add(backlog.poll());
                        }
                        backlog.clear();

                        да и странно как-то… выбрали из бэклога INITIAL_CAPACITY задач (в случае, если их больше там), а остальные удалили. После того как закомментировал эту строку, программа завершается корректно.

                        правда и результаты другие. если взять время выполнения createSingleWalkinTask за 100%, то createWalkingTask выполняется за 87%, а createWalking1Task — 83% времени.

                      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                      Самое читаемое