Pull to refresh

Comments 11

Рефлексия отталкивает, конечно… неужели нельзя было сделать без неё?..

Рефлексия здесь - это трудный компромисс) Она используется только на этапе сборки графа на старте, чтобы автоматически определить зависимости между компонентами и убрать ручное связывание.

Всё не настолько плохо, чтобы тащить новую зависимость. Мне не зашел "uber/fx". И за несколько дней разобрался в теме - задача красиво остановить сильно сложнее, чем запустить. Пучок каналов, несколько контекстов, waitgroups для внешних сервисов - и готово. Решение на пару сотен строк, разбросанных по модулям.
Я за отсутствие магии в разработке, если что.

Я как раз и отталкивался от того, что «красиво остановить сильно сложнее, чем запустить», и что в реальных проектах это обычно вырастает в очень похожий код, размазанный по модулям.

Такой код часто не самый простой в отладке и сопровождении, поэтому хотелось собрать lifecycle в одном выделенном месте. Кроме того, при ручном управлении обычно приходится явно прокидывать зависимости между реализациями, что со временем увеличивает связанность.

Но если в конкретном проекте это удобно держать вручную - это действительно лучше, чем тянуть ещё одну библиотеку.

Ух ты, мы с вами буквально одну и ту же проблему решили, только я нигде не публиковал кроме корп репозитория. Чуть проще:

package graceful

import (
	"context"
	"time"

	"go.uber.org/multierr"
)

type (
	GroupConfiguration struct {
		Timeout time.Duration `name:"TIMEOUT" default:"30s"`
	}

	TimeoutOption time.Duration

	Group struct {
		configuration *GroupConfiguration
		ctx           context.Context
		cancel        context.CancelFunc
		errors        chan error
		closers       []closer
		wait          uint64
	}

	Resource[T_resource any] struct {
		group    *Group
		resource *T_resource
	}

	Task[T_resource any] func(context context.Context, resource *T_resource) error

	closer struct {
		resource any
		task     any
		wrapper  func(ctx context.Context, resource, task any, errors chan error)
	}
)

func New(ctx context.Context, configuration *GroupConfiguration) *Group {
	ctx, cancel := context.WithCancel(ctx)

	return &Group{
		configuration: configuration,
		ctx:           ctx,
		cancel:        cancel,
		errors:        make(chan error, 1),
	}
}

func WithOptions(ctx context.Context, options ...any) *Group {
	configuration := &GroupConfiguration{
		Timeout: 30 * time.Second,
	}

	for i := range options {
		switch typed := options[i].(type) {
		case TimeoutOption:
			configuration.Timeout = time.Duration(typed)
		}
	}

	ctx, cancel := context.WithCancel(ctx)

	return &Group{
		configuration: configuration,
		ctx:           ctx,
		cancel:        cancel,
		errors:        make(chan error, 1),
	}
}

func (group *Group) Context() context.Context {
	return group.ctx
}

func (group *Group) Wait() error {
	<-group.ctx.Done()

	ctx, cancel := context.Background(), context.CancelFunc(nil)

	if group.configuration.Timeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, group.configuration.Timeout)
	}

	for _, closer := range group.closers {
		go closer.wrapper(ctx, closer.resource, closer.task, group.errors)
	}

	errors := []error(nil)

cycle:
	for group.wait > 0 {
		select {
		case <-ctx.Done():
			break cycle
		case err := <-group.errors:
			if err != nil {
				errors = append(errors, err)
			}
		}

		group.wait -= 1
	}

	if cancel != nil {
		cancel()
	}

	return multierr.Combine(errors...)
}

func AddResource[T_resource any](group *Group, resource *T_resource) Resource[T_resource] {
	return Resource[T_resource]{
		group:    group,
		resource: resource,
	}
}

func (resource Resource[T_resource]) OnStart(task Task[T_resource]) Resource[T_resource] {
	resource.group.wait += 1

	go func(resource Resource[T_resource], task Task[T_resource]) {
		err := task(resource.group.ctx, resource.resource)

		resource.group.cancel()
		resource.group.errors <- err
	}(resource, task)

	return resource
}

func (resource Resource[T_resource]) OnClose(task Task[T_resource]) Resource[T_resource] {
	resource.group.wait += 1

	wrapper := func(ctx context.Context, resource, task any, errors chan error) {
		errors <- task.(Task[T_resource])(ctx, resource.(*T_resource))
	}

	resource.group.closers = append(resource.group.closers, closer{
		resource: resource.resource,
		task:     task,
		wrapper:  wrapper,
	})

	return resource
}
package notifier

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
)

type (
	Notifier struct {
		signals  []os.Signal
		receiver chan os.Signal
	}
)

var (
	ErrSignalReceived = errors.New("signal received")
)

func New(signals []os.Signal) *Notifier {
	return &Notifier{
		signals:  signals,
		receiver: make(chan os.Signal, 1),
	}
}

func OnStart(ctx context.Context, notifier *Notifier) (err error) {
	signal.Notify(notifier.receiver, notifier.signals...)

	select {
	case <-ctx.Done():
		return nil
	case signal := <-notifier.receiver:
		return fmt.Errorf("%w: %s", ErrSignalReceived, signal.String())
	}
}

func OnClose(ctx context.Context, notifier *Notifier) (err error) {
	signal.Stop(notifier.receiver)
	return nil
}

Очень сильно упростило жизнь менее опытных коллег и меня когда за этими менее опытными коллегами надо переделать graceful shutdown.

Не очень вчитывался, но у меня вроде посложнее:
- на внешний сигнал стопа закрываем прием новых запросов (у меня http-сервер), доделываем имеющиеся запросы, сервер по завершении обработки завершается и кидает ок в канал
- по сигналу закрываются соединения с ресурсами, чтобы не потерять данные,
- и потом выход из main (или жесткий шатдаун по истечении второго времени).
И потому есть контекст на управление сервером, есть контекст запросов внутри сервера, и есть контекст на приложуху с ресурсами - данные в одну сторону. И каналы для ответов. Потому долго ковырялся.

Выглядит как хорошая альтернатива десяткам каналов, разбросанных по всему коду

Но я решал немного другую задачу - goscade использует DAG (граф зависимостей) для строгого порядка выполнения. Зависимые компоненты ждут друг друга, а независимые части графа стартуют и останавливаются параллельно. Это нужно, чтобы при старте приложение гарантированно ждало подключения к базе, а при остановке не закрывало базу раньше, чем остановится веб-сервер. При этом два независимых Kafka-консьюмера стартуют одновременно

Все равно получается громоздко, если много зависимостей, все надо регистрировать. Я пришел к подходу самому писать DI контейнер. Структура serviceProvider в которую закидываешь зависимости явно, и методы по инициализации конкретной зависимости. Один метод может вызывать другой. К примеру HttpServer вызывает Repository, Repository вызывает Config. И в main.go достаточно вызывать верхнеуровневую зависимость, которая каскадом инициализирует все нужное, и инициализация будет начинаться с листьев графа. Так же шатдаун будет проходить в обратном порядке. Достаточно просто и удобно. Не нужно регистрировать каждую зависимость

Я использую ровно такой же подход в своих проектах - ручной контейнер, в котором компоненты обернуты в LazyProvider.

Но тут есть нюанс: передача зависимостей в конструктор - это уже достаточное условие для построения графа. (Если зависимость неявная, её можно задать в библиотеке руками).

Библиотека по сути просто автоматизирует работу с этим графом:

  1. Граф перестраивается сам при изменении кода. Добавил аргумент в конструктор - порядок запуска обновился автоматически.

  2. Библиотека сама определяет, какие независимые ветки запускать параллельно, а где нужно ждать.

  3. "Из коробки" есть таймауты, аварийная остановка при ошибках и тд.

Если реализовывать весь этот функционал (параллельный старт, таймауты, графовое выключение) внутри своих методов инициализации вручную - ты по факту пишешь тот же goscade, только без рефлексии, но завязанный на конкретный проект.

А где минусы?

Рефлексия

Sign up to leave a comment.

Articles