Как стать автором
Обновить
52.78

Как мы создавали динамический Kubernetes API server для API Aggregation Layer в Cozystack

Уровень сложностиСложный
Время на прочтение89 мин
Количество просмотров1.3K

Привет! Я Андрей Квапил, вы можете знать меня под ником kvaps в сообществах, посвященных Kubernetes и cloud native-инструментам. В этой статье я хотел рассказать, как мы реализовали собственный extension API server в open source PaaS-платформе Cozystack.

Kubernetes действительно поражает своими могучими возможностями к расширению. Вы наверняка уже знаете про operator-паттерн, а также фреймворки kubebuilder и operator-sdk с помощью которых можно его реализовать. Если вкратце, то они позволяют расширять ваш Kubernetes через определение кастом-ресурсов (CRDs) и написание дополнительного контроллера, который будет выполнять вашу бизнес-логику для реконсиляции и управления этими ресурсами. Этот подход широко изучен, а в интернете можно найти огромное количество информации о том, как написать такой оператор.

Однако это не единственный метод расширения Kubernetes API. Так, для более сложных кейсов, например реализации императивной логики, сабресурсов и формирования ответов на лету, можно рассмотреть механизм API aggregation layer, который поддерживается в Kubernetes. В рамках aggregation layer можно разработать свой собственный extension API server и бесшовно интегрировать его в общий Kubernetes API.

В этой статье мы разберем, что такое API aggregation layer, для решения каких задач его стоит использовать, когда его использовать не стоит и как мы использовали эту модель для реализации собственного extension API server в платформе Cozystack

Что такое API Aggregation Layer

Для начала немного разберемся с определениями, чтобы дальше не возникало путаницы. Итак, API aggregation layer — это название фичи в Kubernetes, а extension API server — это конкретная имплементация API server для Aggregation Layer. Extension API server — это точно такой же Kubernetes API server, только он запускается отдельно и обслуживает запросы конкретно для ваших типов ресурсов.

Таким образом aggregation layer позволяет вам написать свой собственный extension API server, легко интегрировать его в Kubernetes и напрямую обрабатывать запросы к ресурсам из определенной группы. В отличие от механизма CRD, extension API регистрируется в Kubernetes как APIService и говорит K8s, что необходимо принимать во внимание этот новый API server и учитывать, что он обслуживает определенные API-ресурсы. Ниже — простой пример:

С помощью приведенной ниже команды можно вывести список всех зарегистрированных API-сервисов:

# kubectl get apiservices.apiregistration.k8s.io

А вот и пример APIService:

NAME                          	SERVICE                   	AVAILABLE   AGE
v1alpha1.apps.cozystack.io    	cozy-system/cozystack-api 	True    	7h29m

Как только Kubernetes api-server получает запросы к ресурсам в группе v1alpha1.apps.cozystack.io, он перенаправляет все запросы в наш api-server, который может обрабатывать их, исходя из заложенной в него бизнес-логики.

Для чего используют API Aggregation Layer

API Aggregation Layer позволяет решить несколько вопросов, для которых бывает недостаточно обычного механизма CRD. Рассмотрим каждый из них. 

Императивная логика и сабресурсы

Помимо обычных ресурсов в Kubernetes существуют и так называемые сабресурсы.

Сабресурсы — это дополнительные операции или действия, которые можно выполнять над основными ресурсами (например, Pod, Deployment, Service) через Kubernetes API. Они предоставляют интерфейсы для управления конкретными аспектами ресурсов, не затрагивая весь объект.

Самый простой пример — это /status, который традиционно выводится в отдельный сабресурс, который, в свою очередь, хранится и обрабатывается отдельно от основного ресурса. Поле status не предназначено для изменения обычными пользователями, поэтому, если вы захотите внести в него какие-то изменения, для обновления status вашему контроллеру придётся использовать отдельную процедуру с указанием имени сабресурса.

Однако помимо /status у подов в Kubernetes есть ещё и такие сабресурсы, как /exec, /portforward и /log. Интересно то, что вместо привычного для Kubernetes декларативного ресурса они представляют собой эндпоинт для императивных операций, например: посмотреть логи, спроксировать подключение, выполнить команду в запущенном контейнере и т.п. 

Именно для выполнения таких императивных команд и реализуют отдельный extension API и extension API server Вот некоторые известные мне имплементации:

  • KubeVirt. Аддон для Kubernetes, расширяющий возможности его API для запуска традиционных виртуальных машин. Созданный в рамках KubeVirt, api-server обслуживает логику сабресурсов /restart /console /vnc для виртуальных машин.

  • Knative. Аддон для Kubernetes, расширяющий его возможности для serverless computing, реализует сабресурс /scale, который позволяет задавать настройки автомасштабирования для своих типов.

Кстати, несмотря на то что логика сабресурсов в Kubernetes может быть императивной, управлять доступом к ним можно декларативно, используя стандартную модель RBAC. Например таким образом можно управлять доступом к /log и /exec у ресурса Pod:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: pod-and-pod-logs-reader
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods/exec"]
  verbs: ["create"]

Вы больше не завязаны на etcd

Как правило Kubernetes API server использует etcd в качестве бэкенда, однако реализация вашего собственного API-сервера не привязывает вас к использованию только лишь etcd. Если вы не видите смысла хранить состояние вашего сервера в etcd, можно хранить информацию в любой другой системе и генерировать ответы на лету. Вот несколько кейсов для примера:

  • Metrics-server, стандартное расширение для Kubernetes, позволяющее смотреть метрики нод и подов в режиме реального времени. Реализует альтернативные ресурсы Pod и Node в своём собственном metrics.k8s.io API, обращение к которым транслирует метрики напрямую из cAdvisor. Таким образом при выполнении команд kubectl top node или kubectl top pod metrics-server в режиме реального времени идёт к Kubelet, получает метрики и отдаёт их вам. А раз эта информация формируется в режиме реального времени и актуальна только в момент запроса, нет смысла тратить дополнительные ресурсы на сохранение её в etcd.

  • При необходимости вместо etcd можно использовать любой другой бэкенд или даже реализовать для него Kubernetes-совместимое API. Например, если вы используете Postgres, вы могли бы сделать прозрачное представление его сущностей в Kubernetes API: то есть базы данных, пользователи, и гранты внутри Postgres будут отображаться как обычные Kubernetes-ресурсы в вашем API и с ними можно будет работать через kubectl или любой другой Kubernetes-совместимый инструмент. В отличие от операторов, которые реализуют бизнес-логику с помощью кастом-ресурсов и метода реконсиляции, extension API server избавляет от необходимости иметь отдельные контроллеры для синхронизации состояния из Kubernetes и обратно.

Одноразовые ресурсы

  • В Kubernetes есть специальный API, который используется для предоставления пользователю информации о его полномочиях. Этот API реализован с помощью ресурсов SelfSubjectAccessReview. Особенность этих ресурсов заключается в том, что их нельзя увидеть с помощью методов GET или LIST, их можно только создать с помощью метода CREATE и в момент создания получить вывод с информацией о том, к чему у вас есть доступ.

Если же выполнить kubectl get selfsubjectaccessreviews напрямую, то вы просто получите следующую ошибку:

Error from server (MethodNotAllowed): the server does not allow this method on the requested resource

Причина в том, что Kubernetes API server не предусматривает никакого другого взаимодействия с этим типом ресурсов (их можно только создать).

В свою очередь, API, реализованное с помощью ресурсов SelfSubjectAccessReview, используется для команд такого вида:

# kubectl auth can-i create deployments --namespace dev

При выполнении приведенной выше команды Kubernetes создаёт ресурс SelfSubjectAccessReview в Kubernetes API, что, в свою очередь, позволяет запросить у Kubernetes список возможных полномочий для вашего пользователя. Причем Kubernetes сформирует персонализированный ответ на ваш запрос в режиме реального времени, и эта логика отличается от ситуации, когда ресурс просто хранится в etcd.

  • Похожим образом реализованы ресурсы типа UploadTokenRequest в Containerized Data Importer от проекта KubeVirt. Эти ресурсы позволяют получить одноразовый токен для загрузки образа утилитой virtctl. Для этого Containerized Data Importer реализует свой собственный extension API server.

У проекта KubeVirt есть расширение CDI (Containerized Data Importer), которое позволяет загрузить файлы в PVC с компьютера пользователя. Однако прежде чем клиент начнёт загрузку, ему необходимо получить специальный токен: этот токен получается посредством создания UploadTokenRequest в Kubernetes API, который все запросы на создание UploadTokenRequest отправляет в API-сервер CDI, а уже тот генерирует токен и возвращает его в качестве ответа на этот запрос.

Полная свобода конверсии, валидации и форматирования вывода

  • Ваш собственный API server имеет все возможности ванильного Kubernetes API server. А ресурсы, которые создаются в вашем API server, могут быть провалидированы сразу же и без дополнительных вебхуков. Да, механизм CDR тоже поддерживает server-side-валидацию с помощью Common Expression Language (CEL) для декларативной валидации и ValidatingAdmissionPolicies без необходимости писать вебхуки, однако собственный API server позволяет прописывать более сложную и гибкую логику валидации.

  • Kubernetes позволяет обслуживать несколько версий API для каждого типа ресурсов. Традиционно это v1alpha1, v1beta1, v1. Только одна версия может бы указана в качестве storage version, все запросы к другим должны быть автоматически конвертированы в ту версию, которая указана как storage. При использовании CRD такая механика реализуется с помощью conversion webhooks, тогда как в API server можно реализовать собственный механизм преобразования, использовать разные версии хранилища (один объект может быть сериализован как v1, а другой как v2) или полагаться на внешний дополнительный API.

  • API-server позволяет форматировать табличный вывод как захочется и не накладывает обязательства следовать логике additionalPrinterColumns в CRD. Вместо этого вы можете самостоятельно написать свой собственный форматер, который будет форматировать вывод таблицы и кастомных полей в ней. Например, при использовании additionalPrinterColumns можно выводить значения полей, следуя только логике JSONPath, других вариантов не существует. А в собственном API server вы получите возможность формировать табличный вывод так, как вам захочется, генерируя и подставляя значения на лету.

Динамическая регистрация ресурсов

Ресурсам, которые обслуживает extension API server, нет необходимости быть заранее зарегистрированными в качестве CRD. Как только ваш ваш API server регистрируется в качестве APIService, Kubernetes начинает автоматически опрашивать его, чтобы узнать, какие типы ресурсов он обслуживает. Получив ответ, он автоматически регистрирует все доступные типы в своём API. Следовательно, вы можете реализовать такую логику, которая будет автоматически регистрировать нужные вам типы ресурсов в Kubernetes.

Для чего не стоит использовать API Aggregation Layer

Существует несколько антипаттернов, в рамках которых не рекомендуется использование API Aggregation Layer. Рассмотрим их последовательно.

Нестабильный бэкенд

Если ваш API server по какой-то причине перестанет отвечать: из-за недоступного бэкенда или по другим причинам, — это может заблокировать некоторые функции Kubernetes. Например, при выполнении операции по удалению неймспейсов Kubernetes будет ждать ответа от вашего API server, пытаясь понять, остались ли там какие-то ресурсы. Если ответ не придет, удаление неймспейса будет заблокировано.

Возможно, вы сталкивались с ситуацией, когда при недоступности metrics-server в stderr после каждого запроса в API (даже запроса, не связанного с метриками) появляется лишнее сообщение о том, что metrics.k8s.io недоступен. Это еще один пример того, как применение API Aggregation Layer  может привести к проблемам при недоступности api-server, отвечающего на запросы.

Долгие запросы

Если для пользовательских запросов нельзя сформировать мгновенный ответ, лучше рассмотреть использование operator pattern, так как в противном случае ваш кластер станет менее стабильным. Многие проекты реализуют API server только для ограниченного набора ресурсов, в частности для реализации императивной логики и сабресурсов. Такая рекомендация есть и в официальной документации Kubernetes.

Для чего это понадобилось нам в Cozystack

Напомню, мы разрабатываем свободную PaaS-платформу Cozystack, которая в том числе может использоваться как фреймворк для построения своего собственного приватного облака. Поэтому нам крайне важна возможность легкого расширения платформы.

Cozystack построен на базе FluxCD: любое пользовательское приложение упаковывается в собственный Helm-чарт, готовый для деплоя в пользовательском неймспейсе. Деплой любого приложения в платформе осуществляется посредством создания ресурса HelmRelease, с указанием имени чарта и параметров для него, а вся остальная логика обрабатывается FluxCD. Такой паттерн позволяет легко расширять платформу новыми приложениями и предоставлять возможность для создания собственных приложений, которые необходимо просто упаковать в соответствующий Helm-чарт.

Интерфейс платформы Cozystack
Интерфейс платформы Cozystack

Таким образом, в нашей платформе всё настраивается с помощью ресурсов HelmRelease. Однако есть две проблемы: ограничения модели RBAC и потребность в публичном API. Разберем их подробнее.

Ограничения модели RBAC

Стандартная система RBAC в Kubernetes не позволяет ограничить доступ к списку ресурсов одного типа с разделением их по лейблам или конкретным полям в спеке. При создании роли вы, конечно, можете ограничить доступ, указав конкретные имена ресурсов в resourceNames, для которых будут работать методы GET, PATCH или UPDATE. Однако фильтрация по resourceNames в методе LIST в Kubernetes не работает. Вы можете ограничить листинг определённых ресурсов по kind, но не по имени.

  • В Kubernetes есть специальный API, который используется для того, чтобы предоставлять пользователям информацию об их разрешениях. Этот механизм создан с помощью SelfSubjectAccessReview API. Однако есть один необычный момент: вы не можете просматривать эти ресурсы, используя методы GET или LIST. Вы можете только создать их при помощи метода CREATE и в этот же момент получить вывод с информацией о том, какие доступы у вас имеются.

Поэтому мы решили ввести новые типы ресурсов, основанные на названиях Helm-чартов, которые они используют, и формировать список доступных типов динамически прямо в рантайме нашего extension API server. Таким образом, мы можем переиспользовать стандартную модель RBAC в Kubernetes, чтобы управлять доступами к конкретным типам ресурсов.

Потребность в публичном API

Ввиду того, что наша платформа предоставляет возможности для деплоя различных managed-сервисов, мы хотим организовать публичный доступ к API платформы. При этом мы не можем разрешить пользователям взаимодействовать напрямую с ресурсами типа HelmRelease, так как это позволило бы им задавать произвольные имена и параметры Helm-чартов для деплоя, что потенциально могло бы скомпрометировать нашу систему.

Мы хотим предоставлять пользователю возможность задеплоить конкретный сервис, просто создав для него соответствующий ресурс в Kubernetes. А тип этого ресурса должен называться так же, как и чарт, из которого он деплоится. Приведем примеры:

  • Kind: Kuberneteschart: kubernetes

  • Kind: Postgreschart: postgres

  • Kind: Redischart: redis

  • Kind: VirtualMachinechart: virtual-machine

А еще мы не хотим каждый раз при добавлении нового чарта добавлять новый тип в codegen и перекомпилировать наш extension API server, чтобы он начал обслуживаться. Обновление должно быть динамическим или через предоставленный администратором ConfigMap.

Двухсторонняя конвертация

Сейчас у нас уже есть интеграции и дашборд, которые по-прежнему продолжают использовать ресурсы типа HelmRelease. Однако на данном этапе нам не хотелось бы терять возможность поддержки этого API. Учитывая, что мы просто транслируем одни ресурсы в другие, поддержка сохраняется, более того, она работает в две стороны. Если создать HelmRelease, у вас появится кастомный ресурс в Kubernetes, а если создать кастомный ресурс в Kubernetes, он будет доступен и как HelmRelease.

Никаких дополнительных контроллеров, которые синхронизируют состояние между ресурсами, у нас нет. Все запросы к ресурсам нашего extension API server прозрачно проксируются в HelmRelease и обратно. Это позволяет избавиться от промежуточного состояния и от необходимости написания логики для контроллеров синхронизации состояния.

Реализация

Для реализации Aggregation API можно рассмотреть два проекта:

  • apiserver-builder — на данный момент находится в альфа-версии, и уже 2 года не обновлялся. Работает по принципу kubebuilder, предоставляя вам фреймворк для создания extension API server, позволяя поочерёдно создать структуру проекта и сгенерировать код для ваших ресурсов.

  • sample-apiserver — готовый пример реализованного API server на базе официальных библиотек Kubernetes, который можно взять в качестве основы для вашего проекта.

По объективным причинам мы остановили выбор на втором проекте. Далее я расскажу, что нам потребовалось сделать.

Отключить поддержку etcd

В нашем случае она не требуется, так как все ресурсы будут храниться напрямую в Kubernetes API. Опции etcd отключаются передачей nil в RecommendedOptions.Etcd. Код на GitHub.

Сгенерировать общий тип ресурсов

Мы назвали его Application и выглядит он следующим образом: код на GitHub.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ApplicationList is a list of Application objects.
type ApplicationList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	Items []Application `json:"items" protobuf:"bytes,2,rep,name=items"`
}

// ApplicationStatus is the status of a Application.
type ApplicationStatus struct {
	// Conditions holds the conditions for the Application.
	// +optional
	Version    string             `json:"version,omitempty"`
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// GetConditions returns the status conditions of the object.
func (in Application) GetConditions() []metav1.Condition {
	return in.Status.Conditions
}

// SetConditions sets the status conditions on the object.
func (in *Application) SetConditions(conditions []metav1.Condition) {
	in.Status.Conditions = conditions
}

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Application is an example type with a spec and a status.
type Application struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	AppVersion string `json:"appVersion,omitempty" protobuf:"bytes,1,opt,name=version"`
	// +optional
	Spec   *apiextensionsv1.JSON `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
	Status ApplicationStatus     `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

Это общий тип, который используется для любого типа приложения, а логика его обработки едина для всех чартов.

Настроить загрузку конфига

Так как мы хотим настраивать наш extension API server через конфиг, мы сформировали структуру конфига в Go.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v2"
)

// ResourceConfig represents the structure of the configuration file.
type ResourceConfig struct {
	Resources []Resource `yaml:"resources"`
}

// Resource describes an individual resource.
type Resource struct {
	Application ApplicationConfig `yaml:"application"`
	Release     ReleaseConfig     `yaml:"release"`
}

// ApplicationConfig contains the application settings.
type ApplicationConfig struct {
	Kind       string   `yaml:"kind"`
	Singular   string   `yaml:"singular"`
	Plural     string   `yaml:"plural"`
	ShortNames []string `yaml:"shortNames"`
}

// ReleaseConfig contains the release settings.
type ReleaseConfig struct {
	Prefix string            `yaml:"prefix"`
	Labels map[string]string `yaml:"labels"`
	Chart  ChartConfig       `yaml:"chart"`
}

// ChartConfig contains the chart settings.
type ChartConfig struct {
	Name      string          `yaml:"name"`
	SourceRef SourceRefConfig `yaml:"sourceRef"`
}

// SourceRefConfig contains the reference to the chart source.
type SourceRefConfig struct {
	Kind      string `yaml:"kind"`
	Name      string `yaml:"name"`
	Namespace string `yaml:"namespace"`
}

// LoadConfig loads the configuration from the specified path and validates it.
func LoadConfig(path string) (*ResourceConfig, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}

	var config ResourceConfig
	if err := yaml.Unmarshal(data, &config); err != nil {
		return nil, err
	}

	// Validate the configuration.
	for i, res := range config.Resources {
		if res.Application.Kind == "" {
			return nil, fmt.Errorf("resource at index %d has an empty kind", i)
		}
		if res.Application.Plural == "" {
			return nil, fmt.Errorf("resource at index %d has an empty plural", i)
		}
		if res.Release.Chart.Name == "" {
			return nil, fmt.Errorf("resource at index %d has an empty chart name in release", i)
		}
		if res.Release.Chart.SourceRef.Kind == "" || res.Release.Chart.SourceRef.Name == "" || res.Release.Chart.SourceRef.Namespace == "" {
			return nil, fmt.Errorf("resource at index %d has an incomplete sourceRef for chart in release", i)
		}
	}
	return &config, nil
}

А параллельно изменили логику регистрации ресурсов, чтобы созданные нами ресурсы регистрировались с разными Kind.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
	"log"

	"github.com/aenix.io/cozystack/pkg/config"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// GroupName holds the API group name.
const GroupName = "apps.cozystack.io"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}

var (
	// SchemeBuilder allows to add this group to a scheme.
	// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
	// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
	SchemeBuilder      runtime.SchemeBuilder
	localSchemeBuilder = &SchemeBuilder

	// AddToScheme adds this group to a scheme.
	AddToScheme = localSchemeBuilder.AddToScheme
)

func init() {
	// We only register manually written functions here. The registration of the
	// generated functions takes place in the generated files. The separation
	// makes the code compile even when the generated files are missing.
	localSchemeBuilder.Register(addKnownTypes)
}

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

// RegisterDynamicTypes registers types dynamically based on config
func RegisterDynamicTypes(scheme *runtime.Scheme, cfg *config.ResourceConfig) error {
	for _, res := range cfg.Resources {
		kind := res.Application.Kind

		gvk := SchemeGroupVersion.WithKind(kind)
		scheme.AddKnownTypeWithName(gvk, &Application{})
		scheme.AddKnownTypeWithName(gvk.GroupVersion().WithKind(kind+"List"), &ApplicationList{})

		log.Printf("Registered kind: %s\n", kind)

	}

	return nil
}

В итоге мы получили вот такой конфиг, в который можно передать все возможные типы и указание о том, во что именно они должны мапиться.

Код
apiVersion: v1
kind: ConfigMap
metadata:
  name: cozystack-api
  namespace: cozy-system
data:
  config.yaml: |
    resources:
    - application:
        kind: Bucket
        singular: bucket
        plural: buckets
      release:
        prefix: bucket-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: bucket
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: ClickHouse
        singular: clickhouse
        plural: clickhouses
      release:
        prefix: clickhouse-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: clickhouse
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: HTTPCache
        singular: httpcache
        plural: httpcaches
      release:
        prefix: http-cache-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: http-cache
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: NATS
        singular: nats
        plural: natses
      release:
        prefix: nats-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: nats
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: TCPBalancer
        singular: tcpbalancer
        plural: tcpbalancers
      release:
        prefix: tcp-balancer-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: http-cache
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: VirtualMachine
        singular: virtualmachine
        plural: virtualmachines
      release:
        prefix: virtual-machine-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: virtual-machine
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: VPN
        singular: vpn
        plural: vpns
      release:
        prefix: vpn-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: vpn
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: MySQL
        singular: mysql
        plural: mysqls
      release:
        prefix: mysql-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: mysql
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Tenant
        singular: tenant
        plural: tenants
      release:
        prefix: tenant-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: tenant
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Kubernetes
        singular: kubernetes
        plural: kuberneteses
      release:
        prefix: kubernetes-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: kubernetes
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Redis
        singular: redis
        plural: redises
      release:
        prefix: redis-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: rabbitmq
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: RabbitMQ
        singular: rabbitmq
        plural: rabbitmqs
      release:
        prefix: rabbitmq-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: rabbitmq
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Postgres
        singular: postgres
        plural: postgreses
      release:
        prefix: postgres-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: postgres
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: FerretDB
        singular: ferretdb
        plural: ferretdb
      release:
        prefix: ferretdb-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: ferretdb
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Kafka
        singular: kafka
        plural: kafkas
      release:
        prefix: ferretdb-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: kafka
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: VMDisk
        plural: vmdisks
        singular: vmdisk
      release:
        prefix: vm-disk-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: vm-disk
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: VMInstance
        plural: vminstances
        singular: vminstance
      release:
        prefix: vm-instance-
        labels:
          cozystack.io/ui: "true"
        chart:
          name: vm-instance
          sourceRef:
            kind: HelmRepository
            name: cozystack-apps
            namespace: cozy-public
    - application:
        kind: Monitoring
        plural: monitorings
        singular: monitoring
      release:
        prefix: ""
        labels:
          cozystack.io/ui: "true"
        chart:
          name: monitoring
          sourceRef:
            kind: HelmRepository
            name: cozystack-extra
            namespace: cozy-public
    - application:
        kind: Etcd
        plural: etcds
        singular: etcd
      release:
        prefix: ""
        labels:
          cozystack.io/ui: "true"
        chart:
          name: etcd
          sourceRef:
            kind: HelmRepository
            name: cozystack-extra
            namespace: cozy-public
    - application:
        kind: Ingress
        plural: ingresses
        singular: ingress
      release:
        prefix: ""
        labels:
          cozystack.io/ui: "true"
        chart:
          name: ingress
          sourceRef:
            kind: HelmRepository
            name: cozystack-extra
            namespace: cozy-public
    - application:
        kind: SeaweedFS
        plural: seaweedfses
        singular: seaweedfs
      release:
        prefix: ""
        labels:
          cozystack.io/ui: "true"
        chart:
          name: seaweedfs
          sourceRef:
            kind: HelmRepository
            name: cozystack-extra
            namespace: cozy-public

Реализовать собственный registry

Для того чтобы хранить состояние не в etcd, а транслировать его напрямую в Kubernetes, в ресурсы типа HelmRelease, мы написали функции конвертации ресурсов Application в HelmRelease и HelmRelease в Application.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package application

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	helmv2 "github.com/fluxcd/helm-controller/api/v2"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	fields "k8s.io/apimachinery/pkg/fields"
	labels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/duration"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"

	appsv1alpha1 "github.com/aenix.io/cozystack/pkg/apis/apps/v1alpha1"
	"github.com/aenix.io/cozystack/pkg/config"

	// Importing API errors package to construct appropriate error responses
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// Ensure REST implements necessary interfaces
var (
	_ rest.Getter          = &REST{}
	_ rest.Lister          = &REST{}
	_ rest.Updater         = &REST{}
	_ rest.Creater         = &REST{}
	_ rest.GracefulDeleter = &REST{}
	_ rest.Watcher         = &REST{}
	_ rest.Patcher         = &REST{}
)

// Define constants for label and annotation prefixes
const (
	LabelPrefix      = "apps.cozystack.io-"
	AnnotationPrefix = "apps.cozystack.io-"
)

// Define the GroupVersionResource for HelmRelease
var helmReleaseGVR = schema.GroupVersionResource{
	Group:    "helm.toolkit.fluxcd.io",
	Version:  "v2",
	Resource: "helmreleases",
}

// REST implements the RESTStorage interface for Application resources
type REST struct {
	dynamicClient dynamic.Interface
	gvr           schema.GroupVersionResource
	gvk           schema.GroupVersionKind
	kindName      string
	releaseConfig config.ReleaseConfig
}

// NewREST creates a new REST storage for Application with specific configuration
func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
	return &REST{
		dynamicClient: dynamicClient,
		gvr: schema.GroupVersionResource{
			Group:    appsv1alpha1.GroupName,
			Version:  "v1alpha1",
			Resource: config.Application.Plural,
		},
		gvk: schema.GroupVersion{
			Group:   appsv1alpha1.GroupName,
			Version: "v1alpha1",
		}.WithKind(config.Application.Kind),
		kindName:      config.Application.Kind,
		releaseConfig: config.Release,
	}
}

// NamespaceScoped indicates whether the resource is namespaced
func (r *REST) NamespaceScoped() bool {
	return true
}

// GetSingularName returns the singular name of the resource
func (r *REST) GetSingularName() string {
	return r.gvr.Resource
}

// Create handles the creation of a new Application by converting it to a HelmRelease
func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	// Assert the object is of type Application
	app, ok := obj.(*appsv1alpha1.Application)
	if !ok {
		return nil, fmt.Errorf("expected Application object, got %T", obj)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace)

	// Create HelmRelease in Kubernetes
	createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options)
	if err != nil {
		klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err)
		return nil, fmt.Errorf("failed to create HelmRelease: %v", err)
	}

	// Convert the created HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName())

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	klog.V(6).Infof("Successfully retrieved and converted resource %s of type %s to unstructured", convertedApp.GetName(), r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// Get retrieves an Application by converting the corresponding HelmRelease
func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to retrieve resource %s of type %s in namespace %s", name, r.gvr.Resource, namespace)

	// Get the corresponding HelmRelease using the new prefix
	helmReleaseName := r.releaseConfig.Prefix + name
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err)

		// Check if the error is a NotFound error
		if apierrors.IsNotFound(err) {
			// Return a NotFound error for the Application resource instead of HelmRelease
			return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}

		// For other errors, return them as-is
		return nil, err
	}

	// Check if HelmRelease meets the required chartName and sourceRef criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return a NotFound error for the Application resource
		return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert HelmRelease to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(hr)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", name, err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Successfully retrieved and converted resource %s of kind %s to unstructured", name, r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// List retrieves a list of Applications by converting HelmReleases
func (r *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to list HelmReleases in namespace %s with options: %v", namespace, options)

	// Get resource name from the request (if any)
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		FieldSelector: helmFieldSelector,
		LabelSelector: helmLabelSelector,
	}

	// List HelmReleases with mapped selectors
	hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error listing HelmReleases: %v", err)
		return nil, err
	}

	// Initialize empty Application list
	appList := &appsv1alpha1.ApplicationList{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       "ApplicationList",
		},
		ListMeta: metav1.ListMeta{
			ResourceVersion: hrList.GetResourceVersion(),
		},
		Items: []appsv1alpha1.Application{},
	}

	// Iterate over HelmReleases and convert to Applications
	for _, hr := range hrList.Items {
		if !r.shouldIncludeHelmRelease(&hr) {
			continue
		}

		app, err := r.ConvertHelmReleaseToApplication(&hr)
		if err != nil {
			klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err)
			continue
		}

		// If resourceName is set, check for match
		if resourceName != "" && app.Name != resourceName {
			continue
		}

		// Apply label.selector
		if options.LabelSelector != nil {
			sel, err := labels.Parse(options.LabelSelector.String())
			if err != nil {
				klog.Errorf("Invalid label selector: %v", err)
				continue
			}
			if !sel.Matches(labels.Set(app.Labels)) {
				continue
			}
		}

		// Apply field.selector by name and namespace (if specified)
		if options.FieldSelector != nil {
			fs, err := fields.ParseSelector(options.FieldSelector.String())
			if err != nil {
				klog.Errorf("Invalid field selector: %v", err)
				continue
			}

			fieldsSet := fields.Set{
				"metadata.name":      app.Name,
				"metadata.namespace": app.Namespace,
			}
			if !fs.Matches(fieldsSet) {
				continue
			}
		}

		appList.Items = append(appList.Items, app)
	}

	klog.V(6).Infof("Successfully listed %d Application resources in namespace %s", len(appList.Items), namespace)
	return appList, nil
}

// Update updates an existing Application by converting it to a HelmRelease
func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	// Retrieve the existing Application
	oldObj, err := r.Get(ctx, name, &metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			if !forceAllowCreate {
				return nil, false, err
			}
			// If not found and force allow create, create a new one
			obj, err := objInfo.UpdatedObject(ctx, nil)
			if err != nil {
				klog.Errorf("Failed to get updated object: %v", err)
				return nil, false, err
			}
			createdObj, err := r.Create(ctx, obj, createValidation, &metav1.CreateOptions{})
			if err != nil {
				klog.Errorf("Failed to create new Application: %v", err)
				return nil, false, err
			}
			return createdObj, true, nil
		}
		klog.Errorf("Failed to get existing Application %s: %v", name, err)
		return nil, false, err
	}

	// Update the Application object
	newObj, err := objInfo.UpdatedObject(ctx, oldObj)
	if err != nil {
		klog.Errorf("Failed to get updated object: %v", err)
		return nil, false, err
	}

	// Validate the update if a validation function is provided
	if updateValidation != nil {
		if err := updateValidation(ctx, newObj, oldObj); err != nil {
			klog.Errorf("Update validation failed for Application %s: %v", name, err)
			return nil, false, err
		}
	}

	// Assert the new object is of type Application
	app, ok := newObj.(*appsv1alpha1.Application)
	if !ok {
		errMsg := fmt.Sprintf("expected Application object, got %T", newObj)
		klog.Errorf(errMsg)
		return nil, false, fmt.Errorf(errMsg)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	// Retrieve metadata from unstructured object
	metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata")
	if err != nil || !found {
		klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found)
		return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err)
	}
	klog.V(6).Infof("HelmRelease Metadata: %+v", metadata)

	klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace)

	// Before updating, ensure the HelmRelease meets the inclusion criteria
	// This prevents updating HelmReleases that should not be managed as Applications
	if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name)
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Update the HelmRelease in Kubernetes
	resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err)
		return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err)
	}

	// After updating, ensure the updated HelmRelease still meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(resultHR) {
		klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName())
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert the updated HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName())

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, false, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Returning patched Application object: %+v", unstructuredApp)

	return &unstructured.Unstructured{Object: unstructuredApp}, false, nil
}

// Delete removes an Application by deleting the corresponding HelmRelease
func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, false, err
	}

	klog.V(6).Infof("Attempting to delete HelmRelease %s in namespace %s", name, namespace)

	// Construct HelmRelease name with the configured prefix
	helmReleaseName := r.releaseConfig.Prefix + name

	// Retrieve the HelmRelease before attempting to delete
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// If HelmRelease does not exist, return NotFound error for Application
			klog.Errorf("HelmRelease %s not found in namespace %s", helmReleaseName, namespace)
			return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}
		// For other errors, log and return
		klog.Errorf("Error retrieving HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, err
	}

	// Validate that the HelmRelease meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return NotFound error for Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace)

	// Delete the HelmRelease corresponding to the Application
	err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err)
	}

	klog.V(6).Infof("Successfully deleted HelmRelease %s", helmReleaseName)
	return nil, true, nil
}

// Watch sets up a watch on HelmReleases, filters them based on sourceRef and prefix, and converts events to Applications
func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Setting up watch for HelmReleases in namespace %s with options: %v", namespace, options)

	// Get request information, including resource name if specified
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		Watch:           true,
		ResourceVersion: options.ResourceVersion,
		FieldSelector:   helmFieldSelector,
		LabelSelector:   helmLabelSelector,
	}

	// Start watch on HelmRelease with mapped selectors
	helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error setting up watch for HelmReleases: %v", err)
		return nil, err
	}

	// Create a custom watcher to transform events
	customW := &customWatcher{
		resultChan: make(chan watch.Event),
		stopChan:   make(chan struct{}),
	}

	go func() {
		defer close(customW.resultChan)
		for {
			select {
			case event, ok := <-helmWatcher.ResultChan():
				if !ok {
					// The watcher has been closed, attempt to re-establish the watch
					klog.Warning("HelmRelease watcher closed, attempting to re-establish")
					// Implement retry logic or exit based on your requirements
					return
				}

				// Check if the object is a *v1.Status
				if status, ok := event.Object.(*metav1.Status); ok {
					klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
					continue // Skip processing this event
				}

				// Proceed with processing Unstructured objects
				matches, err := r.isRelevantHelmRelease(&event)
				if err != nil {
					klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
					continue
				}

				if !matches {
					continue
				}

				// Convert HelmRelease to Application
				app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured))
				if err != nil {
					klog.Errorf("Error converting HelmRelease to Application: %v", err)
					continue
				}

				// Apply field.selector by name if specified
				if resourceName != "" && app.Name != resourceName {
					continue
				}

				// Apply label.selector
				if options.LabelSelector != nil {
					sel, err := labels.Parse(options.LabelSelector.String())
					if err != nil {
						klog.Errorf("Invalid label selector: %v", err)
						continue
					}
					if !sel.Matches(labels.Set(app.Labels)) {
						continue
					}
				}

				// Convert Application to unstructured
				unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&app)
				if err != nil {
					klog.Errorf("Failed to convert Application to unstructured: %v", err)
					continue
				}

				// Create watch event with Application object
				appEvent := watch.Event{
					Type:   event.Type,
					Object: &unstructured.Unstructured{Object: unstructuredApp},
				}

				// Send event to custom watcher
				select {
				case customW.resultChan <- appEvent:
				case <-customW.stopChan:
					return
				case <-ctx.Done():
					return
				}

			case <-customW.stopChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	klog.V(6).Infof("Custom watch established successfully")
	return customW, nil
}

// Helper function to get HelmRelease name from object
func helmReleaseName(obj runtime.Object) string {
	if u, ok := obj.(*unstructured.Unstructured); ok {
		return u.GetName()
	}
	return "<unknown>"
}

// customWatcher wraps the original watcher and filters/converts events
type customWatcher struct {
	resultChan chan watch.Event
	stopChan   chan struct{}
	stopOnce   sync.Once
}

// Stop terminates the watch
func (cw *customWatcher) Stop() {
	cw.stopOnce.Do(func() {
		close(cw.stopChan)
	})
}

// ResultChan returns the event channel
func (cw *customWatcher) ResultChan() <-chan watch.Event {
	return cw.resultChan
}

// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
	if event.Object == nil {
		return false, nil
	}

	// Check if the object is a *v1.Status
	if status, ok := event.Object.(*metav1.Status); ok {
		// Log at a less severe level or handle specific status errors if needed
		klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
		return false, nil // Not relevant for processing as a HelmRelease
	}

	// Proceed if it's an Unstructured object
	hr, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
	}

	return r.shouldIncludeHelmRelease(hr), nil
}

// shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria
func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
	// Filter by Chart Name
	chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err)
		return false
	}
	if chartName != r.releaseConfig.Chart.Name {
		klog.V(6).Infof("HelmRelease %s chart name %s does not match expected %s", hr.GetName(), chartName, r.releaseConfig.Chart.Name)
		return false
	}

	// Filter by SourceRefConfig and Prefix
	return r.matchesSourceRefAndPrefix(hr)
}

// matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria
func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool {
	// Extract SourceRef fields
	sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err)
		return false
	}
	sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err)
		return false
	}
	sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err)
		return false
	}

	// Check if SourceRef matches the configuration
	if sourceRefKind != r.releaseConfig.Chart.SourceRef.Kind ||
		sourceRefName != r.releaseConfig.Chart.SourceRef.Name ||
		sourceRefNamespace != r.releaseConfig.Chart.SourceRef.Namespace {
		klog.V(6).Infof("HelmRelease %s sourceRef does not match expected values", hr.GetName())
		return false
	}

	// Additional filtering by Prefix
	name := hr.GetName()
	if !strings.HasPrefix(name, r.releaseConfig.Prefix) {
		klog.V(6).Infof("HelmRelease %s does not have the expected prefix %s", name, r.releaseConfig.Prefix)
		return false
	}

	return true
}

// getNamespace extracts the namespace from the context
func (r *REST) getNamespace(ctx context.Context) (string, error) {
	namespace, ok := request.NamespaceFrom(ctx)
	if !ok {
		err := fmt.Errorf("namespace not found in context")
		klog.Errorf(err.Error())
		return "", err
	}
	return namespace, nil
}

// buildLabelSelector constructs a label selector string from a map of labels
func buildLabelSelector(labels map[string]string) string {
	var selectors []string
	for k, v := range labels {
		selectors = append(selectors, fmt.Sprintf("%s=%s", k, v))
	}
	return strings.Join(selectors, ",")
}

// mergeMaps combines two maps of labels or annotations
func mergeMaps(a, b map[string]string) map[string]string {
	if a == nil && b == nil {
		return nil
	}
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
	merged := make(map[string]string, len(a)+len(b))
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		merged[k] = v
	}
	return merged
}

// addPrefixedMap adds the predefined prefix to the keys of a map
func addPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string, len(original))
	for k, v := range original {
		processed[prefix+k] = v
	}
	return processed
}

// filterPrefixedMap filters a map by the predefined prefix and removes the prefix from the keys
func filterPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string)
	for k, v := range original {
		if strings.HasPrefix(k, prefix) {
			newKey := strings.TrimPrefix(k, prefix)
			processed[newKey] = v
		}
	}
	return processed
}

// ConvertHelmReleaseToApplication converts a HelmRelease to an Application
func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) {
	klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName())

	var helmRelease helmv2.HelmRelease
	// Convert unstructured to HelmRelease struct
	err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
	if err != nil {
		klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
		return appsv1alpha1.Application{}, err
	}

	// Convert HelmRelease struct to Application struct
	app, err := r.convertHelmReleaseToApplication(&helmRelease)
	if err != nil {
		klog.Errorf("Error converting from HelmRelease to Application: %v", err)
		return appsv1alpha1.Application{}, err
	}

	klog.V(6).Infof("Successfully converted HelmRelease %s to Application", hr.GetName())
	return app, nil
}

// ConvertApplicationToHelmRelease converts an Application to a HelmRelease
func (r *REST) ConvertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	return r.convertApplicationToHelmRelease(app)
}

// convertHelmReleaseToApplication implements the actual conversion logic
func (r *REST) convertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) {
	app := appsv1alpha1.Application{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       r.kindName,
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:              strings.TrimPrefix(hr.Name, r.releaseConfig.Prefix),
			Namespace:         hr.Namespace,
			UID:               hr.GetUID(),
			ResourceVersion:   hr.GetResourceVersion(),
			CreationTimestamp: hr.CreationTimestamp,
			DeletionTimestamp: hr.DeletionTimestamp,
			Labels:            filterPrefixedMap(hr.Labels, LabelPrefix),
			Annotations:       filterPrefixedMap(hr.Annotations, AnnotationPrefix),
		},
		Spec: hr.Spec.Values,
		Status: appsv1alpha1.ApplicationStatus{
			Version: hr.Status.LastAttemptedRevision,
		},
	}

	var conditions []metav1.Condition
	for _, hrCondition := range hr.GetConditions() {
		if hrCondition.Type == "Ready" || hrCondition.Type == "Released" {
			conditions = append(conditions, metav1.Condition{
				LastTransitionTime: hrCondition.LastTransitionTime,
				Reason:             hrCondition.Reason,
				Message:            hrCondition.Message,
				Status:             hrCondition.Status,
				Type:               hrCondition.Type,
			})
		}
	}
	app.SetConditions(conditions)
	return app, nil
}

// convertApplicationToHelmRelease implements the actual conversion logic
func (r *REST) convertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	helmRelease := &helmv2.HelmRelease{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "helm.toolkit.fluxcd.io/v2",
			Kind:       "HelmRelease",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:            r.releaseConfig.Prefix + app.Name,
			Namespace:       app.Namespace,
			Labels:          addPrefixedMap(app.Labels, LabelPrefix),
			Annotations:     addPrefixedMap(app.Annotations, AnnotationPrefix),
			ResourceVersion: app.ObjectMeta.ResourceVersion,
			UID:             app.ObjectMeta.UID,
		},
		Spec: helmv2.HelmReleaseSpec{
			Chart: &helmv2.HelmChartTemplate{
				Spec: helmv2.HelmChartTemplateSpec{
					Chart:             r.releaseConfig.Chart.Name,
					Version:           app.AppVersion,
					ReconcileStrategy: "Revision",
					SourceRef: helmv2.CrossNamespaceObjectReference{
						Kind:      r.releaseConfig.Chart.SourceRef.Kind,
						Name:      r.releaseConfig.Chart.SourceRef.Name,
						Namespace: r.releaseConfig.Chart.SourceRef.Namespace,
					},
				},
			},
			Values: app.Spec,
		},
	}

	return helmRelease, nil
}

// ConvertToTable implements the TableConvertor interface for displaying resources in a table format
func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	klog.V(6).Infof("ConvertToTable: received object of type %T", object)

	var table metav1.Table

	switch obj := object.(type) {
	case *appsv1alpha1.ApplicationList:
		table = r.buildTableFromApplications(obj.Items)
		table.ListMeta.ResourceVersion = obj.ListMeta.ResourceVersion
	case *appsv1alpha1.Application:
		table = r.buildTableFromApplication(*obj)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	case *unstructured.Unstructured:
		var app appsv1alpha1.Application
		err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &app)
		if err != nil {
			klog.Errorf("Failed to convert Unstructured to Application: %v", err)
			return nil, fmt.Errorf("failed to convert Unstructured to Application: %v", err)
		}
		table = r.buildTableFromApplication(app)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	default:
		resource := schema.GroupResource{}
		if info, ok := request.RequestInfoFrom(ctx); ok {
			resource = schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
		}
		return nil, errNotAcceptable{
			resource: resource,
			message:  "object does not implement the Object interfaces",
		}
	}

	// Handle table options
	if opt, ok := tableOptions.(*metav1.TableOptions); ok && opt != nil && opt.NoHeaders {
		table.ColumnDefinitions = nil
	}

	table.TypeMeta = metav1.TypeMeta{
		APIVersion: "meta.k8s.io/v1",
		Kind:       "Table",
	}

	klog.V(6).Infof("ConvertToTable: returning table with %d rows", len(table.Rows))

	return &table, nil
}

// buildTableFromApplications constructs a table from a list of Applications
func (r *REST) buildTableFromApplications(apps []appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: make([]metav1.TableRow, 0, len(apps)),
	}
	now := time.Now()

	for _, app := range apps {
		row := metav1.TableRow{
			Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
			Object: runtime.RawExtension{Object: &app},
		}
		table.Rows = append(table.Rows, row)
	}

	return table
}

// buildTableFromApplication constructs a table from a single Application
func (r *REST) buildTableFromApplication(app appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: []metav1.TableRow{},
	}
	now := time.Now()

	row := metav1.TableRow{
		Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
		Object: runtime.RawExtension{Object: &app},
	}
	table.Rows = append(table.Rows, row)

	return table
}

// getVersion returns the application version or a placeholder if unknown
func getVersion(version string) string {
	if version == "" {
		return "<unknown>"
	}
	return version
}

// computeAge calculates the age of the object based on CreationTimestamp and current time
func computeAge(creationTime, currentTime time.Time) string {
	ageDuration := currentTime.Sub(creationTime)
	return duration.HumanDuration(ageDuration)
}

// getReadyStatus returns the ready status based on conditions
func getReadyStatus(conditions []metav1.Condition) string {
	for _, condition := range conditions {
		if condition.Type == "Ready" {
			switch condition.Status {
			case metav1.ConditionTrue:
				return "True"
			case metav1.ConditionFalse:
				return "False"
			default:
				return "Unknown"
			}
		}
	}
	return "Unknown"
}

// Destroy releases resources associated with REST
func (r *REST) Destroy() {
	// No additional actions needed to release resources.
}

// New creates a new instance of Application
func (r *REST) New() runtime.Object {
	return &appsv1alpha1.Application{}
}

// NewList returns an empty list of Application objects
func (r *REST) NewList() runtime.Object {
	return &appsv1alpha1.ApplicationList{}
}

// Kind returns the resource kind used for API discovery
func (r *REST) Kind() string {
	return r.gvk.Kind
}

// GroupVersionKind returns the GroupVersionKind for REST
func (r *REST) GroupVersionKind(schema.GroupVersion) schema.GroupVersionKind {
	return r.gvk
}

// errNotAcceptable indicates that the resource does not support conversion to Table
type errNotAcceptable struct {
	resource schema.GroupResource
	message  string
}

func (e errNotAcceptable) Error() string {
	return e.message
}

func (e errNotAcceptable) Status() metav1.Status {
	return metav1.Status{
		Status:  metav1.StatusFailure,
		Code:    http.StatusNotAcceptable,
		Reason:  metav1.StatusReason("NotAcceptable"),
		Message: e.Error(),
	}
}

Реализовали логику фильтрации ресурсов по имени чарта, sourceRef и префиксу в имени HelmRelease.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package application

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	helmv2 "github.com/fluxcd/helm-controller/api/v2"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	fields "k8s.io/apimachinery/pkg/fields"
	labels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/duration"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"

	appsv1alpha1 "github.com/aenix.io/cozystack/pkg/apis/apps/v1alpha1"
	"github.com/aenix.io/cozystack/pkg/config"

	// Importing API errors package to construct appropriate error responses
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// Ensure REST implements necessary interfaces
var (
	_ rest.Getter          = &REST{}
	_ rest.Lister          = &REST{}
	_ rest.Updater         = &REST{}
	_ rest.Creater         = &REST{}
	_ rest.GracefulDeleter = &REST{}
	_ rest.Watcher         = &REST{}
	_ rest.Patcher         = &REST{}
)

// Define constants for label and annotation prefixes
const (
	LabelPrefix      = "apps.cozystack.io-"
	AnnotationPrefix = "apps.cozystack.io-"
)

// Define the GroupVersionResource for HelmRelease
var helmReleaseGVR = schema.GroupVersionResource{
	Group:    "helm.toolkit.fluxcd.io",
	Version:  "v2",
	Resource: "helmreleases",
}

// REST implements the RESTStorage interface for Application resources
type REST struct {
	dynamicClient dynamic.Interface
	gvr           schema.GroupVersionResource
	gvk           schema.GroupVersionKind
	kindName      string
	releaseConfig config.ReleaseConfig
}

// NewREST creates a new REST storage for Application with specific configuration
func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
	return &REST{
		dynamicClient: dynamicClient,
		gvr: schema.GroupVersionResource{
			Group:    appsv1alpha1.GroupName,
			Version:  "v1alpha1",
			Resource: config.Application.Plural,
		},
		gvk: schema.GroupVersion{
			Group:   appsv1alpha1.GroupName,
			Version: "v1alpha1",
		}.WithKind(config.Application.Kind),
		kindName:      config.Application.Kind,
		releaseConfig: config.Release,
	}
}

// NamespaceScoped indicates whether the resource is namespaced
func (r *REST) NamespaceScoped() bool {
	return true
}

// GetSingularName returns the singular name of the resource
func (r *REST) GetSingularName() string {
	return r.gvr.Resource
}

// Create handles the creation of a new Application by converting it to a HelmRelease
func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	// Assert the object is of type Application
	app, ok := obj.(*appsv1alpha1.Application)
	if !ok {
		return nil, fmt.Errorf("expected Application object, got %T", obj)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace)

	// Create HelmRelease in Kubernetes
	createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options)
	if err != nil {
		klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err)
		return nil, fmt.Errorf("failed to create HelmRelease: %v", err)
	}

	// Convert the created HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName())

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	klog.V(6).Infof("Successfully retrieved and converted resource %s of type %s to unstructured", convertedApp.GetName(), r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// Get retrieves an Application by converting the corresponding HelmRelease
func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to retrieve resource %s of type %s in namespace %s", name, r.gvr.Resource, namespace)

	// Get the corresponding HelmRelease using the new prefix
	helmReleaseName := r.releaseConfig.Prefix + name
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err)

		// Check if the error is a NotFound error
		if apierrors.IsNotFound(err) {
			// Return a NotFound error for the Application resource instead of HelmRelease
			return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}

		// For other errors, return them as-is
		return nil, err
	}

	// Check if HelmRelease meets the required chartName and sourceRef criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return a NotFound error for the Application resource
		return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert HelmRelease to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(hr)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", name, err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Successfully retrieved and converted resource %s of kind %s to unstructured", name, r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// List retrieves a list of Applications by converting HelmReleases
func (r *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to list HelmReleases in namespace %s with options: %v", namespace, options)

	// Get resource name from the request (if any)
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		FieldSelector: helmFieldSelector,
		LabelSelector: helmLabelSelector,
	}

	// List HelmReleases with mapped selectors
	hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error listing HelmReleases: %v", err)
		return nil, err
	}

	// Initialize empty Application list
	appList := &appsv1alpha1.ApplicationList{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       "ApplicationList",
		},
		ListMeta: metav1.ListMeta{
			ResourceVersion: hrList.GetResourceVersion(),
		},
		Items: []appsv1alpha1.Application{},
	}

	// Iterate over HelmReleases and convert to Applications
	for _, hr := range hrList.Items {
		if !r.shouldIncludeHelmRelease(&hr) {
			continue
		}

		app, err := r.ConvertHelmReleaseToApplication(&hr)
		if err != nil {
			klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err)
			continue
		}

		// If resourceName is set, check for match
		if resourceName != "" && app.Name != resourceName {
			continue
		}

		// Apply label.selector
		if options.LabelSelector != nil {
			sel, err := labels.Parse(options.LabelSelector.String())
			if err != nil {
				klog.Errorf("Invalid label selector: %v", err)
				continue
			}
			if !sel.Matches(labels.Set(app.Labels)) {
				continue
			}
		}

		// Apply field.selector by name and namespace (if specified)
		if options.FieldSelector != nil {
			fs, err := fields.ParseSelector(options.FieldSelector.String())
			if err != nil {
				klog.Errorf("Invalid field selector: %v", err)
				continue
			}

			fieldsSet := fields.Set{
				"metadata.name":      app.Name,
				"metadata.namespace": app.Namespace,
			}
			if !fs.Matches(fieldsSet) {
				continue
			}
		}

		appList.Items = append(appList.Items, app)
	}

	klog.V(6).Infof("Successfully listed %d Application resources in namespace %s", len(appList.Items), namespace)
	return appList, nil
}

// Update updates an existing Application by converting it to a HelmRelease
func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	// Retrieve the existing Application
	oldObj, err := r.Get(ctx, name, &metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			if !forceAllowCreate {
				return nil, false, err
			}
			// If not found and force allow create, create a new one
			obj, err := objInfo.UpdatedObject(ctx, nil)
			if err != nil {
				klog.Errorf("Failed to get updated object: %v", err)
				return nil, false, err
			}
			createdObj, err := r.Create(ctx, obj, createValidation, &metav1.CreateOptions{})
			if err != nil {
				klog.Errorf("Failed to create new Application: %v", err)
				return nil, false, err
			}
			return createdObj, true, nil
		}
		klog.Errorf("Failed to get existing Application %s: %v", name, err)
		return nil, false, err
	}

	// Update the Application object
	newObj, err := objInfo.UpdatedObject(ctx, oldObj)
	if err != nil {
		klog.Errorf("Failed to get updated object: %v", err)
		return nil, false, err
	}

	// Validate the update if a validation function is provided
	if updateValidation != nil {
		if err := updateValidation(ctx, newObj, oldObj); err != nil {
			klog.Errorf("Update validation failed for Application %s: %v", name, err)
			return nil, false, err
		}
	}

	// Assert the new object is of type Application
	app, ok := newObj.(*appsv1alpha1.Application)
	if !ok {
		errMsg := fmt.Sprintf("expected Application object, got %T", newObj)
		klog.Errorf(errMsg)
		return nil, false, fmt.Errorf(errMsg)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	// Retrieve metadata from unstructured object
	metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata")
	if err != nil || !found {
		klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found)
		return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err)
	}
	klog.V(6).Infof("HelmRelease Metadata: %+v", metadata)

	klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace)

	// Before updating, ensure the HelmRelease meets the inclusion criteria
	// This prevents updating HelmReleases that should not be managed as Applications
	if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name)
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Update the HelmRelease in Kubernetes
	resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err)
		return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err)
	}

	// After updating, ensure the updated HelmRelease still meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(resultHR) {
		klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName())
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert the updated HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName())

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, false, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Returning patched Application object: %+v", unstructuredApp)

	return &unstructured.Unstructured{Object: unstructuredApp}, false, nil
}

// Delete removes an Application by deleting the corresponding HelmRelease
func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, false, err
	}

	klog.V(6).Infof("Attempting to delete HelmRelease %s in namespace %s", name, namespace)

	// Construct HelmRelease name with the configured prefix
	helmReleaseName := r.releaseConfig.Prefix + name

	// Retrieve the HelmRelease before attempting to delete
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// If HelmRelease does not exist, return NotFound error for Application
			klog.Errorf("HelmRelease %s not found in namespace %s", helmReleaseName, namespace)
			return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}
		// For other errors, log and return
		klog.Errorf("Error retrieving HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, err
	}

	// Validate that the HelmRelease meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return NotFound error for Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace)

	// Delete the HelmRelease corresponding to the Application
	err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err)
	}

	klog.V(6).Infof("Successfully deleted HelmRelease %s", helmReleaseName)
	return nil, true, nil
}

// Watch sets up a watch on HelmReleases, filters them based on sourceRef and prefix, and converts events to Applications
func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Setting up watch for HelmReleases in namespace %s with options: %v", namespace, options)

	// Get request information, including resource name if specified
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		Watch:           true,
		ResourceVersion: options.ResourceVersion,
		FieldSelector:   helmFieldSelector,
		LabelSelector:   helmLabelSelector,
	}

	// Start watch on HelmRelease with mapped selectors
	helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error setting up watch for HelmReleases: %v", err)
		return nil, err
	}

	// Create a custom watcher to transform events
	customW := &customWatcher{
		resultChan: make(chan watch.Event),
		stopChan:   make(chan struct{}),
	}

	go func() {
		defer close(customW.resultChan)
		for {
			select {
			case event, ok := <-helmWatcher.ResultChan():
				if !ok {
					// The watcher has been closed, attempt to re-establish the watch
					klog.Warning("HelmRelease watcher closed, attempting to re-establish")
					// Implement retry logic or exit based on your requirements
					return
				}

				// Check if the object is a *v1.Status
				if status, ok := event.Object.(*metav1.Status); ok {
					klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
					continue // Skip processing this event
				}

				// Proceed with processing Unstructured objects
				matches, err := r.isRelevantHelmRelease(&event)
				if err != nil {
					klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
					continue
				}

				if !matches {
					continue
				}

				// Convert HelmRelease to Application
				app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured))
				if err != nil {
					klog.Errorf("Error converting HelmRelease to Application: %v", err)
					continue
				}

				// Apply field.selector by name if specified
				if resourceName != "" && app.Name != resourceName {
					continue
				}

				// Apply label.selector
				if options.LabelSelector != nil {
					sel, err := labels.Parse(options.LabelSelector.String())
					if err != nil {
						klog.Errorf("Invalid label selector: %v", err)
						continue
					}
					if !sel.Matches(labels.Set(app.Labels)) {
						continue
					}
				}

				// Convert Application to unstructured
				unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&app)
				if err != nil {
					klog.Errorf("Failed to convert Application to unstructured: %v", err)
					continue
				}

				// Create watch event with Application object
				appEvent := watch.Event{
					Type:   event.Type,
					Object: &unstructured.Unstructured{Object: unstructuredApp},
				}

				// Send event to custom watcher
				select {
				case customW.resultChan <- appEvent:
				case <-customW.stopChan:
					return
				case <-ctx.Done():
					return
				}

			case <-customW.stopChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	klog.V(6).Infof("Custom watch established successfully")
	return customW, nil
}

// Helper function to get HelmRelease name from object
func helmReleaseName(obj runtime.Object) string {
	if u, ok := obj.(*unstructured.Unstructured); ok {
		return u.GetName()
	}
	return "<unknown>"
}

// customWatcher wraps the original watcher and filters/converts events
type customWatcher struct {
	resultChan chan watch.Event
	stopChan   chan struct{}
	stopOnce   sync.Once
}

// Stop terminates the watch
func (cw *customWatcher) Stop() {
	cw.stopOnce.Do(func() {
		close(cw.stopChan)
	})
}

// ResultChan returns the event channel
func (cw *customWatcher) ResultChan() <-chan watch.Event {
	return cw.resultChan
}

// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
	if event.Object == nil {
		return false, nil
	}

	// Check if the object is a *v1.Status
	if status, ok := event.Object.(*metav1.Status); ok {
		// Log at a less severe level or handle specific status errors if needed
		klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
		return false, nil // Not relevant for processing as a HelmRelease
	}

	// Proceed if it's an Unstructured object
	hr, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
	}

	return r.shouldIncludeHelmRelease(hr), nil
}

// shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria
func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
	// Filter by Chart Name
	chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err)
		return false
	}
	if chartName != r.releaseConfig.Chart.Name {
		klog.V(6).Infof("HelmRelease %s chart name %s does not match expected %s", hr.GetName(), chartName, r.releaseConfig.Chart.Name)
		return false
	}

	// Filter by SourceRefConfig and Prefix
	return r.matchesSourceRefAndPrefix(hr)
}

// matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria
func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool {
	// Extract SourceRef fields
	sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err)
		return false
	}
	sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err)
		return false
	}
	sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err)
		return false
	}

	// Check if SourceRef matches the configuration
	if sourceRefKind != r.releaseConfig.Chart.SourceRef.Kind ||
		sourceRefName != r.releaseConfig.Chart.SourceRef.Name ||
		sourceRefNamespace != r.releaseConfig.Chart.SourceRef.Namespace {
		klog.V(6).Infof("HelmRelease %s sourceRef does not match expected values", hr.GetName())
		return false
	}

	// Additional filtering by Prefix
	name := hr.GetName()
	if !strings.HasPrefix(name, r.releaseConfig.Prefix) {
		klog.V(6).Infof("HelmRelease %s does not have the expected prefix %s", name, r.releaseConfig.Prefix)
		return false
	}

	return true
}

// getNamespace extracts the namespace from the context
func (r *REST) getNamespace(ctx context.Context) (string, error) {
	namespace, ok := request.NamespaceFrom(ctx)
	if !ok {
		err := fmt.Errorf("namespace not found in context")
		klog.Errorf(err.Error())
		return "", err
	}
	return namespace, nil
}

// buildLabelSelector constructs a label selector string from a map of labels
func buildLabelSelector(labels map[string]string) string {
	var selectors []string
	for k, v := range labels {
		selectors = append(selectors, fmt.Sprintf("%s=%s", k, v))
	}
	return strings.Join(selectors, ",")
}

// mergeMaps combines two maps of labels or annotations
func mergeMaps(a, b map[string]string) map[string]string {
	if a == nil && b == nil {
		return nil
	}
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
	merged := make(map[string]string, len(a)+len(b))
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		merged[k] = v
	}
	return merged
}

// addPrefixedMap adds the predefined prefix to the keys of a map
func addPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string, len(original))
	for k, v := range original {
		processed[prefix+k] = v
	}
	return processed
}

// filterPrefixedMap filters a map by the predefined prefix and removes the prefix from the keys
func filterPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string)
	for k, v := range original {
		if strings.HasPrefix(k, prefix) {
			newKey := strings.TrimPrefix(k, prefix)
			processed[newKey] = v
		}
	}
	return processed
}

// ConvertHelmReleaseToApplication converts a HelmRelease to an Application
func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) {
	klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName())

	var helmRelease helmv2.HelmRelease
	// Convert unstructured to HelmRelease struct
	err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
	if err != nil {
		klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
		return appsv1alpha1.Application{}, err
	}

	// Convert HelmRelease struct to Application struct
	app, err := r.convertHelmReleaseToApplication(&helmRelease)
	if err != nil {
		klog.Errorf("Error converting from HelmRelease to Application: %v", err)
		return appsv1alpha1.Application{}, err
	}

	klog.V(6).Infof("Successfully converted HelmRelease %s to Application", hr.GetName())
	return app, nil
}

// ConvertApplicationToHelmRelease converts an Application to a HelmRelease
func (r *REST) ConvertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	return r.convertApplicationToHelmRelease(app)
}

// convertHelmReleaseToApplication implements the actual conversion logic
func (r *REST) convertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) {
	app := appsv1alpha1.Application{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       r.kindName,
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:              strings.TrimPrefix(hr.Name, r.releaseConfig.Prefix),
			Namespace:         hr.Namespace,
			UID:               hr.GetUID(),
			ResourceVersion:   hr.GetResourceVersion(),
			CreationTimestamp: hr.CreationTimestamp,
			DeletionTimestamp: hr.DeletionTimestamp,
			Labels:            filterPrefixedMap(hr.Labels, LabelPrefix),
			Annotations:       filterPrefixedMap(hr.Annotations, AnnotationPrefix),
		},
		Spec: hr.Spec.Values,
		Status: appsv1alpha1.ApplicationStatus{
			Version: hr.Status.LastAttemptedRevision,
		},
	}

	var conditions []metav1.Condition
	for _, hrCondition := range hr.GetConditions() {
		if hrCondition.Type == "Ready" || hrCondition.Type == "Released" {
			conditions = append(conditions, metav1.Condition{
				LastTransitionTime: hrCondition.LastTransitionTime,
				Reason:             hrCondition.Reason,
				Message:            hrCondition.Message,
				Status:             hrCondition.Status,
				Type:               hrCondition.Type,
			})
		}
	}
	app.SetConditions(conditions)
	return app, nil
}

// convertApplicationToHelmRelease implements the actual conversion logic
func (r *REST) convertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	helmRelease := &helmv2.HelmRelease{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "helm.toolkit.fluxcd.io/v2",
			Kind:       "HelmRelease",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:            r.releaseConfig.Prefix + app.Name,
			Namespace:       app.Namespace,
			Labels:          addPrefixedMap(app.Labels, LabelPrefix),
			Annotations:     addPrefixedMap(app.Annotations, AnnotationPrefix),
			ResourceVersion: app.ObjectMeta.ResourceVersion,
			UID:             app.ObjectMeta.UID,
		},
		Spec: helmv2.HelmReleaseSpec{
			Chart: &helmv2.HelmChartTemplate{
				Spec: helmv2.HelmChartTemplateSpec{
					Chart:             r.releaseConfig.Chart.Name,
					Version:           app.AppVersion,
					ReconcileStrategy: "Revision",
					SourceRef: helmv2.CrossNamespaceObjectReference{
						Kind:      r.releaseConfig.Chart.SourceRef.Kind,
						Name:      r.releaseConfig.Chart.SourceRef.Name,
						Namespace: r.releaseConfig.Chart.SourceRef.Namespace,
					},
				},
			},
			Values: app.Spec,
		},
	}

	return helmRelease, nil
}

// ConvertToTable implements the TableConvertor interface for displaying resources in a table format
func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	klog.V(6).Infof("ConvertToTable: received object of type %T", object)

	var table metav1.Table

	switch obj := object.(type) {
	case *appsv1alpha1.ApplicationList:
		table = r.buildTableFromApplications(obj.Items)
		table.ListMeta.ResourceVersion = obj.ListMeta.ResourceVersion
	case *appsv1alpha1.Application:
		table = r.buildTableFromApplication(*obj)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	case *unstructured.Unstructured:
		var app appsv1alpha1.Application
		err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &app)
		if err != nil {
			klog.Errorf("Failed to convert Unstructured to Application: %v", err)
			return nil, fmt.Errorf("failed to convert Unstructured to Application: %v", err)
		}
		table = r.buildTableFromApplication(app)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	default:
		resource := schema.GroupResource{}
		if info, ok := request.RequestInfoFrom(ctx); ok {
			resource = schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
		}
		return nil, errNotAcceptable{
			resource: resource,
			message:  "object does not implement the Object interfaces",
		}
	}

	// Handle table options
	if opt, ok := tableOptions.(*metav1.TableOptions); ok && opt != nil && opt.NoHeaders {
		table.ColumnDefinitions = nil
	}

	table.TypeMeta = metav1.TypeMeta{
		APIVersion: "meta.k8s.io/v1",
		Kind:       "Table",
	}

	klog.V(6).Infof("ConvertToTable: returning table with %d rows", len(table.Rows))

	return &table, nil
}

// buildTableFromApplications constructs a table from a list of Applications
func (r *REST) buildTableFromApplications(apps []appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: make([]metav1.TableRow, 0, len(apps)),
	}
	now := time.Now()

	for _, app := range apps {
		row := metav1.TableRow{
			Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
			Object: runtime.RawExtension{Object: &app},
		}
		table.Rows = append(table.Rows, row)
	}

	return table
}

// buildTableFromApplication constructs a table from a single Application
func (r *REST) buildTableFromApplication(app appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: []metav1.TableRow{},
	}
	now := time.Now()

	row := metav1.TableRow{
		Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
		Object: runtime.RawExtension{Object: &app},
	}
	table.Rows = append(table.Rows, row)

	return table
}

// getVersion returns the application version or a placeholder if unknown
func getVersion(version string) string {
	if version == "" {
		return "<unknown>"
	}
	return version
}

// computeAge calculates the age of the object based on CreationTimestamp and current time
func computeAge(creationTime, currentTime time.Time) string {
	ageDuration := currentTime.Sub(creationTime)
	return duration.HumanDuration(ageDuration)
}

// getReadyStatus returns the ready status based on conditions
func getReadyStatus(conditions []metav1.Condition) string {
	for _, condition := range conditions {
		if condition.Type == "Ready" {
			switch condition.Status {
			case metav1.ConditionTrue:
				return "True"
			case metav1.ConditionFalse:
				return "False"
			default:
				return "Unknown"
			}
		}
	}
	return "Unknown"
}

// Destroy releases resources associated with REST
func (r *REST) Destroy() {
	// No additional actions needed to release resources.
}

// New creates a new instance of Application
func (r *REST) New() runtime.Object {
	return &appsv1alpha1.Application{}
}

// NewList returns an empty list of Application objects
func (r *REST) NewList() runtime.Object {
	return &appsv1alpha1.ApplicationList{}
}

// Kind returns the resource kind used for API discovery
func (r *REST) Kind() string {
	return r.gvk.Kind
}

// GroupVersionKind returns the GroupVersionKind for REST
func (r *REST) GroupVersionKind(schema.GroupVersion) schema.GroupVersionKind {
	return r.gvk
}

// errNotAcceptable indicates that the resource does not support conversion to Table
type errNotAcceptable struct {
	resource schema.GroupResource
	message  string
}

func (e errNotAcceptable) Error() string {
	return e.message
}

func (e errNotAcceptable) Status() metav1.Status {
	return metav1.Status{
		Status:  metav1.StatusFailure,
		Code:    http.StatusNotAcceptable,
		Reason:  metav1.StatusReason("NotAcceptable"),
		Message: e.Error(),
	}
}

Затем, используя эту логику, мы реализовали методы Get(), Delete(), List(), Create(). Полный пример того, что получилось, можно посмотреть по этой ссылке.

Код
/*
Copyright 2024 The Cozystack Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package application

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	helmv2 "github.com/fluxcd/helm-controller/api/v2"
	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	fields "k8s.io/apimachinery/pkg/fields"
	labels "k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/duration"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/registry/rest"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"

	appsv1alpha1 "github.com/aenix.io/cozystack/pkg/apis/apps/v1alpha1"
	"github.com/aenix.io/cozystack/pkg/config"

	// Importing API errors package to construct appropriate error responses
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// Ensure REST implements necessary interfaces
var (
	_ rest.Getter          = &REST{}
	_ rest.Lister          = &REST{}
	_ rest.Updater         = &REST{}
	_ rest.Creater         = &REST{}
	_ rest.GracefulDeleter = &REST{}
	_ rest.Watcher         = &REST{}
	_ rest.Patcher         = &REST{}
)

// Define constants for label and annotation prefixes
const (
	LabelPrefix      = "apps.cozystack.io-"
	AnnotationPrefix = "apps.cozystack.io-"
)

// Define the GroupVersionResource for HelmRelease
var helmReleaseGVR = schema.GroupVersionResource{
	Group:    "helm.toolkit.fluxcd.io",
	Version:  "v2",
	Resource: "helmreleases",
}

// REST implements the RESTStorage interface for Application resources
type REST struct {
	dynamicClient dynamic.Interface
	gvr           schema.GroupVersionResource
	gvk           schema.GroupVersionKind
	kindName      string
	releaseConfig config.ReleaseConfig
}

// NewREST creates a new REST storage for Application with specific configuration
func NewREST(dynamicClient dynamic.Interface, config *config.Resource) *REST {
	return &REST{
		dynamicClient: dynamicClient,
		gvr: schema.GroupVersionResource{
			Group:    appsv1alpha1.GroupName,
			Version:  "v1alpha1",
			Resource: config.Application.Plural,
		},
		gvk: schema.GroupVersion{
			Group:   appsv1alpha1.GroupName,
			Version: "v1alpha1",
		}.WithKind(config.Application.Kind),
		kindName:      config.Application.Kind,
		releaseConfig: config.Release,
	}
}

// NamespaceScoped indicates whether the resource is namespaced
func (r *REST) NamespaceScoped() bool {
	return true
}

// GetSingularName returns the singular name of the resource
func (r *REST) GetSingularName() string {
	return r.gvr.Resource
}

// Create handles the creation of a new Application by converting it to a HelmRelease
func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
	// Assert the object is of type Application
	app, ok := obj.(*appsv1alpha1.Application)
	if !ok {
		return nil, fmt.Errorf("expected Application object, got %T", obj)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	klog.V(6).Infof("Creating HelmRelease %s in namespace %s", helmRelease.Name, app.Namespace)

	// Create HelmRelease in Kubernetes
	createdHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(app.Namespace).Create(ctx, &unstructured.Unstructured{Object: unstructuredHR}, *options)
	if err != nil {
		klog.Errorf("Failed to create HelmRelease %s: %v", helmRelease.Name, err)
		return nil, fmt.Errorf("failed to create HelmRelease: %v", err)
	}

	// Convert the created HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(createdHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", createdHR.GetName(), err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully created and converted HelmRelease %s to Application", createdHR.GetName())

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	klog.V(6).Infof("Successfully retrieved and converted resource %s of type %s to unstructured", convertedApp.GetName(), r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// Get retrieves an Application by converting the corresponding HelmRelease
func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to retrieve resource %s of type %s in namespace %s", name, r.gvr.Resource, namespace)

	// Get the corresponding HelmRelease using the new prefix
	helmReleaseName := r.releaseConfig.Prefix + name
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Error retrieving HelmRelease for resource %s: %v", name, err)

		// Check if the error is a NotFound error
		if apierrors.IsNotFound(err) {
			// Return a NotFound error for the Application resource instead of HelmRelease
			return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}

		// For other errors, return them as-is
		return nil, err
	}

	// Check if HelmRelease meets the required chartName and sourceRef criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return a NotFound error for the Application resource
		return nil, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert HelmRelease to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(hr)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", name, err)
		return nil, fmt.Errorf("conversion error: %v", err)
	}

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", name, err)
		return nil, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Successfully retrieved and converted resource %s of kind %s to unstructured", name, r.gvr.Resource)
	return &unstructured.Unstructured{Object: unstructuredApp}, nil
}

// List retrieves a list of Applications by converting HelmReleases
func (r *REST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Attempting to list HelmReleases in namespace %s with options: %v", namespace, options)

	// Get resource name from the request (if any)
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		FieldSelector: helmFieldSelector,
		LabelSelector: helmLabelSelector,
	}

	// List HelmReleases with mapped selectors
	hrList, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).List(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error listing HelmReleases: %v", err)
		return nil, err
	}

	// Initialize empty Application list
	appList := &appsv1alpha1.ApplicationList{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       "ApplicationList",
		},
		ListMeta: metav1.ListMeta{
			ResourceVersion: hrList.GetResourceVersion(),
		},
		Items: []appsv1alpha1.Application{},
	}

	// Iterate over HelmReleases and convert to Applications
	for _, hr := range hrList.Items {
		if !r.shouldIncludeHelmRelease(&hr) {
			continue
		}

		app, err := r.ConvertHelmReleaseToApplication(&hr)
		if err != nil {
			klog.Errorf("Error converting HelmRelease %s to Application: %v", hr.GetName(), err)
			continue
		}

		// If resourceName is set, check for match
		if resourceName != "" && app.Name != resourceName {
			continue
		}

		// Apply label.selector
		if options.LabelSelector != nil {
			sel, err := labels.Parse(options.LabelSelector.String())
			if err != nil {
				klog.Errorf("Invalid label selector: %v", err)
				continue
			}
			if !sel.Matches(labels.Set(app.Labels)) {
				continue
			}
		}

		// Apply field.selector by name and namespace (if specified)
		if options.FieldSelector != nil {
			fs, err := fields.ParseSelector(options.FieldSelector.String())
			if err != nil {
				klog.Errorf("Invalid field selector: %v", err)
				continue
			}

			fieldsSet := fields.Set{
				"metadata.name":      app.Name,
				"metadata.namespace": app.Namespace,
			}
			if !fs.Matches(fieldsSet) {
				continue
			}
		}

		appList.Items = append(appList.Items, app)
	}

	klog.V(6).Infof("Successfully listed %d Application resources in namespace %s", len(appList.Items), namespace)
	return appList, nil
}

// Update updates an existing Application by converting it to a HelmRelease
func (r *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	// Retrieve the existing Application
	oldObj, err := r.Get(ctx, name, &metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			if !forceAllowCreate {
				return nil, false, err
			}
			// If not found and force allow create, create a new one
			obj, err := objInfo.UpdatedObject(ctx, nil)
			if err != nil {
				klog.Errorf("Failed to get updated object: %v", err)
				return nil, false, err
			}
			createdObj, err := r.Create(ctx, obj, createValidation, &metav1.CreateOptions{})
			if err != nil {
				klog.Errorf("Failed to create new Application: %v", err)
				return nil, false, err
			}
			return createdObj, true, nil
		}
		klog.Errorf("Failed to get existing Application %s: %v", name, err)
		return nil, false, err
	}

	// Update the Application object
	newObj, err := objInfo.UpdatedObject(ctx, oldObj)
	if err != nil {
		klog.Errorf("Failed to get updated object: %v", err)
		return nil, false, err
	}

	// Validate the update if a validation function is provided
	if updateValidation != nil {
		if err := updateValidation(ctx, newObj, oldObj); err != nil {
			klog.Errorf("Update validation failed for Application %s: %v", name, err)
			return nil, false, err
		}
	}

	// Assert the new object is of type Application
	app, ok := newObj.(*appsv1alpha1.Application)
	if !ok {
		errMsg := fmt.Sprintf("expected Application object, got %T", newObj)
		klog.Errorf(errMsg)
		return nil, false, fmt.Errorf(errMsg)
	}

	// Convert Application to HelmRelease
	helmRelease, err := r.ConvertApplicationToHelmRelease(app)
	if err != nil {
		klog.Errorf("Conversion error: %v", err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	// Merge system labels (from config) directly
	helmRelease.Labels = mergeMaps(r.releaseConfig.Labels, helmRelease.Labels)
	// Merge user labels with prefix
	helmRelease.Labels = mergeMaps(helmRelease.Labels, addPrefixedMap(app.Labels, LabelPrefix))
	// Note: Annotations from config are not handled as r.releaseConfig.Annotations is undefined

	// Convert HelmRelease to unstructured format
	unstructuredHR, err := runtime.DefaultUnstructuredConverter.ToUnstructured(helmRelease)
	if err != nil {
		klog.Errorf("Failed to convert HelmRelease to unstructured: %v", err)
		return nil, false, fmt.Errorf("failed to convert HelmRelease to unstructured: %v", err)
	}

	// Retrieve metadata from unstructured object
	metadata, found, err := unstructured.NestedMap(unstructuredHR, "metadata")
	if err != nil || !found {
		klog.Errorf("Failed to retrieve metadata from HelmRelease: %v, found: %v", err, found)
		return nil, false, fmt.Errorf("failed to retrieve metadata from HelmRelease: %v", err)
	}
	klog.V(6).Infof("HelmRelease Metadata: %+v", metadata)

	klog.V(6).Infof("Updating HelmRelease %s in namespace %s", helmRelease.Name, helmRelease.Namespace)

	// Before updating, ensure the HelmRelease meets the inclusion criteria
	// This prevents updating HelmReleases that should not be managed as Applications
	if !r.shouldIncludeHelmRelease(&unstructured.Unstructured{Object: unstructuredHR}) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmRelease.Name)
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Update the HelmRelease in Kubernetes
	resultHR, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(helmRelease.Namespace).Update(ctx, &unstructured.Unstructured{Object: unstructuredHR}, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update HelmRelease %s: %v", helmRelease.Name, err)
		return nil, false, fmt.Errorf("failed to update HelmRelease: %v", err)
	}

	// After updating, ensure the updated HelmRelease still meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(resultHR) {
		klog.Errorf("Updated HelmRelease %s does not match the required chartName and sourceRef criteria", resultHR.GetName())
		// Return a NotFound error for the Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	// Convert the updated HelmRelease back to Application
	convertedApp, err := r.ConvertHelmReleaseToApplication(resultHR)
	if err != nil {
		klog.Errorf("Conversion error from HelmRelease to Application for resource %s: %v", resultHR.GetName(), err)
		return nil, false, fmt.Errorf("conversion error: %v", err)
	}

	klog.V(6).Infof("Successfully updated and converted HelmRelease %s to Application", resultHR.GetName())

	// Explicitly set apiVersion and kind for Application
	convertedApp.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps.cozystack.io/v1alpha1",
		Kind:       r.kindName,
	}

	// Convert Application to unstructured format
	unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&convertedApp)
	if err != nil {
		klog.Errorf("Failed to convert Application to unstructured for resource %s: %v", convertedApp.GetName(), err)
		return nil, false, fmt.Errorf("failed to convert Application to unstructured: %v", err)
	}

	// Explicitly set apiVersion and kind in unstructured object
	unstructuredApp["apiVersion"] = "apps.cozystack.io/v1alpha1"
	unstructuredApp["kind"] = r.kindName

	klog.V(6).Infof("Returning patched Application object: %+v", unstructuredApp)

	return &unstructured.Unstructured{Object: unstructuredApp}, false, nil
}

// Delete removes an Application by deleting the corresponding HelmRelease
func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, false, err
	}

	klog.V(6).Infof("Attempting to delete HelmRelease %s in namespace %s", name, namespace)

	// Construct HelmRelease name with the configured prefix
	helmReleaseName := r.releaseConfig.Prefix + name

	// Retrieve the HelmRelease before attempting to delete
	hr, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Get(ctx, helmReleaseName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			// If HelmRelease does not exist, return NotFound error for Application
			klog.Errorf("HelmRelease %s not found in namespace %s", helmReleaseName, namespace)
			return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
		}
		// For other errors, log and return
		klog.Errorf("Error retrieving HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, err
	}

	// Validate that the HelmRelease meets the inclusion criteria
	if !r.shouldIncludeHelmRelease(hr) {
		klog.Errorf("HelmRelease %s does not match the required chartName and sourceRef criteria", helmReleaseName)
		// Return NotFound error for Application resource
		return nil, false, apierrors.NewNotFound(r.gvr.GroupResource(), name)
	}

	klog.V(6).Infof("Deleting HelmRelease %s in namespace %s", helmReleaseName, namespace)

	// Delete the HelmRelease corresponding to the Application
	err = r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Delete(ctx, helmReleaseName, *options)
	if err != nil {
		klog.Errorf("Failed to delete HelmRelease %s: %v", helmReleaseName, err)
		return nil, false, fmt.Errorf("failed to delete HelmRelease: %v", err)
	}

	klog.V(6).Infof("Successfully deleted HelmRelease %s", helmReleaseName)
	return nil, true, nil
}

// Watch sets up a watch on HelmReleases, filters them based on sourceRef and prefix, and converts events to Applications
func (r *REST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	namespace, err := r.getNamespace(ctx)
	if err != nil {
		klog.Errorf("Failed to get namespace: %v", err)
		return nil, err
	}

	klog.V(6).Infof("Setting up watch for HelmReleases in namespace %s with options: %v", namespace, options)

	// Get request information, including resource name if specified
	var resourceName string
	if requestInfo, ok := request.RequestInfoFrom(ctx); ok {
		resourceName = requestInfo.Name
	}

	// Initialize variables for selector mapping
	var helmFieldSelector string
	var helmLabelSelector string

	// Process field.selector
	if options.FieldSelector != nil {
		fs, err := fields.ParseSelector(options.FieldSelector.String())
		if err != nil {
			klog.Errorf("Invalid field selector: %v", err)
			return nil, fmt.Errorf("invalid field selector: %v", err)
		}

		// Check if selector is for metadata.name
		if name, exists := fs.RequiresExactMatch("metadata.name"); exists {
			// Convert Application name to HelmRelease name
			mappedName := r.releaseConfig.Prefix + name
			// Create new field.selector for HelmRelease
			helmFieldSelector = fields.OneTermEqualSelector("metadata.name", mappedName).String()
		} else {
			// If field.selector contains other fields, map them directly
			helmFieldSelector = fs.String()
		}
	}

	// Process label.selector
	if options.LabelSelector != nil {
		ls := options.LabelSelector.String()
		parsedLabels, err := labels.Parse(ls)
		if err != nil {
			klog.Errorf("Invalid label selector: %v", err)
			return nil, fmt.Errorf("invalid label selector: %v", err)
		}
		if !parsedLabels.Empty() {
			reqs, _ := parsedLabels.Requirements()
			var prefixedReqs []labels.Requirement
			for _, req := range reqs {
				// Add prefix to each label key
				prefixedReq, err := labels.NewRequirement(LabelPrefix+req.Key(), req.Operator(), req.Values().List())
				if err != nil {
					klog.Errorf("Error prefixing label key: %v", err)
					return nil, fmt.Errorf("error prefixing label key: %v", err)
				}
				prefixedReqs = append(prefixedReqs, *prefixedReq)
			}
			helmLabelSelector = labels.NewSelector().Add(prefixedReqs...).String()
		}
	}

	// Set ListOptions for HelmRelease with selector mapping
	metaOptions := metav1.ListOptions{
		Watch:           true,
		ResourceVersion: options.ResourceVersion,
		FieldSelector:   helmFieldSelector,
		LabelSelector:   helmLabelSelector,
	}

	// Start watch on HelmRelease with mapped selectors
	helmWatcher, err := r.dynamicClient.Resource(helmReleaseGVR).Namespace(namespace).Watch(ctx, metaOptions)
	if err != nil {
		klog.Errorf("Error setting up watch for HelmReleases: %v", err)
		return nil, err
	}

	// Create a custom watcher to transform events
	customW := &customWatcher{
		resultChan: make(chan watch.Event),
		stopChan:   make(chan struct{}),
	}

	go func() {
		defer close(customW.resultChan)
		for {
			select {
			case event, ok := <-helmWatcher.ResultChan():
				if !ok {
					// The watcher has been closed, attempt to re-establish the watch
					klog.Warning("HelmRelease watcher closed, attempting to re-establish")
					// Implement retry logic or exit based on your requirements
					return
				}

				// Check if the object is a *v1.Status
				if status, ok := event.Object.(*metav1.Status); ok {
					klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
					continue // Skip processing this event
				}

				// Proceed with processing Unstructured objects
				matches, err := r.isRelevantHelmRelease(&event)
				if err != nil {
					klog.V(4).Infof("Non-critical error filtering HelmRelease event: %v", err)
					continue
				}

				if !matches {
					continue
				}

				// Convert HelmRelease to Application
				app, err := r.ConvertHelmReleaseToApplication(event.Object.(*unstructured.Unstructured))
				if err != nil {
					klog.Errorf("Error converting HelmRelease to Application: %v", err)
					continue
				}

				// Apply field.selector by name if specified
				if resourceName != "" && app.Name != resourceName {
					continue
				}

				// Apply label.selector
				if options.LabelSelector != nil {
					sel, err := labels.Parse(options.LabelSelector.String())
					if err != nil {
						klog.Errorf("Invalid label selector: %v", err)
						continue
					}
					if !sel.Matches(labels.Set(app.Labels)) {
						continue
					}
				}

				// Convert Application to unstructured
				unstructuredApp, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&app)
				if err != nil {
					klog.Errorf("Failed to convert Application to unstructured: %v", err)
					continue
				}

				// Create watch event with Application object
				appEvent := watch.Event{
					Type:   event.Type,
					Object: &unstructured.Unstructured{Object: unstructuredApp},
				}

				// Send event to custom watcher
				select {
				case customW.resultChan <- appEvent:
				case <-customW.stopChan:
					return
				case <-ctx.Done():
					return
				}

			case <-customW.stopChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	klog.V(6).Infof("Custom watch established successfully")
	return customW, nil
}

// Helper function to get HelmRelease name from object
func helmReleaseName(obj runtime.Object) string {
	if u, ok := obj.(*unstructured.Unstructured); ok {
		return u.GetName()
	}
	return "<unknown>"
}

// customWatcher wraps the original watcher and filters/converts events
type customWatcher struct {
	resultChan chan watch.Event
	stopChan   chan struct{}
	stopOnce   sync.Once
}

// Stop terminates the watch
func (cw *customWatcher) Stop() {
	cw.stopOnce.Do(func() {
		close(cw.stopChan)
	})
}

// ResultChan returns the event channel
func (cw *customWatcher) ResultChan() <-chan watch.Event {
	return cw.resultChan
}

// isRelevantHelmRelease checks if the HelmRelease meets the sourceRef and prefix criteria
func (r *REST) isRelevantHelmRelease(event *watch.Event) (bool, error) {
	if event.Object == nil {
		return false, nil
	}

	// Check if the object is a *v1.Status
	if status, ok := event.Object.(*metav1.Status); ok {
		// Log at a less severe level or handle specific status errors if needed
		klog.V(4).Infof("Received Status object in HelmRelease watch: %v", status.Message)
		return false, nil // Not relevant for processing as a HelmRelease
	}

	// Proceed if it's an Unstructured object
	hr, ok := event.Object.(*unstructured.Unstructured)
	if !ok {
		return false, fmt.Errorf("expected Unstructured object, got %T", event.Object)
	}

	return r.shouldIncludeHelmRelease(hr), nil
}

// shouldIncludeHelmRelease determines if a HelmRelease should be included based on filtering criteria
func (r *REST) shouldIncludeHelmRelease(hr *unstructured.Unstructured) bool {
	// Filter by Chart Name
	chartName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "chart")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.chart field: %v", hr.GetName(), err)
		return false
	}
	if chartName != r.releaseConfig.Chart.Name {
		klog.V(6).Infof("HelmRelease %s chart name %s does not match expected %s", hr.GetName(), chartName, r.releaseConfig.Chart.Name)
		return false
	}

	// Filter by SourceRefConfig and Prefix
	return r.matchesSourceRefAndPrefix(hr)
}

// matchesSourceRefAndPrefix checks both SourceRefConfig and Prefix criteria
func (r *REST) matchesSourceRefAndPrefix(hr *unstructured.Unstructured) bool {
	// Extract SourceRef fields
	sourceRefKind, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "kind")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.kind field: %v", hr.GetName(), err)
		return false
	}
	sourceRefName, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "name")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.name field: %v", hr.GetName(), err)
		return false
	}
	sourceRefNamespace, found, err := unstructured.NestedString(hr.Object, "spec", "chart", "spec", "sourceRef", "namespace")
	if err != nil || !found {
		klog.V(6).Infof("HelmRelease %s missing spec.chart.spec.sourceRef.namespace field: %v", hr.GetName(), err)
		return false
	}

	// Check if SourceRef matches the configuration
	if sourceRefKind != r.releaseConfig.Chart.SourceRef.Kind ||
		sourceRefName != r.releaseConfig.Chart.SourceRef.Name ||
		sourceRefNamespace != r.releaseConfig.Chart.SourceRef.Namespace {
		klog.V(6).Infof("HelmRelease %s sourceRef does not match expected values", hr.GetName())
		return false
	}

	// Additional filtering by Prefix
	name := hr.GetName()
	if !strings.HasPrefix(name, r.releaseConfig.Prefix) {
		klog.V(6).Infof("HelmRelease %s does not have the expected prefix %s", name, r.releaseConfig.Prefix)
		return false
	}

	return true
}

// getNamespace extracts the namespace from the context
func (r *REST) getNamespace(ctx context.Context) (string, error) {
	namespace, ok := request.NamespaceFrom(ctx)
	if !ok {
		err := fmt.Errorf("namespace not found in context")
		klog.Errorf(err.Error())
		return "", err
	}
	return namespace, nil
}

// buildLabelSelector constructs a label selector string from a map of labels
func buildLabelSelector(labels map[string]string) string {
	var selectors []string
	for k, v := range labels {
		selectors = append(selectors, fmt.Sprintf("%s=%s", k, v))
	}
	return strings.Join(selectors, ",")
}

// mergeMaps combines two maps of labels or annotations
func mergeMaps(a, b map[string]string) map[string]string {
	if a == nil && b == nil {
		return nil
	}
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
	merged := make(map[string]string, len(a)+len(b))
	for k, v := range a {
		merged[k] = v
	}
	for k, v := range b {
		merged[k] = v
	}
	return merged
}

// addPrefixedMap adds the predefined prefix to the keys of a map
func addPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string, len(original))
	for k, v := range original {
		processed[prefix+k] = v
	}
	return processed
}

// filterPrefixedMap filters a map by the predefined prefix and removes the prefix from the keys
func filterPrefixedMap(original map[string]string, prefix string) map[string]string {
	if original == nil {
		return nil
	}
	processed := make(map[string]string)
	for k, v := range original {
		if strings.HasPrefix(k, prefix) {
			newKey := strings.TrimPrefix(k, prefix)
			processed[newKey] = v
		}
	}
	return processed
}

// ConvertHelmReleaseToApplication converts a HelmRelease to an Application
func (r *REST) ConvertHelmReleaseToApplication(hr *unstructured.Unstructured) (appsv1alpha1.Application, error) {
	klog.V(6).Infof("Converting HelmRelease to Application for resource %s", hr.GetName())

	var helmRelease helmv2.HelmRelease
	// Convert unstructured to HelmRelease struct
	err := runtime.DefaultUnstructuredConverter.FromUnstructured(hr.Object, &helmRelease)
	if err != nil {
		klog.Errorf("Error converting from unstructured to HelmRelease: %v", err)
		return appsv1alpha1.Application{}, err
	}

	// Convert HelmRelease struct to Application struct
	app, err := r.convertHelmReleaseToApplication(&helmRelease)
	if err != nil {
		klog.Errorf("Error converting from HelmRelease to Application: %v", err)
		return appsv1alpha1.Application{}, err
	}

	klog.V(6).Infof("Successfully converted HelmRelease %s to Application", hr.GetName())
	return app, nil
}

// ConvertApplicationToHelmRelease converts an Application to a HelmRelease
func (r *REST) ConvertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	return r.convertApplicationToHelmRelease(app)
}

// convertHelmReleaseToApplication implements the actual conversion logic
func (r *REST) convertHelmReleaseToApplication(hr *helmv2.HelmRelease) (appsv1alpha1.Application, error) {
	app := appsv1alpha1.Application{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "apps.cozystack.io/v1alpha1",
			Kind:       r.kindName,
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:              strings.TrimPrefix(hr.Name, r.releaseConfig.Prefix),
			Namespace:         hr.Namespace,
			UID:               hr.GetUID(),
			ResourceVersion:   hr.GetResourceVersion(),
			CreationTimestamp: hr.CreationTimestamp,
			DeletionTimestamp: hr.DeletionTimestamp,
			Labels:            filterPrefixedMap(hr.Labels, LabelPrefix),
			Annotations:       filterPrefixedMap(hr.Annotations, AnnotationPrefix),
		},
		Spec: hr.Spec.Values,
		Status: appsv1alpha1.ApplicationStatus{
			Version: hr.Status.LastAttemptedRevision,
		},
	}

	var conditions []metav1.Condition
	for _, hrCondition := range hr.GetConditions() {
		if hrCondition.Type == "Ready" || hrCondition.Type == "Released" {
			conditions = append(conditions, metav1.Condition{
				LastTransitionTime: hrCondition.LastTransitionTime,
				Reason:             hrCondition.Reason,
				Message:            hrCondition.Message,
				Status:             hrCondition.Status,
				Type:               hrCondition.Type,
			})
		}
	}
	app.SetConditions(conditions)
	return app, nil
}

// convertApplicationToHelmRelease implements the actual conversion logic
func (r *REST) convertApplicationToHelmRelease(app *appsv1alpha1.Application) (*helmv2.HelmRelease, error) {
	helmRelease := &helmv2.HelmRelease{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "helm.toolkit.fluxcd.io/v2",
			Kind:       "HelmRelease",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:            r.releaseConfig.Prefix + app.Name,
			Namespace:       app.Namespace,
			Labels:          addPrefixedMap(app.Labels, LabelPrefix),
			Annotations:     addPrefixedMap(app.Annotations, AnnotationPrefix),
			ResourceVersion: app.ObjectMeta.ResourceVersion,
			UID:             app.ObjectMeta.UID,
		},
		Spec: helmv2.HelmReleaseSpec{
			Chart: &helmv2.HelmChartTemplate{
				Spec: helmv2.HelmChartTemplateSpec{
					Chart:             r.releaseConfig.Chart.Name,
					Version:           app.AppVersion,
					ReconcileStrategy: "Revision",
					SourceRef: helmv2.CrossNamespaceObjectReference{
						Kind:      r.releaseConfig.Chart.SourceRef.Kind,
						Name:      r.releaseConfig.Chart.SourceRef.Name,
						Namespace: r.releaseConfig.Chart.SourceRef.Namespace,
					},
				},
			},
			Values: app.Spec,
		},
	}

	return helmRelease, nil
}

// ConvertToTable implements the TableConvertor interface for displaying resources in a table format
func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
	klog.V(6).Infof("ConvertToTable: received object of type %T", object)

	var table metav1.Table

	switch obj := object.(type) {
	case *appsv1alpha1.ApplicationList:
		table = r.buildTableFromApplications(obj.Items)
		table.ListMeta.ResourceVersion = obj.ListMeta.ResourceVersion
	case *appsv1alpha1.Application:
		table = r.buildTableFromApplication(*obj)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	case *unstructured.Unstructured:
		var app appsv1alpha1.Application
		err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), &app)
		if err != nil {
			klog.Errorf("Failed to convert Unstructured to Application: %v", err)
			return nil, fmt.Errorf("failed to convert Unstructured to Application: %v", err)
		}
		table = r.buildTableFromApplication(app)
		table.ListMeta.ResourceVersion = obj.GetResourceVersion()
	default:
		resource := schema.GroupResource{}
		if info, ok := request.RequestInfoFrom(ctx); ok {
			resource = schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
		}
		return nil, errNotAcceptable{
			resource: resource,
			message:  "object does not implement the Object interfaces",
		}
	}

	// Handle table options
	if opt, ok := tableOptions.(*metav1.TableOptions); ok && opt != nil && opt.NoHeaders {
		table.ColumnDefinitions = nil
	}

	table.TypeMeta = metav1.TypeMeta{
		APIVersion: "meta.k8s.io/v1",
		Kind:       "Table",
	}

	klog.V(6).Infof("ConvertToTable: returning table with %d rows", len(table.Rows))

	return &table, nil
}

// buildTableFromApplications constructs a table from a list of Applications
func (r *REST) buildTableFromApplications(apps []appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: make([]metav1.TableRow, 0, len(apps)),
	}
	now := time.Now()

	for _, app := range apps {
		row := metav1.TableRow{
			Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
			Object: runtime.RawExtension{Object: &app},
		}
		table.Rows = append(table.Rows, row)
	}

	return table
}

// buildTableFromApplication constructs a table from a single Application
func (r *REST) buildTableFromApplication(app appsv1alpha1.Application) metav1.Table {
	table := metav1.Table{
		ColumnDefinitions: []metav1.TableColumnDefinition{
			{Name: "NAME", Type: "string", Description: "Name of the Application", Priority: 0},
			{Name: "READY", Type: "string", Description: "Ready status of the Application", Priority: 0},
			{Name: "AGE", Type: "string", Description: "Age of the Application", Priority: 0},
			{Name: "VERSION", Type: "string", Description: "Version of the Application", Priority: 0},
		},
		Rows: []metav1.TableRow{},
	}
	now := time.Now()

	row := metav1.TableRow{
		Cells:  []interface{}{app.GetName(), getReadyStatus(app.Status.Conditions), computeAge(app.GetCreationTimestamp().Time, now), getVersion(app.Status.Version)},
		Object: runtime.RawExtension{Object: &app},
	}
	table.Rows = append(table.Rows, row)

	return table
}

// getVersion returns the application version or a placeholder if unknown
func getVersion(version string) string {
	if version == "" {
		return "<unknown>"
	}
	return version
}

// computeAge calculates the age of the object based on CreationTimestamp and current time
func computeAge(creationTime, currentTime time.Time) string {
	ageDuration := currentTime.Sub(creationTime)
	return duration.HumanDuration(ageDuration)
}

// getReadyStatus returns the ready status based on conditions
func getReadyStatus(conditions []metav1.Condition) string {
	for _, condition := range conditions {
		if condition.Type == "Ready" {
			switch condition.Status {
			case metav1.ConditionTrue:
				return "True"
			case metav1.ConditionFalse:
				return "False"
			default:
				return "Unknown"
			}
		}
	}
	return "Unknown"
}

// Destroy releases resources associated with REST
func (r *REST) Destroy() {
	// No additional actions needed to release resources.
}

// New creates a new instance of Application
func (r *REST) New() runtime.Object {
	return &appsv1alpha1.Application{}
}

// NewList returns an empty list of Application objects
func (r *REST) NewList() runtime.Object {
	return &appsv1alpha1.ApplicationList{}
}

// Kind returns the resource kind used for API discovery
func (r *REST) Kind() string {
	return r.gvk.Kind
}

// GroupVersionKind returns the GroupVersionKind for REST
func (r *REST) GroupVersionKind(schema.GroupVersion) schema.GroupVersionKind {
	return r.gvk
}

// errNotAcceptable indicates that the resource does not support conversion to Table
type errNotAcceptable struct {
	resource schema.GroupResource
	message  string
}

func (e errNotAcceptable) Error() string {
	return e.message
}

func (e errNotAcceptable) Status() metav1.Status {
	return metav1.Status{
		Status:  metav1.StatusFailure,
		Code:    http.StatusNotAcceptable,
		Reason:  metav1.StatusReason("NotAcceptable"),
		Message: e.Error(),
	}
}

В конце каждого метода мы устанавливаем правильный Kind и возвращаем объект unstructured.Unstructured{}, чтобы Kubernetes правильно этот объект сериализовал. В противном случае он всегда будет сериализовывать их с kind: Application, а это нам не нужно.

Что мы получили

В итоге в Cozystack все наши типы из ConfigMap доступны в Kubernetes как есть:

# kubectl api-resources | grep cozystack
buckets                 	apps.cozystack.io/v1alpha1     	true     	Bucket
clickhouses             	apps.cozystack.io/v1alpha1     	true     	ClickHouse
etcds                   	apps.cozystack.io/v1alpha1     	true     	Etcd
ferretdb                	apps.cozystack.io/v1alpha1     	true     	FerretDB
httpcaches              	apps.cozystack.io/v1alpha1     	true     	HTTPCache
ingresses               	apps.cozystack.io/v1alpha1     	true     	Ingress
kafkas                  	apps.cozystack.io/v1alpha1     	true     	Kafka
kuberneteses            	apps.cozystack.io/v1alpha1     	true     	Kubernetes
monitorings             	apps.cozystack.io/v1alpha1     	true     	Monitoring
mysqls                  	apps.cozystack.io/v1alpha1     	true     	MySQL
natses                  	apps.cozystack.io/v1alpha1     	true     	NATS
postgreses              	apps.cozystack.io/v1alpha1     	true     	Postgres
rabbitmqs               	apps.cozystack.io/v1alpha1     	true     	RabbitMQ
redises                 	apps.cozystack.io/v1alpha1     	true     	Redis
seaweedfses             	apps.cozystack.io/v1alpha1     	true     	SeaweedFS
tcpbalancers            	apps.cozystack.io/v1alpha1     	true     	TCPBalancer
tenants                 	apps.cozystack.io/v1alpha1     	true     	Tenant
virtualmachines         	apps.cozystack.io/v1alpha1     	true     	VirtualMachine
vmdisks                 	apps.cozystack.io/v1alpha1     	true     	VMDisk
vminstances             	apps.cozystack.io/v1alpha1     	true     	VMInstance
vpns                    	apps.cozystack.io/v1alpha1     	true     	VPN

Мы можем работать с ними, как с обычными ресурсами Kubernetes.

Просмотр S3-бакетов:

# kubectl get buckets.apps.cozystack.io -n tenant-kvaps

Пример вывода:

NAME   	  READY   AGE   VERSION
foo    	  True	  22h   0.1.0
testaasd  True	  27h   0.1.0

Просмотр Kubernetes-кластеров:

# kubectl get kuberneteses.apps.cozystack.io -n tenant-kvaps

Пример вывода:

NAME	  READY   AGE   VERSION
abc 	  False   19h   0.14.0
asdte     True    22h   0.13.0

Просмотр дисков виртуальных машин:

# kubectl get vmdisks.apps.cozystack.io -n tenant-kvaps

Пример вывода:

NAME         	READY     AGE   VERSION
docker       	True	  21d   0.1.0
test         	True	  18d   0.1.0
win2k25-iso  	True	  21d   0.1.0
win2k25-system  True	  21d   0.1.0

Просмотр виртуальных машин:

# kubectl get vminstances.apps.cozystack.io -n tenant-kvaps

Пример вывода:

NAME  	READY     AGE   VERSION
docker	True	  21d   0.1.0
test  	True	  18d   0.1.0
win2k25 True	  20d   0.1.0

Мы можем создавать, изменять и удалять каждый из них, при этом любое взаимодействие с ними будет транслироваться на ресурсы типа HelmRelease, попутно отрабатывая структуру ресурса и префикс в имени:

Команда для просмотра связанных Helm-релизов:

# kubectl get helmreleases -n tenant-kvaps -l cozystack.io/ui

Пример вывода:

NAME                 	AGE   READY
bucket-foo           	22h   True
bucket-testaasd      	27h   True
kubernetes-abc       	19h   False
kubernetes-asdte     	22h   True
redis-test           	18d   True
redis-yttt           	12d   True
vm-disk-docker       	21d   True
vm-disk-test         	18d   True
vm-disk-win2k25-iso  	21d   True
vm-disk-win2k25-system  21d   True
vm-instance-docker   	21d   True
vm-instance-test     	18d   True
vm-instance-win2k25  	20d   True

Дальнейшие действия

Мы не собираемся на останавливаться на имплементации базового слоя API и в будущем планируем добавить новые фичи:

  • Сделать валидацию на базе OpenAPI-спеки, сформированной напрямую из Helm-чартов.

  • Реализовать контроллер, который будет собирать release notes из задеплоенных релизов и показывать пользователю информацию для доступа к конкретным сервисам.

  • Переработать наш дашборд так, чтобы он начал взаимодействовать с новым API.

Заключение

API Aggregation Layer позволил нам быстро и эффективно решить свою задачу, предоставив гибкий механизм для расширения Kubernetes API динамически регистрируемыми ресурсами и конвертирования их на лету В итоге это сделало нашу платформу ещё более гибкой и расширяемой без необходимости написания кода для каждого нового ресурса.

API вы можете протестировать самостоятельно в свободной PaaS-платформе Cozystack начиная с версии v0.18.

Полезные ссылки

Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+10
Комментарии1
3

Публикации

Информация

Сайт
aenix.io
Дата регистрации
Численность
2–10 человек
Местоположение
Чехия
Представитель
Andrei Kvapil

Истории