Пишу игрушечную ОС (о реализации мьютекса)


    Продолжаю блог о разработке игрушечной ОС (предыдущие посты: раз, два, три). Сделав паузу в кодировании (майские праздники, всё-таки), продолжаю работу. Только что набросал сканирование PCI-шины. Эта штука понадобится для работы с SATA-контроллером: следующее, что хочу сделать — это простенький драйвер диска. Он позволит поэкспериментировать с проецированием постоянной памяти на адресное пространство (своппинг, доведённый до логического конца). А пока хотел бы описать реализацию мьютекса.

    Для реализации мьютекса (определён и реализован в src/sync.h и src/sync.c) нет необходимости в модификации существующего планировщика, описанного в двух предыдущих постах. Мьютекс может быть построен на базе всего двух его функций: запуска и приостановки потока (см. src/schedule.h).

    struct mutex {
      struct __mutex_node *head, *tail;
      struct spinlock mlock, ilock;
    };
    
    static inline void create_mutex(struct mutex *mutex) {
      mutex->head = mutex->tail = NULL;
      create_spinlock(&mutex->mlock);
      create_spinlock(&mutex->ilock);
    }
    

    Моя реализация мьютекса предполагает два спинлока и очередь спящих потоков. Первый спинлок (mlock) отвечает за доступ к защищаемому ресурсу мьютекса, т.е. он захвачен тогда и только тогда, когда захвачен сам мьютекс. Второй спинлок (ilock) защищает очередь ожидающих потоков от одновременной модификации.

    Итак, как это работает? Когда поток пытается получить мьютекс, он пробует захватить mlock, делая N попыток. Если это у него получается, то мьютекс захвачен. В противном случае он должен безопасно (т.е. через ilock) добавить себя в очередь ожидающих потоков и уснуть.

    static inline err_code acquire_mutex(struct mutex *mutex) {
      extern err_code __sleep_in_mutex(struct mutex *mutex);
      if (!acquire_spinlock_int(&mutex->mlock, 1000))
        return __sleep_in_mutex(mutex);
      return ERR_NONE;
    }
    
    struct __mutex_node {
      struct __mutex_node *next;
      thread_id id;
    };
    
    INTERNAL err_code __sleep_in_mutex(struct mutex *mutex) {
      struct __mutex_node *node = NULL;
      bool acquired;
    
      acquire_spinlock(&mutex->ilock, 0);
      acquired = acquire_spinlock_int(&mutex->mlock, 1);
      if (!acquired) {
        node = alloc_block(&mutex_node_pool);
        if (node) {
          node->next = NULL;
          node->id = get_thread();
          if (mutex->head)
            mutex->head->next = node;
          mutex->head = node;
          if (!mutex->tail)
            mutex->tail = node;
          pause_this_thread(&mutex->ilock);
        }
      }
      if (!node)
        release_spinlock(&mutex->ilock);
    
      return (acquired || node) ? ERR_NONE : ERR_OUT_OF_MEMORY;
    }
    

    Вышеприведённый код нуждается в некоторых пояснениях:

    1. Функция acquire_spinlock_int аналогична acquire_spinlock за исключением того, что она не отключает прерывания до момента освобождения спинлока. Захватывая mlock, мы не хотим отключать прерывания — владение мьютексом может быть долгим. Другое дело, когда мы, захватывая ilock, хотим добавить поток в очередь — эта операция должна быть быстрой.

    2. Следующая строка функции __sleep_in_mutex на первый взгляд бессмысленна:

     acquired = acquire_spinlock_int(&mutex->mlock, 1);
    

    В самом деле, зачем повторно пытаться захватить спинлок, когда мы уже потерпели неудачу? Затем, что между первой попыткой и захватом ilock владелец мьютекса может его вернуть, и лишь позже наш поток получит квант планирования. Без повторной попытки мы добавим себя в очередь и усыпим навеки. Поэтому важно проверить ещё раз уже после захвата ilock (хозяин мьютекса при возврате непременно будет его захватывать).

    3. Функции alloc_block и free_block относятся к пулу заранее выделенных блоков памяти фиксированного размера (см. src/memory.h). Соль этого пула в том, чтобы не вызывать медленный malloc всякий раз, когда нам нужен блок (в данном случае struct __mutex_node). Кстати, этот пул у меня пока без реализации (только заглушка, напрямую вызывающая malloc), как и сам malloc. Если у кого возникнет непреодолимое желание реализовать первый или портировать второй — пишите.

    4. Зачем делать N попыток захватить mlock, если можно уснуть после первой попытки? Можно, просто это не очень эффективно. Время переключения контекста существенно выше одной попытки получить спинлок. Поэтому рационально сделать N попыток (в коде 1000, взято с потолка; в будущем нужно провести практические замеры, вывести и обосновать более разумный N), прежде чем усыпать.

    5. В коде используется модифицированная версия pause_thread: pause_this_thread. Кроме усыпления текущего потока она атомарно (в прерывании) освобождает переданный ей спинлок.

    При освобождении мьютекса хозяин захватывает ilock, после чего проверяет наличие в очереди ждущих потоков. Если поток найден, то он просыпается, становясь новым хозяином мьютекса. Если ожидающих потоков нет, то хозяин возвращает mlock и выходит.

    static inline void release_mutex(struct mutex *mutex) {
      extern void __awake_in_mutex(struct mutex *mutex);
      acquire_spinlock(&mutex->ilock, 0);
      if (mutex->tail)
        __awake_in_mutex(mutex);
      else
        release_spinlock_int(&mutex->mlock);
      release_spinlock(&mutex->ilock);
    }
    
    INTERNAL void __awake_in_mutex(struct mutex *mutex) {
      struct __mutex_node *node;
      err_code err;
    
      do {
        node = mutex->tail;
        mutex->tail = node->next;
        if (mutex->head == node)
          mutex->head = NULL;
        err = resume_thread(node->id);
        free_block(&mutex_node_pool, node);
      }
      while (mutex->tail && err);
    
      if (!mutex->tail)
        release_spinlock_int(&mutex->mlock);
    }
    

    Хотел, было, ещё рассказать о реализации функции sleep, но этот пост и так содержит достаточно пищи для размышления, так что отложу до следующего раза.

    P.S. Если найдёте ошибки в коде — обязательно пишите.
    Поделиться публикацией

    Комментарии 11

      0
      Это ядерный мютекс или юзерспейсный? Если юзерспейсный: что если поток завершат принудительно извне в release_mutex() после захвата mutex->ilock? Я как программист ожидаю что все операции над мютексами атомарны.

        +2
        Вообще, это ядерный код. Пользовательского C-кода вообще не предполагается, для этого будет вирт. машина. Что касается принудительного завершения потока после захвата mutex->ilock, то это невозможно, поскольку захват ilock выключает прерывания, соответственно, поток в этот период монопольно владеет процессором. Другое дело, что поток может быть принудительно завершён или запущен в момент, когда он спит в мьютексе. Ведь описанный мьютекс — это всего лишь надстройка над планировщиком, который ничего о нём не знает. Здесь, наверное, потребуется какая-то защита.
          0
          > выключает прерывания, соответственно, поток в этот период монопольно владеет процессором

          Наверное, только одним ядром? Или тут нет SMP?

          Если нет SMP, то вот тут нужно как раз делать только одну попытку, последующие бессмысленны:
          > Зачем делать N попыток захватить mlock, если можно уснуть после первой попытки?
            +1
            Я имел в виду логический процессор.
              +3
              Без SMP спинлок вообще был бы не нужен.
        0
        перенёс как ответ на коммент выше
          0
          Вот такой оффтопичный вопрос: а где автор находит такие потешные игрушки, точнее, их фотографии? :)
            +1
            Google
            0
            Не очень понял идею N попыток захвата:
            Время переключения контекста существенно выше одной попытки получить спинлок.
            Да, но разве освобождение захваченного спинлока не требует переключения на процесс его захвативший?
              +1
              Смотрите, вообще непрерывные попытки захвата ресурса нужны только тогда, когда у нас есть несколько логических процессоров. В противном случае это пустая трата времени: если ресурс захвачен, то легче просто уступить место другому потоку, ведь всё равно весь оставшийся квант времени ресурс будет занят (его хозяин не планируется).

              В случае нескольких процессоров ситуация меняется. Поток, запрашивающий ресурс, знает, что с ним одновременно может исполняться и хозяин ресурса (на другом процессоре). Поэтому его стратегия меняется. Зачем ему сразу уходить в сон? Он может проснуться нескоро (пока переключатся контексты, да и в очереди планирования он будет не первым). Поэтому разумно некоторое время поупорствовать, настойчиво проверяя доступность ресурса. Но не целый отведённый квант (квант может быть большим и вообще, чем дольше ресурс занят, тем выше вероятность, что и останется занятым). Поэтому потом лучше уйти в сон, ожидая что тебя разбудит сам хозяин освободившегося ресурса.
                0
                Да, несколько процессоров я не учел, спасибо.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое