2025-04-01 10:38:02 +09:00

230 lines
8.0 KiB
Go

package dualwrite
import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana-app-sdk/logging"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
)
var (
_ grafanarest.Storage = (*dualWriter)(nil)
)
// dualWriter will write first to legacy, then to unified keeping the same internal ID
type dualWriter struct {
legacy grafanarest.Storage
unified grafanarest.Storage
readUnified bool
errorIsOK bool // in "mode1" we try writing both -- but don't block on unified write errors
log logging.Logger
}
func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
// Call get (send read traffic in cloud)
unifiedGet, unifiedErr := d.unified.Get(ctx, name, options)
if d.readUnified {
return unifiedGet, unifiedErr
}
legacyGet, err := d.legacy.Get(ctx, name, options)
if err != nil {
return nil, err
}
if unifiedErr != nil && !apierrors.IsNotFound(unifiedErr) && !d.errorIsOK {
return nil, unifiedErr // the unified error
}
return legacyGet, nil
}
func (d *dualWriter) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
// Call list (send read traffic in cloud)
unifiedList, err := d.unified.List(ctx, options)
if d.readUnified {
return unifiedList, err
}
if err != nil && !d.errorIsOK {
return nil, err
}
return d.legacy.List(ctx, options)
}
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *dualWriter) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
log := d.log.With("method", "Create").WithContext(ctx)
accIn, err := meta.Accessor(in)
if err != nil {
return nil, err
}
if accIn.GetUID() != "" {
return nil, fmt.Errorf("UID should not be: %v", accIn.GetUID())
}
if accIn.GetName() == "" && accIn.GetGenerateName() == "" {
return nil, fmt.Errorf("name or generatename have to be set")
}
// create in legacy first, and then unistore. if unistore fails, but legacy succeeds,
// will try to cleanup the object in legacy.
createdFromLegacy, err := d.legacy.Create(ctx, in, createValidation, options)
if err != nil {
log.Error("unable to create object in legacy storage", "err", err)
return createdFromLegacy, err
}
createdCopy := createdFromLegacy.DeepCopyObject()
accCreated, err := meta.Accessor(createdCopy)
if err != nil {
return createdFromLegacy, err
}
accCreated.SetResourceVersion("")
accCreated.SetUID("")
storageObj, errObjectSt := d.unified.Create(ctx, createdCopy, createValidation, options)
if errObjectSt != nil {
log.Error("unable to create object in unified storage", "err", err)
if d.errorIsOK {
return createdFromLegacy, nil
}
// if we cannot create in unistore, attempt to clean up legacy
_, _, err = d.legacy.Delete(ctx, accCreated.GetName(), nil, &metav1.DeleteOptions{})
if err != nil {
log.Error("unable to cleanup object in legacy storage", "err", err)
}
}
if d.readUnified {
return storageObj, errObjectSt
}
return createdFromLegacy, errObjectSt
}
func (d *dualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// delete from legacy first, and then unistore. Will return a failure if either fails,
// unless its a 404.
//
// we want to delete from legacy first, otherwise if the delete from unistore was successful,
// but legacy failed, the user would get a failure, but not be able to retry the delete
// as they would not be able to see the object in unistore anymore.
objFromLegacy, asyncLegacy, err := d.legacy.Delete(ctx, name, deleteValidation, options)
if err != nil && !d.readUnified {
return objFromLegacy, asyncLegacy, err
}
objFromStorage, asyncStorage, err := d.unified.Delete(ctx, name, deleteValidation, options)
if err != nil && apierrors.IsNotFound(err) || d.errorIsOK {
err = nil // clear the error
}
if d.readUnified {
return objFromStorage, asyncStorage, err
}
return objFromLegacy, asyncLegacy, err
}
// Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage.
func (d *dualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
log := d.log.With("method", "Update").WithContext(ctx)
// The incoming RV is not stable -- it may be from legacy or storage!
// This sets a flag in the context and our apistore is more lenient when it exists
ctx = grafanarest.WithDualWriteUpdate(ctx)
// update in legacy first, and then unistore. Will return a failure if either fails.
//
// we want to update in legacy first, otherwise if the update from unistore was successful,
// but legacy failed, the user would get a failure, but see the update did apply to the source
// of truth, and be less likely to retry to save (and get the stores in sync again)
objFromLegacy, createdLegacy, err := d.legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.With("object", objFromLegacy).Error("could not update in legacy storage", "err", err)
return objFromLegacy, createdLegacy, err
}
objFromStorage, created, err := d.unified.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.With("object", objFromStorage).Error("could not update in storage", "err", err)
if d.errorIsOK {
return objFromLegacy, createdLegacy, nil
}
}
if d.readUnified {
return objFromStorage, created, err
}
return objFromLegacy, createdLegacy, err
}
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
func (d *dualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.log.With("method", "DeleteCollection", "resourceVersion", listOptions.ResourceVersion).WithContext(ctx)
// delete from legacy first, and anything that is successful can be deleted in unistore too.
//
// we want to delete from legacy first, otherwise if the delete from unistore was successful,
// but legacy failed, the user would get a failure, but not be able to retry the delete
// as they would not be able to see the object in unistore anymore.
deletedLegacy, err := d.legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
log.With("deleted", deletedLegacy).Error("failed to delete collection successfully from legacy storage", "err", err)
return deletedLegacy, err
}
deletedStorage, err := d.unified.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
log.With("deleted", deletedStorage).Error("failed to delete collection successfully from Storage", "err", err)
if d.errorIsOK {
return deletedLegacy, nil
}
}
if d.readUnified {
return deletedStorage, err
}
return deletedLegacy, err
}
func (d *dualWriter) Destroy() {
d.legacy.Destroy()
d.unified.Destroy()
}
func (d *dualWriter) GetSingularName() string {
return d.unified.GetSingularName()
}
func (d *dualWriter) NamespaceScoped() bool {
return d.unified.NamespaceScoped()
}
func (d *dualWriter) New() runtime.Object {
return d.unified.New()
}
func (d *dualWriter) NewList() runtime.Object {
return d.unified.NewList()
}
func (d *dualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
return d.unified.ConvertToTable(ctx, object, tableOptions)
}