Простой DICOM клиент на GO с балансировщиком задач и веб-интерфейсом


    Привет Хабр! В последнее время я очень сильно увлекся разработкой на языке GO. Изящный и выразительный язык программирования. Мне давно хотелось сделать что-нибудь полезное. По специфике своей работы мне приходится работать с медицинскими архивами DICOM-изображений PACS.


    Github: github.com/Loafter/dtools
    Версия Linux-amd64: github.com/Loafter/dtools/releases/download/1.0/dcmjsser

    Я решил, что пришло время создать свой dicom-клиент с (блэкджеком..) веб-интерфейсом, который может выполнять следующие стандартные операции:
    • Dicom ping;
    • Cкачивание исследований;
    • Загрузка исследования,
    • А также поиск по реквизитам

    (c-echo, c-move,c-store,c-find соотвествено).
    В качестве dicom-библиотеки была выбрана библиотека GrassRoot SDK. Наш клиент будет распараллеливать задачи. Язык go для этого хорошо адаптирован

    Похожий сценарий работы был описан habrahabr.ru/post/198150.
    Наш сценарий несколько отличается:
    У нас есть некий балансировщик задач, который получает задания dicom-сервиса, проверяет возможность выполнения и асинхронно их выполняет. Для того чтобы не было ситуации, когда параллельно выполняется 1000 задач, мы реализуем очередь задач таким образом, чтобы были активные задачи и те, которые находятся в спящем состоянии. По умолчанию только 10 задач будут активными. В противном случае мы могли бы обойтись без баллансировщика вообще, тупо параллельно выполнить 1000 задач параллельно без какого либо контроля.
    Весь код балансировщика находится в файле job_ballancer.go.

    В начале идет описание интерфейсов обработчиков. В случае если работа была выполнена успешно, в случае если вернулась ошибка и сам процесс обработки задачи.

    type JobDispatcher interface {
    	Dispatch(interface{}) (interface{}, error)
    }
    type ErrDispatcher interface {
    	DispatchError(FaJob) error
    }
    type CompDispatcher interface {
    	DispatchSuccess(CompJob) error
    }
    


    Когда мы создаем экземпляр диспетчера, мы его инициализируем соответствующими обработчиками.
    srv.jbBal.Init(&srv.dDisp, srv, srv)
    
    
    //Сама структура балансировщика
    
    
    type JobBallancer struct {
    	jChan    chan interface{}	//канал в который мы передаем задания
    	acJob    map[string]Job		//список активных работ
    	slJob    map[string]Job		//список неактивных работ
    	errDisp  ErrDispatcher		//обработчик работ завершившихся error
    	jobDisp  JobDispatcher		//обработчик заданий
    	compDisp CompDispatcher		//обработчик успешно выполненных работ
    	JbDone   sync.WaitGroup		//ожидаем завершения всех работ
    	aJobC    int			//количество параллельных (активных)
    }
    
    //инициализация балансировщика
    func (jbal *JobBallancer) Init(jdis JobDispatcher, cmd CompDispatcher, erd ErrDispatcher) {
    	jbal.errDisp = erd
    	jbal.jobDisp = jdis
    	jbal.compDisp = cmd
    	jbal.acJob = make(map[string]Job)
    	jbal.slJob = make(map[string]Job)
    	jbal.aJobC = 10
    	jbal.jChan = make(chan interface{})
    	go jbal.takeJob()		//запускаем поток в котором осуществляем 							//балансировку работ
    	log.Println("info: job ballancer inited")
    }
    добавление новой работы в очередь. Операция асинхронная, т.е. работа отсылается в канал, а функция takeJob подбирает ее от туда.
    
    func (jbal *JobBallancer) PushJob(jdat interface{}) error {
    	if jbal.checkInit() {
    		return errors.New("error: JobChan is not inited")
    	}
    	uid := genUid()
    	job := Job{JobId: uid, Data: jdat}
    	jbal.jChan <- job
    	return nil
    
    }
    
    
    
    func (jbal *JobBallancer) takeJob() {
    	for {
    		//извлекаем работу из канала
    		recivedTask := <-jbal.jChan
    		log.Println("info: job taken")
    		switch job := recivedTask.(type) {
    		case TermJob:
    			//если мы получаем сигнал на завершение выходим из функции
    			log.Println("info: recive terminate dispatch singal")
    			return
    		case Job:
    //обычная обработка (если все слоты активных работ заняты то работа записывается в список не активных работ)
    			if len(jbal.acJob) < jbal.aJobC {
    				jbal.JbDone.Add(1)
    				jbal.addActiveJob(job)
    				go jbal.startJob(job)
    				log.Println("info: normal dispatch")
    			} else {
    				jbal.addSleepJob(job)
    				jbal.JbDone.Add(1)
    				log.Println("info: attend maximum active job")
    			}
    		case CompJob:
    			//работа завершилась успехом
    			if err := jbal.compDisp.DispatchSuccess(job); err != nil {
    				log.Println("error: failed dispatch success" + job.Job.JobId)
    			}
    			//удачно завершившуюся работу можно удалить из списка
    			jbal.removeJob(job.Job.JobId)
    			jbal.JbDone.Done()
    			jbal.resumeJobs()
    		case FaJob:
    			//работа завершилась ошибкой
    			if err := jbal.errDisp.DispatchError(job); err != nil {
    				log.Println("error: failed dispatch error" + job.Job.JobId)
    			}
    			//завершившуюся работу можно удалить из списка
    			jbal.removeJob(job.Job.JobId)
    			jbal.JbDone.Done()
    			jbal.resumeJobs()
    		default:
    			log.Fatalln("error: unknown job type")
    			jbal.JbDone.Done()
    		}
    	}
    }
    //функция удаления работы
    func (jbal *JobBallancer) removeJob(jid string) error {
    	if _, isFind := jbal.acJob[jid]; isFind {
    		delete(jbal.acJob, jid)
    	} else {
    		return errors.New("error: can't remove job because job with id not found")
    	}
    	return nil
    }
    
    
    //функция позволяющая правильно завершить работу балансировщика, в случае если  есть работы которые не завершены, функция будет ожидать их завершения
    func (jbal *JobBallancer) TerminateTakeJob() error {
    	if jbal.checkInit() {
    		return errors.New("error: is not inited")
    	}
    	jbal.JbDone.Wait()
    	jbal.jChan <- TermJob{}
    	close(jbal.jChan)
    	if len(jbal.acJob) > 0 {
    		return errors.New("error: list job is not empty")
    	}
    	log.Println("info: greacefully terminate take job")
    	return nil
    }
    


    Остальные вспомогательные функции мы не будем рассматривать. полный код можно посмотреть
    github.com/Loafter/dtools/blob/master/dcmjsser/job_ballancer.go

    Не смотря, что код не сложный и я долго обдумывал его. Но все равно для проверки надежности я реализовал нагрузочный тест на десятки задач:
    testJobDispatcher := TestJobDispatcher{}
    	testErrorDispatcher := TestErrorDispatcher{}
    	testSuccessDispatcher := TestCompletedDispatcher{}
    	jobBallancer := JobBallancer{}
    	jobBallancer.Init(&testJobDispatcher, &testSuccessDispatcher, 		&testErrorDispatcher)
    	for i := 0; i < 40; i++ {
    		jobBallancer.PushJob("data: " + strconv.Itoa(i))
    	}
    	jobBallancer.TerminateTakeJob()
    

    Он отработал нормально. Все задачи были выполнены, а функция TerminateTakeJob завершилась тогда, когда все задачи были выполненны. Для контроля отработаных задач используется объект синхронизации sync.WaitGroup JbDone, который ведет подсчет количества выполненных работ. Как я уже отмечал выше, код балансировщика является универсальным и для того чтобы наш балансировщик работал по-другому, нам достаточно проинстанцировать его соотвествующими обработчиками.

    Как и в прошлой своей поделке) (http://habrahabr.ru/post/247727/) интерефейс приложения я реализовал в виде веб-интерфейса.


    Для теста я использовал публичный dicom-архив 213.165.94.158:11112. С него можно скачивать исследования, если есть прямой айпи и если на стороне клиента открыт порт 11112. Так же я проверил работу на свободном dicom-архиве dcm4che sourceforge.net/projects/cdmedicpacsweb/files/latest/download?source=files.
    Мне удалось собрать рабочую версию для Linux, к сожалению собрать под Widows мне не удалось. Библиотека grassroot успешно собралась, но ошибка возникает при линковке самого приложения.
    cmd/ld: Malformed PE file: Unexpected flags for PE section.

    Об этой ошибке много написано тут: github.com/golang/go/issues/4069.
    К сожалению я не настолько знаком с тонкостями сборки и поэтому получилась версия только под Linux. Может «хабра-эфект» сдвинет с мертвой точки и эту проблему. Для пользователей Windows, которые хотят проверить и посмотреть как это работает, я подготовил виртуальную машину на базе CoreOS (https://yadi.sk/d/y81KC-tyfar6A). В демо машине наш dicom-клиент работает как systemd-сервис.
    При наличии желания, можно например реализовать сервис, который выкачивает исследования с различных dicom-узлов, и выкладывает в zip-архиве для скачивания. Для управления сервисом можно использовать json-сообщения, так же, как делает наш GUI.
    А можно поступить, как поступил я: прикрутить в наше приложение какой-нибудь веб-просмотровщик на базе html5.

    Share post

    Comments 6

      0
      А что за веб-просмотровщик вы используете?
        0
        С паксом шёл в комплекте. Какой то ZFP или как то так называется.
          0
          ZPF это скорее всего ZeroFootPrint, базворд обозначающий приложение, не требующее установки. А что за пакс? Просто хочется этот просмотровщик пощупать.
          +1
          Уберите тэг «DIY».
            0
            Мне кажется, или вот это в коде:
            < source lang="go" >
            ошибка в вёрстке статьи?
              0
              да ошибка

              Only users with full accounts can post comments. Log in, please.