Обновить

Комментарии 18

ЗакрепленныеЗакреплённые комментарии

Жесть, без слёз на этот ужасный вырви глаз код даже не взглянешь. Не позавидую тому кто попадет на проект, где кто-то рискнет эту дичь завести

Ничего страшного, никто не заставляет. Всегда можно остаться на Promise<any> и молиться, что ничего не рассыпется

молиться, что ничего не рассыпется

Что конкретно вы имеете ввиду?) Че-то за 9 лет с момента как промисы появились ни один так и не рассыпался

я имею в виду, что на уровне компилятора непонятно, ошибется ли промис. вам надо обязательно заходить в код промиса, чтобы понять, какие ошибки он может выкинуть, и не дай бог это асинхронная функция из какой-то библиотеки

но, в целом, комментарий неконструктивный, не вижу причин давать полноценный ответ на него. человек явно в статью особо не вчитывался

если вы никогда не испытывали проблемы при работе с промисами, то вы большой молодец. может вам стоит рассмотреть переехать на чистый JS, зачем вам эти типы? вам, видимо, компилятора в голове хватает

Код курильщика:

const program = Effect.gen(function* () {
  const userService = yield* UserService

  // запускаем дочерний файбер в фоне
  yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.sleep("10 seconds")
      yield* userService.getUser("1")
      yield* Effect.log("это не выполнится")
    })
  )

  // родительский файбер завершается
  yield* Effect.log("родитель завершился")
})

Тоже самое, только код здорового человека:

async function program(userService) {
  // запускаем дочернюю задачу в фоне (не awaiting)
  (async () => {
    await sleep(10000)
    await userService.getUser("1")
    console.log("это не выполнится")
  })()

  // родитель завершается не дожидаясь дочерней задачи
  console.log("родитель завершился")
}

Пример, который я привел, примитивный, потому что это туториал статья. Добавьте отмену фонового процесса, много зависимостей, и передачу этих зависимостей глубже чем в одну функцию, потом сравните)
Но в целом, вы почитайте статью, помимо запуска дочерних тредов, там еще много что есть)

Ну накиньте прям рабочий пример который прям по настоящему делает отмену и сравним с кодом на промисах

class MessengerAdapter extends Context.Tag("MessageAdapter")<                                                                
    MessengerAdapter,                                                                                                          
    {                                                                                                                        
      readonly incomingMessagesDebounced: Stream.Stream<Message>                                                                     
      readonly send: (response: Response) => Effect.Effect<void, MessengerAdapterError>                                        
    }                                                                                                                        
  >() {}                                                                                                                     
                                                                                                                             
  class MessageProcessor extends Context.Tag("MessageProcessor")<
    MessageProcessor,
    {
      readonly process: (msg: Message) => Effect.Effect<void, MessageProcessorError>
      readonly responsesStream: Stream.Stream<Response, MessageProcessorError>
    }                                                                                                                        
  >() {}
                                                                                                                             
  const bot = Effect.gen(function* () {                                                                                      
    const messengerAdapter = yield* MessengerAdapter                                                                     
    const messageProcessor = yield* MessageProcessor                                                                         
    const currentJob = yield* Ref.make(Fiber.void)                                                                     

    //например пришло сообщение боту из телеграма
    yield* messengerAdapter.incomingMessagesDebounced.pipe(                                                                            
      Stream.mapEffect((msg) =>                                                                                              
        Effect.gen(function* () {                                                                                            
          const prev = yield* Ref.get(currentJob)
          yield* Fiber.interrupt(prev)
          const fiber = yield* Effect.fork(messageProcessor.process(msg))                                                    
          yield* Ref.set(currentJob, fiber)                                                                                  
        }).pipe(                                                                                                             
          Effect.catchTag("MessageProcessorError", (err) =>                                                                  
            Effect.gen(function* () {                                                                                        
              yield* Effect.logError(`Failed to process message from ${msg.userId}`, err)
              yield* messageAdapter.send({                                                                                   
                userId: msg.userId,                                                                                          
                text: "Извините, бот временно недоступен. Сейчас подключим оператора."                                       
              })                                                                                                             
            })    
          )                                                                                                                  
        )         
      ),
      Stream.runDrain,
      Effect.fork                                                                                                            
    )

    //отправляем сообщение обратно
    yield* messageProcessor.responsesStream.pipe(                                                                            
      Stream.mapEffect((msg) => messengerAdapter.send(msg).pipe(
        Effect.catchAll((e) => Effect.logError("Failed to send message", e))
      )),
      Stream.runDrain                                                                                                        
    )             
  })

ну вот, например, код для обработки сообщений чатбота. Для полноты добавлю примерную реализацию MessageProcessor, реализацию MessengerAdapter опустим

import { LanguageModel, Prompt } from "@effect/ai"                                                                         
import { Effect, Layer, Queue, Stream } from "effect"

const MessageProcessorLive = Layer.effect(                                                                                 
    MessageProcessor,                                                                                                        
    Effect.gen(function* () {                                                                                                
      const languageModel = yield* LanguageModel.LanguageModel                                                               
      const queue = yield* Queue.unbounded<Message>()                                
                                                                                                                             
      return {                                                                                                               
        process: (msg) =>                                                                                                    
          Effect.gen(function* () {                                                                                          
            const response = yield* languageModel.generateText({                                                             
              prompt: Prompt.make([                             
                Prompt.makeMessage("system", { content: "You are a helpful assistant." }),                                   
                Prompt.makeMessage("user", { content: msg.text })                                                            
              ])                                                                                                             
            }).pipe(                                                                                                               
              Effect.retry({ times: 3, schedule: Schedule.exponential("1 second") }),                                              
              Effect.mapError((e) =>                                                                                               
                new MessageProcessorError({ message: `LLM failed: ${e.message}`, cause: e })
              )                                                                                                                    
            )    
            
            //если ответ от ЛЛМ пришел, то мы обязательно его доставим, прервать не получится
            yield* Effect.forEach(response.text.split("\n\n"), (chunk) =>
              Effect.gen(function* () {                                                                                            
                yield* Queue.offer(queue, { userId: msg.userId, text: chunk })                                                     
                yield* Effect.sleep("5 seconds")                                                                                   
              })                                                                                                                   
            ).pipe(Effect.uninterruptible)                                          
          }),                                                                                                                
        responsesStream: Stream.fromQueue(queue)                                                                             
      }                                         
    })                                                                                                                       
  )

Вот тут, когда приходит новое сообщение, тред, который отвечал за обработку старого сообщения обрывается. И в реализации я использую реальный сервис для работы с ЛЛМ, который автоматически прервется. Не нужно abortController внутрь никуда прокидывать. Все легко тестируется. Реализаций MessengerAdapter может быть сколько угодно. Я пытался на промисах это написать нормально, но не вышло. Может у вас выйдет. Вот функциональные требования:

  1. Входящие сообщения — бот получает сообщения {userId, text} (дебаунс обрабатывать не надо, предполагаем, что он уже обработан)

  2. Обработка — для каждого сообщения: - Отправляем запрос в LLM API (HTTP POST) - Ответ разбиваем по \n\n на чанки - Каждый чанк отправляем клиенту с задержкой 5 секунд между ними

  3. Отмена предыдущего — если пришло новое сообщение, а предыдущее ещё обрабатывается: - Если запрос к LLM ещё в полёте — отменить HTTP-запрос - Если LLM уже ответил и идёт отправка чанков — дождаться отправки всех чанков, только потом начать обработку нового

  4. Ретраи — если LLM вернул ошибку, ретраим 3 раза с экспоненциальным backoff (1с, 2с, 4с). Ретраи тоже отменяются при новом сообщении

  5. Ошибки — если все ретраи провалились, отправить клиенту "Извините, бот временно недоступен. Сейчас подключим оператора." Бот продолжает принимать новые сообщения

Есть огромная проблема, код это просто ад, настолько плохо читаемый и не приятный, что все мнимые микро плюсы, даже близко не перекрываются из-за того, что приходится читать и писать такой код.

skill issue

skill issue

Смешно) Выдавать говнокод за skill это конечно вершина, это как говорить что копать землю вилкой хорошо, а лопатой плохо, ибо у вас просто skill issue и вы не умете достойно копать землю вилкой, мозгов для этого маловато.

ну твои вкусы и привычки не определяют реальность.
перепиши верхний код на промисы, посмотрим где вилка, а где лопата)
а до тех пор skill issue

перепиши верхний код на промисы

Вот ИИ переписал)

class MessageProcessorError extends Error {
    constructor({ message, cause }) {
        super(message)
        this.cause = cause
    }
}

const sleep = (ms) => new Promise((res) => setTimeout(res, ms))

async function retryExponential(fn, times = 3, baseDelay = 1000) {
    for (let i = 0; i <= times; i++) {
        try {
            return await fn()
        } catch (e) {
            if (i === times) throw e
            await sleep(baseDelay * Math.pow(2, i))
        }
    }
}

function createMessageProcessor(languageModel) {
    // Queue — простой массив + подписчики
    const listeners = new Set()
    const queue = {
        offer: (msg) => {
            listeners.forEach((fn) => fn(msg))
        },
        subscribe: (fn) => {
            listeners.add(fn)
            return () => listeners.delete(fn) // unsubscribe
        },
    }

    async function process(msg) {
        // Генерация с ретраями
        let response
        try {
            response = await retryExponential(() => languageModel.generateText({
                prompt: [
                    { role: 'system', content: 'You are a helpful assistant.' },
                    { role: 'user', content: msg.text },
                ],
            }),
            3,
            1000,
            )
        } catch (e) {
            throw new MessageProcessorError({
                message: `LLM failed: ${e.message}`,
                cause: e,
            })
        }

        // Uninterruptible — доставляем все чанки без возможности отмены
        const chunks = response.text.split('\n\n')
        for (const chunk of chunks) {
            queue.offer({ userId: msg.userId, text: chunk })
            await sleep(5000)
        }
    }

    // Stream из очереди — AsyncGenerator
    async function* responsesStream() {
        const buffer = []
        let resolve = null

        const unsubscribe = queue.subscribe((msg) => {
            buffer.push(msg)
            if (resolve) {
                resolve()
                resolve = null
            }
        })

        try {
            while (true) {
                if (buffer.length > 0) {
                    yield buffer.shift()
                } else {
                    // ждём следующего сообщения
                    await new Promise((r) => resolve = r)
                }
            }
        } finally {
            unsubscribe()
        }
    }

    return { process, responsesStream }
}

// Использование
const processor = createMessageProcessor(languageModel)

await processor.process({ userId: '1', text: 'Hello' })

for await (const msg of processor.responsesStream()) {
    console.log(msg)
}

вот это действительно смешно))) мало того, что код ужасно сложный, так еще и у тебя там не обрывается обработка запросов

skill issue

Это было смешно😁

Тоже самое, только код здорового человека:

Главное не смотреть как код с промисами писался до того, как синтаксический сахар в виде асинхронных функций добавили в язык

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации