Комментарии 18
Полезные ссылки:
effect-smol — репозиторий Effect v4 (сейчас в бете)
t.me/fp_ts — СНГ комьюнити Effect в Telegram
Жесть, без слёз на этот ужасный вырви глаз код даже не взглянешь. Не позавидую тому кто попадет на проект, где кто-то рискнет эту дичь завести
Ничего страшного, никто не заставляет. Всегда можно остаться на Promise<any> и молиться, что ничего не рассыпется
молиться, что ничего не рассыпется
Что конкретно вы имеете ввиду?) Че-то за 9 лет с момента как промисы появились ни один так и не рассыпался
я имею в виду, что на уровне компилятора непонятно, ошибется ли промис. вам надо обязательно заходить в код промиса, чтобы понять, какие ошибки он может выкинуть, и не дай бог это асинхронная функция из какой-то библиотеки
но, в целом, комментарий неконструктивный, не вижу причин давать полноценный ответ на него. человек явно в статью особо не вчитывался
если вы никогда не испытывали проблемы при работе с промисами, то вы большой молодец. может вам стоит рассмотреть переехать на чистый JS, зачем вам эти типы? вам, видимо, компилятора в голове хватает
Код курильщика:
const program = Effect.gen(function* () {
const userService = yield* UserService
// запускаем дочерний файбер в фоне
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep("10 seconds")
yield* userService.getUser("1")
yield* Effect.log("это не выполнится")
})
)
// родительский файбер завершается
yield* Effect.log("родитель завершился")
})Тоже самое, только код здорового человека:
async function program(userService) {
// запускаем дочернюю задачу в фоне (не awaiting)
(async () => {
await sleep(10000)
await userService.getUser("1")
console.log("это не выполнится")
})()
// родитель завершается не дожидаясь дочерней задачи
console.log("родитель завершился")
}Пример, который я привел, примитивный, потому что это туториал статья. Добавьте отмену фонового процесса, много зависимостей, и передачу этих зависимостей глубже чем в одну функцию, потом сравните)
Но в целом, вы почитайте статью, помимо запуска дочерних тредов, там еще много что есть)
Ну накиньте прям рабочий пример который прям по настоящему делает отмену и сравним с кодом на промисах
class MessengerAdapter extends Context.Tag("MessageAdapter")<
MessengerAdapter,
{
readonly incomingMessagesDebounced: Stream.Stream<Message>
readonly send: (response: Response) => Effect.Effect<void, MessengerAdapterError>
}
>() {}
class MessageProcessor extends Context.Tag("MessageProcessor")<
MessageProcessor,
{
readonly process: (msg: Message) => Effect.Effect<void, MessageProcessorError>
readonly responsesStream: Stream.Stream<Response, MessageProcessorError>
}
>() {}
const bot = Effect.gen(function* () {
const messengerAdapter = yield* MessengerAdapter
const messageProcessor = yield* MessageProcessor
const currentJob = yield* Ref.make(Fiber.void)
//например пришло сообщение боту из телеграма
yield* messengerAdapter.incomingMessagesDebounced.pipe(
Stream.mapEffect((msg) =>
Effect.gen(function* () {
const prev = yield* Ref.get(currentJob)
yield* Fiber.interrupt(prev)
const fiber = yield* Effect.fork(messageProcessor.process(msg))
yield* Ref.set(currentJob, fiber)
}).pipe(
Effect.catchTag("MessageProcessorError", (err) =>
Effect.gen(function* () {
yield* Effect.logError(`Failed to process message from ${msg.userId}`, err)
yield* messageAdapter.send({
userId: msg.userId,
text: "Извините, бот временно недоступен. Сейчас подключим оператора."
})
})
)
)
),
Stream.runDrain,
Effect.fork
)
//отправляем сообщение обратно
yield* messageProcessor.responsesStream.pipe(
Stream.mapEffect((msg) => messengerAdapter.send(msg).pipe(
Effect.catchAll((e) => Effect.logError("Failed to send message", e))
)),
Stream.runDrain
)
})ну вот, например, код для обработки сообщений чатбота. Для полноты добавлю примерную реализацию MessageProcessor, реализацию MessengerAdapter опустим
import { LanguageModel, Prompt } from "@effect/ai"
import { Effect, Layer, Queue, Stream } from "effect"
const MessageProcessorLive = Layer.effect(
MessageProcessor,
Effect.gen(function* () {
const languageModel = yield* LanguageModel.LanguageModel
const queue = yield* Queue.unbounded<Message>()
return {
process: (msg) =>
Effect.gen(function* () {
const response = yield* languageModel.generateText({
prompt: Prompt.make([
Prompt.makeMessage("system", { content: "You are a helpful assistant." }),
Prompt.makeMessage("user", { content: msg.text })
])
}).pipe(
Effect.retry({ times: 3, schedule: Schedule.exponential("1 second") }),
Effect.mapError((e) =>
new MessageProcessorError({ message: `LLM failed: ${e.message}`, cause: e })
)
)
//если ответ от ЛЛМ пришел, то мы обязательно его доставим, прервать не получится
yield* Effect.forEach(response.text.split("\n\n"), (chunk) =>
Effect.gen(function* () {
yield* Queue.offer(queue, { userId: msg.userId, text: chunk })
yield* Effect.sleep("5 seconds")
})
).pipe(Effect.uninterruptible)
}),
responsesStream: Stream.fromQueue(queue)
}
})
)Вот тут, когда приходит новое сообщение, тред, который отвечал за обработку старого сообщения обрывается. И в реализации я использую реальный сервис для работы с ЛЛМ, который автоматически прервется. Не нужно abortController внутрь никуда прокидывать. Все легко тестируется. Реализаций MessengerAdapter может быть сколько угодно. Я пытался на промисах это написать нормально, но не вышло. Может у вас выйдет. Вот функциональные требования:
Входящие сообщения — бот получает сообщения {userId, text} (дебаунс обрабатывать не надо, предполагаем, что он уже обработан)
Обработка — для каждого сообщения: - Отправляем запрос в LLM API (HTTP POST) - Ответ разбиваем по \n\n на чанки - Каждый чанк отправляем клиенту с задержкой 5 секунд между ними
Отмена предыдущего — если пришло новое сообщение, а предыдущее ещё обрабатывается: - Если запрос к LLM ещё в полёте — отменить HTTP-запрос - Если LLM уже ответил и идёт отправка чанков — дождаться отправки всех чанков, только потом начать обработку нового
Ретраи — если LLM вернул ошибку, ретраим 3 раза с экспоненциальным backoff (1с, 2с, 4с). Ретраи тоже отменяются при новом сообщении
Ошибки — если все ретраи провалились, отправить клиенту "Извините, бот временно недоступен. Сейчас подключим оператора." Бот продолжает принимать новые сообщения
Есть огромная проблема, код это просто ад, настолько плохо читаемый и не приятный, что все мнимые микро плюсы, даже близко не перекрываются из-за того, что приходится читать и писать такой код.
skill issue
skill issue
Смешно) Выдавать говнокод за skill это конечно вершина, это как говорить что копать землю вилкой хорошо, а лопатой плохо, ибо у вас просто skill issue и вы не умете достойно копать землю вилкой, мозгов для этого маловато.
ну твои вкусы и привычки не определяют реальность.
перепиши верхний код на промисы, посмотрим где вилка, а где лопата)
а до тех пор skill issue
перепиши верхний код на промисы
Вот ИИ переписал)
class MessageProcessorError extends Error {
constructor({ message, cause }) {
super(message)
this.cause = cause
}
}
const sleep = (ms) => new Promise((res) => setTimeout(res, ms))
async function retryExponential(fn, times = 3, baseDelay = 1000) {
for (let i = 0; i <= times; i++) {
try {
return await fn()
} catch (e) {
if (i === times) throw e
await sleep(baseDelay * Math.pow(2, i))
}
}
}
function createMessageProcessor(languageModel) {
// Queue — простой массив + подписчики
const listeners = new Set()
const queue = {
offer: (msg) => {
listeners.forEach((fn) => fn(msg))
},
subscribe: (fn) => {
listeners.add(fn)
return () => listeners.delete(fn) // unsubscribe
},
}
async function process(msg) {
// Генерация с ретраями
let response
try {
response = await retryExponential(() => languageModel.generateText({
prompt: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: msg.text },
],
}),
3,
1000,
)
} catch (e) {
throw new MessageProcessorError({
message: `LLM failed: ${e.message}`,
cause: e,
})
}
// Uninterruptible — доставляем все чанки без возможности отмены
const chunks = response.text.split('\n\n')
for (const chunk of chunks) {
queue.offer({ userId: msg.userId, text: chunk })
await sleep(5000)
}
}
// Stream из очереди — AsyncGenerator
async function* responsesStream() {
const buffer = []
let resolve = null
const unsubscribe = queue.subscribe((msg) => {
buffer.push(msg)
if (resolve) {
resolve()
resolve = null
}
})
try {
while (true) {
if (buffer.length > 0) {
yield buffer.shift()
} else {
// ждём следующего сообщения
await new Promise((r) => resolve = r)
}
}
} finally {
unsubscribe()
}
}
return { process, responsesStream }
}
// Использование
const processor = createMessageProcessor(languageModel)
await processor.process({ userId: '1', text: 'Hello' })
for await (const msg of processor.responsesStream()) {
console.log(msg)
}
Полезные ссылки:
effect-smol — репозиторий Effect v4 (сейчас в бете)
t.me/fp_ts — СНГ комьюнити Effect в Telegram

10 причин попробовать Effect TS/Основы Effect TS