feat: [AH-1221]: Implement distributed locking for repo data (#3731)

* rebase
* feat: [AH-1221]: Implement distributed locking for repo data
This commit is contained in:
Tudor Macari 2025-04-30 05:33:35 +00:00 committed by Harness
parent 57f613cac1
commit c62f822571
5 changed files with 59 additions and 4 deletions

View File

@ -28,6 +28,7 @@ import (
)
const namespaceRepo = "repo"
const namespaceRegistry = "registry"
type Locker struct {
mtxManager lock.MutexManager
@ -49,7 +50,7 @@ func (l Locker) lock(
ctx = logging.NewContext(ctx, func(zc zerolog.Context) zerolog.Context {
return zc.
Str("key", key).
Str("namespace", namespaceRepo).
Str("namespace", namespace).
Str("expiry", expiry.String())
})

View File

@ -0,0 +1,36 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package locker
import (
"context"
"fmt"
"time"
)
func (l Locker) LockRpmRepoData(
ctx context.Context,
registryID int64,
expiry time.Duration,
) (func(), error) {
key := fmt.Sprintf("%d/repodata", registryID)
unlockFn, err := l.lock(ctx, namespaceRegistry, key, expiry)
if err != nil {
return nil, fmt.Errorf("failed to lock mutex for registry [%d] RPM repodata: %w", registryID, err)
}
return unlockFn, nil
}

View File

@ -526,7 +526,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err
}
registryHelper := rpm.LocalRegistryHelperProvider(fileManager, artifactRepository)
indexService := index.ProvideService(registryHelper)
indexService := index.ProvideService(registryHelper, lockerLocker)
apiHandler := router.APIHandlerProvider(registryRepository, upstreamProxyConfigRepository, fileManager, tagRepository, manifestRepository, cleanupPolicyRepository, imageRepository, storageDriver, spaceFinder, transactor, authenticator, provider, authorizer, auditService, artifactRepository, webhooksRepository, webhooksExecutionRepository, service2, spacePathStore, reporter10, downloadStatRepository, indexService, config)
mavenDBStore := maven.DBStoreProvider(registryRepository, imageRepository, artifactRepository, spaceStore, bandwidthStatRepository, downloadStatRepository, nodesRepository, upstreamProxyConfigRepository)
mavenLocalRegistry := maven.LocalRegistryProvider(mavenDBStore, transactor, fileManager)

View File

@ -16,16 +16,22 @@ package index
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/app/services/locker"
"github.com/harness/gitness/registry/app/utils/rpm"
)
const timeout = 3 * time.Minute
type Service interface {
RegenerateRpmRepoData(ctx context.Context, registryID int64, rootParentID int64, rootIdentifier string) error
}
type service struct {
rpmRegistryHelper rpm.RegistryHelper
locker *locker.Locker
}
func (s *service) RegenerateRpmRepoData(
@ -34,14 +40,24 @@ func (s *service) RegenerateRpmRepoData(
rootParentID int64,
rootIdentifier string,
) error {
// TODO: integrate with distributed lock
unlock, err := s.locker.LockRpmRepoData(
ctx,
registryID,
timeout,
)
if err != nil {
return fmt.Errorf("failed to lock registry for RPM repo data regeneration: %w", err)
}
defer unlock()
return s.rpmRegistryHelper.BuildRegistryFiles(ctx, registryID, rootParentID, rootIdentifier)
}
func NewService(
rpmRegistryHelper rpm.RegistryHelper,
locker *locker.Locker,
) Service {
return &service{
rpmRegistryHelper: rpmRegistryHelper,
locker: locker,
}
}

View File

@ -15,6 +15,7 @@
package index
import (
"github.com/harness/gitness/app/services/locker"
"github.com/harness/gitness/registry/app/utils/rpm"
"github.com/google/wire"
@ -27,6 +28,7 @@ var WireSet = wire.NewSet(
func ProvideService(
rpmRegistryHelper rpm.RegistryHelper,
locker *locker.Locker,
) Service {
return NewService(rpmRegistryHelper)
return NewService(rpmRegistryHelper, locker)
}