Mash: multithreading, coroutines, async & wait

  • Tutorial
image

Предисловие


Напомню, что данный язык был разработан мной в учебных целях в рамках хобби. Я не считаю его (на данный момент) идеально проработанным языком, но кто знает, какое будущее его может ожидать.

Если у вас есть желание попробовать его в действии самому — скачивайте репозиторий проекта, в нем вы сможете найти собранную версию проекта или же собрать её самостоятельно, для своей ОС.

Введение


Многопоточность и асинхронность в наше время являются одними из важнейших составляющих современных языков программирования.

Поэтому я решил добавить в свой язык программирования поддержку современных конструкций и технологий, отчасти путем добавления в язык простых и удобных конструкций.

Быстрые потоки


Позволяют с легкостью распараллелить выполнение кода.
Для этого в Mash добавлена конструкция «launch:… end»

Пример кода:

uses <bf>
uses <crt>

proc main():
  for(i ?= 1; i <= 10; i++):
    launch:
      sleep(random() * 100)
      println(i)
    end
  end
  inputln()
end

Пример вывода:

9
1
2
7
5
3
10
4
6
8

Когда выполнение программы доходит до launch..end, код внутри этого блока запускается в отдельном потоке, а выполнение запускающего кода переносится за этот блок.

Почти такую же языковую конструкцию вы могли встретить ранее в языке программирования Kotlin.

Async & wait


Реализации одних корутин мне не достаточно, именно по-этому в Mash также добавлены конструкции async & wait.

Async позволяет перевести выполнение кода в отдельный поток и продолжить выполнение основного кода.

Wait позволяет дождаться момента, когда все нужные async блоки будут выполнены.

Пример кода:

uses <bf>
uses <crt>

proc main():
  println("Hello!")

  async a:
    println("Test")
    sleep(1000)
    println("Test")
    sleep(1000)
    println("Test")
    sleep(1000)
  end

  async b:
    println("Test 2")
    sleep(300)
    println("Test 2")
    sleep(300)
    println("Test 2")
    sleep(300)
  end

  wait a, b

  println("End!")
  inputln()
end

Вывод:

Hello!
Test
Test 2
Test 2
Test 2
Test
Test
End!

Классическая многопоточность


Основная кодовая база, обеспечивающая поддержку многопоточности сосредоточена в модуле ?<threads>.

Основные составляющие, которые будут рассмотрены далее:

1) Класс TThread (приведено лишь объявление класса, полный код находится дальше в модуле):

class TThread:
  protected:
    var ThreadContext

  public:
    var Resumed, Terminated, FreeOnTerminate
    proc Create, Free

    proc Execute //for overriding
    proc Suspend, Resume, Terminate, WaitFor, ReJoin //Control proc's
end

2) Класс TCriticalSection (его описание):

class TCriticalSection:
  protected:
    var Critical_Section_Controller

  public:
    proc Create, Free

    //Methods
    proc Enter, Leave
    func TryEnter
end


3) Методы, для быстрого создания и запуска потоков:
func Async(method, ...)
func Thread(method, ...)
func Parallel(method, ...)


4) Thread-safe atomic (класс-переменная для межпотокового взаимодействия):
class TAtomic:
  private:
    var Locker, Value

  public:
    proc Create, Free
    proc Set
    func Get
end


5) Корутины (Coroutines):
class TCoroutine(TThread):
  public:
    var NextCoroutine
    proc Create

    proc Yield, YieldFor
end


Итак, разберем все по порядку.

Класс TThread позволяет нам на его основе создать новый класс-наследник, добавив в его поля необходимые переменные, которые будут переданы в новый поток.

Сразу пример кода:

uses <bf>
uses <crt>
uses <threads>

class MyThreadClass(TThread):
  var Param
  proc Create, Execute
end

proc MyThreadClass::Create(Param):
  $Param ?= Param
  TThread::Create$(true)
end

proc MyThreadClass::Execute():
  for(i ?= 0; i < 10; i++):
    PrintLn(i, ": ", $Param)
  end
end

proc main():
  new MyThreadClass("Thread #2!")
  InputLn()
end

Если нам лень описывать новый класс, для создания потока, то мы можем вспомнить про поддержку динамического переопределения методов у экземпляров классов и воспользоваться им.

Пример кода:

uses <bf>
uses <crt>
uses <threads>

proc class::MyThreadedProc():
  for(i ?= 0; i < 10; i++):
    PrintLn(i, ": Threaded hello!")
  end
end

proc main():
  Thr ?= new TThread(false)
  Thr->Execute ?= class::MyThreadedProc
  Thr->Resume()
  InputLn()
end

Если нам нужно просто запустить метод с параметрами в новом потоке, то методы async(), thread() и parallel() — это как раз то, что нужно.

Пример запуска метода в новом потоке:

uses <bf>
uses <crt>
uses <threads>

proc ThreadedProc(Arg):
  for(i ?= 0; i < 10; i++):
    PrintLn(i, ": ", Arg)
  end
end

proc main():
  Async(ThreadedProc, "Thread #1!")
  InputLn()
end

Как вы могли ранее заметить, эти 3 метода являются функциями и возвращают они — схожие с TThread классы.

Их отличие состоит в том, что async() создает поток, который по завершению освободит сам из под себя память, а экземпляр класса TThread будет автоматически удален,
thread() — тоже самое, что и async(), только поток создается изначально замороженным.
И наконец parallel() — создает запущенный поток, который по завершению не выполнит самоуничтожение, т.е. мы можем использовать любые методы TThread класса, например WaitFor() и не бояться возникновения рантайм ошибок. Единственный нюанс — нужно будет вызвать Free() вручную.

Синхронизация потоков


Для этого мной в Mash был добавлен класс TCriticalSection.

Пример кода:

uses <bf>
uses <crt>
uses <threads>

var CSect = new TCriticalSection()

proc ThreadedProc(Arg):
  while true:
    CSect -> Enter()
      PrintLn(Arg)
    CSect -> Leave()
    Sleep(10)
    gc()
  end
end

proc CriticalThreadedProc():
  while true:
    Sleep(3000)
    CSect -> Enter()

    Sleep(1000)
    PrintLn("And now...")
    Sleep(1000)
    PrintLn("Time to...")
    Sleep(1000)
    PrintLn("Critical section!")
    Sleep(3000)

    CSect -> Leave()
    gc()
  end
end

proc main():
  Async(ThreadedProc, "I'm thread #1!!!")
  Async(CriticalThreadedProc)
  InputLn()
end


Atomic


Реализация потоко-безопасного контейнера для хранения каких-либо значений.

Пример кода:
uses <bf>
uses <crt>
uses <threads>

proc main():
  MyThreadValue ?= new TAtomic(0)

  launch:
    while true:
      MyThreadValue -> Set(1)
      Sleep(8)
      gc()
    end
  end

  launch:
    while true:
      MyThreadValue -> Set(2)
      Sleep(3)
      gc()
    end
  end

  launch:
    while true:
      MyThreadValue -> Set(3)
      Sleep(11)
      gc()
    end
  end

  while true:
    PrintLn(MyThreadValue -> Get())
    Sleep(100)
    gc()
  end
end


Coroutines


Данный функционал позволяет синхронизовано распараллелить выполнение кода.

Пример кода:
uses <bf>
uses <crt>
uses <threads>

proc class::Proc1():
  while true:
    println("Hello world #1")
    sleep(100)
    gc()
    $yield()
  end
end

proc class::Proc2():
  while true:
    println("Hello world #2")
    sleep(100)
    gc()
    $yield()
  end
end

proc class::Proc3():
  while true:
    println("Hello world #3")
    sleep(100)
    gc()
    $yield()
  end
end

proc main():
  cor3 ?= new TCoroutine(false, null)
  cor3 -> Execute ?= class::Proc3

  cor2 ?= new TCoroutine(false, cor3)
  cor2 -> Execute ?= class::Proc2

  cor1 ?= new TCoroutine(false, cor2)
  cor1 -> Execute ?= class::Proc1

  cor3 -> NextCoroutine ?= cor1

  cor1 -> Resume()

  InputLn()
end


Вывод:
Hello world #1
Hello world #2
Hello world #3
Hello world #1
Hello world #2
Hello world #3
...


Заключение


Надеюсь, что эта статья была вам интересна.

Жду комментариев :)

P.S.: По вашим замечаниям, конструкцию until..end я убрал из языка. Теперь её место занимает конструкция:

whilst <условие>:
...
end

Представляет она из себя обычный цикл while, с отличием в том, что условие проверяется после выполнения итерации.

Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.

Как вы оцените реализацию многопоточных плюшек в Mash?

Поддержать автора
Поделиться публикацией

Комментарии 23

    +2
    конструкцию until..end я убрал из языка

    [...]
    Представляет она из себя обычный цикл while, с отличием в том, что условие проверяется после выполнения итерации.
    Ещё в классическом Паскале (а м.б. ранее) были четко определены циклы с предусловием и постусловием.
    while i<5 do P(i)
    

    пока i<5 выполнять процедуру P. Сначала проверяется условие i<5, если оно не выпоняется, то P не будет вызвана ни разу.
    repeat P(i) until i<5 

    а здесь сначала вызывается P, а только потом проверяется условие i<5, т.о. P всегда будет вызвана (минимум 1 раз).
      0

      Спасибо за пояснение.

      0
      Для этого мной в Mash был добавлен класс TCriticalSection.

      И все? Ни семафоров, ни условных переменных (conditional variables), ни атомиков?
      Как-то маловато.

      Также возникли вопросы по примеру использования:
      proc CriticalThreadedProc():
        while true:
          Sleep(3000)
          CSect -> Enter() // рекурсия поддерживается? Т.е. если я уже лочил CSect в данном треде - сработает или зависнет?
      
          // а если тут у нас злобный джуниор вписал return - кто будет секцию разлочивать?
      
          CSect -> Leave()
          gc() // а если в вызывающей функции есть временные переменные - все, им кирдык?
        end
      end
        +1
        Я правильно понимаю, что у вас сопрограммы и потоки — это синонимы? Это все потоки уровня ОС?
          0

          Да...

            0
            тогда это просто обычные «железные» потоки, которые могут делать некий специальный yield, при котором переходит передача управления в другой заранее заданный «железный» поток. Так? Если так, то ценность такой конструкции крайне сомнительна.
              –1
              Цель корутин — сохранение состояния выполнения метода. Другого решения этого вопроса я пока что не вижу.
            0
            Корутины выполняются в нескольких потоках, которые поочередно замораживают & размораживают друг-друга при вызове yield()
            +1

            Последнее время немного играл с параллельным программированием в Java. И есть несколько скользких моментов в вашей реализации.


            1. что произойдет при попытке двух потоков задать разное значение атомику? first come, first served?


            2. в случае с критической секцией, что если необходимо дать возможность нескольким потокам одновременный доступ к критической секции?


            3. как насчет коммуникации между корутинами?



            и по последнему изменению do..while ->whilst && while..do -> while. в английском, "whilst" — абсолютно то же самое что и "while" (для native speaker'ов; только что уточнил). плюс, разница в двух символах — потенциальный источник примитивнейших ошибок набора текста, при этом разница в поведении — заметная.

              0
              1) По очереди будет задано новое значение.
              2) Можно передать потокам указатель на класс крит. секции, либо объявить её, как глобальную переменную.
              3) TAtomic, ThreadList, глобальные переменные, а также прочие решения на любой вкус и цвет.

              Мне кажется, что путаницы не будет :)
                0

                По поводу атомика как раз непонятно, в какой очередности будет задано какое значение. Пример:


                // bankAccount = 100;


                • Жена, поток 1: bankAccount.set(50)
                • Зарплата, поток 2:bankAccount.set(1500)

                В зависимости от очередности, последствия могут быть плачевными ;)

                  0
                  Что мешает делать так:
                  • Жена, поток 1: bankAccount->set(bankAccount->get()-50)
                  • Зарплата, поток 2: bankAccount->set(bankAccount->get()+1500)


                  ??
                    0

                    дык это не меняет сути вопроса ведь:


                    • поток 1: bankAccount->set(bankAccount->get() — 50) === bankAccount->set(100 — 50)
                    • поток 2: bankAccount->set(bankAccount->get() + 1500) === bankAccount->set(100 + 1500)

                    я и спрашиваю: как в этом случае будет разрешен конфликт? какой механизм синхронизации используется внутри атомиков?


                    для простоты, попробуйте вот такую программу (не знаю, скомпилится ли — ваш проект непонятно ни как собрать вообще, ни как собрать под OSX):


                    uses <bf>
                    uses <crt>
                    uses <threads>
                    
                    proc addProc(x):
                        for (i ?= 0; i < 1000; i++):
                            x->set(x->get() + 1)
                        end
                    end
                    
                    proc subtractProc(x):
                        for (i ?= 0; i < 1000; i++):
                            x->set(x->get() - 1)
                        end
                    end
                    
                    proc main():
                        bankAccount ?= new TAtomic(0)
                    
                        PrintLn("Initial bank account state:", bankAccount->get())
                    
                        Async(addProc, bankAccount) // 1000
                        Async(subtractProc, bankAccount) // 0
                        Async(addProc, bankAccount) // 1000
                        Async(subtractProc, bankAccount) // 0
                        Async(addProc, bankAccount) // 1000
                        Async(addProc, bankAccount) // 2000
                        Async(subtractProc, bankAccount) // 1000
                        Async(subtractProc, bankAccount) // 0
                    
                        PrintLn("Final bank account state:", bankAccount->get()) // а будет ли 0?
                    end
                      0

                      В java для этого есть операции getAndAdd, addAndGet и подобные. Как в этом языке не знаю но думаю можно добавить на уровне стандартной библиотеки.

                        0

                        Идею понял, в ближайшее время добавлю этот функционал.

                          0

                          Я-то это знаю, мне было интересно как автор предлагает это решить, так как добавление парралелизма в ЯП или его стандартную библиотеку, в моем понимании, подразумевает разрешение подобных вопросов.

                          0

                          Проект можно собрать используя fpc, либо lazarus. Для того, чтобы все заработало, как нужно — собираете svm, библиотеки, mashc и скидываете файлы в похожую, как на гитхабе (bin_w32) иерархию

                  0

                  А что происходит в первом примере с переменной i, новый поток захватывает её текущее значение? В этом случае какая часть контекста копируется в момент создания потока?

                    0
                    В момент создания такого потока копируется состояние переменных внутри тела метода, которые объявлены выше конструкции launch/async. Копии переменных сразу помечаются для сборщика мусора, который имеется у контекста нового потока.
                      0
                      А для массивов и объектов в этом случае копируется содержимое или только указатель?
                        0
                        Для классов — копируется указатель на vtable класса, для массивов — создается новый массив указателей и в него копируются указатели на объекты.
                          0

                          То есть простые параллельные операции над массивами(например сумма элементов), могут быть не эффективным из за издержек на копирование?

                            0

                            Вполне эффективными, если обрабатывать массив частями.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое