diff --git a/app/services/locker/locker.go b/app/services/locker/locker.go index e44d8bfd6..f0bebac5f 100644 --- a/app/services/locker/locker.go +++ b/app/services/locker/locker.go @@ -28,6 +28,7 @@ import ( ) const namespaceRepo = "repo" +const namespaceRegistry = "registry" type Locker struct { mtxManager lock.MutexManager @@ -49,7 +50,7 @@ func (l Locker) lock( ctx = logging.NewContext(ctx, func(zc zerolog.Context) zerolog.Context { return zc. Str("key", key). - Str("namespace", namespaceRepo). + Str("namespace", namespace). Str("expiry", expiry.String()) }) diff --git a/app/services/locker/rpmrepodata.go b/app/services/locker/rpmrepodata.go new file mode 100644 index 000000000..fd05c9cb2 --- /dev/null +++ b/app/services/locker/rpmrepodata.go @@ -0,0 +1,36 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package locker + +import ( + "context" + "fmt" + "time" +) + +func (l Locker) LockRpmRepoData( + ctx context.Context, + registryID int64, + expiry time.Duration, +) (func(), error) { + key := fmt.Sprintf("%d/repodata", registryID) + + unlockFn, err := l.lock(ctx, namespaceRegistry, key, expiry) + if err != nil { + return nil, fmt.Errorf("failed to lock mutex for registry [%d] RPM repodata: %w", registryID, err) + } + + return unlockFn, nil +} diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index eb453fc03..d6cc31ebb 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -526,7 +526,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro return nil, err } registryHelper := rpm.LocalRegistryHelperProvider(fileManager, artifactRepository) - indexService := index.ProvideService(registryHelper) + indexService := index.ProvideService(registryHelper, lockerLocker) apiHandler := router.APIHandlerProvider(registryRepository, upstreamProxyConfigRepository, fileManager, tagRepository, manifestRepository, cleanupPolicyRepository, imageRepository, storageDriver, spaceFinder, transactor, authenticator, provider, authorizer, auditService, artifactRepository, webhooksRepository, webhooksExecutionRepository, service2, spacePathStore, reporter10, downloadStatRepository, indexService, config) mavenDBStore := maven.DBStoreProvider(registryRepository, imageRepository, artifactRepository, spaceStore, bandwidthStatRepository, downloadStatRepository, nodesRepository, upstreamProxyConfigRepository) mavenLocalRegistry := maven.LocalRegistryProvider(mavenDBStore, transactor, fileManager) diff --git a/registry/services/index/service.go b/registry/services/index/service.go index a71c2800f..338133481 100644 --- a/registry/services/index/service.go +++ b/registry/services/index/service.go @@ -16,16 +16,22 @@ package index import ( "context" + "fmt" + "time" + "github.com/harness/gitness/app/services/locker" "github.com/harness/gitness/registry/app/utils/rpm" ) +const timeout = 3 * time.Minute + type Service interface { RegenerateRpmRepoData(ctx context.Context, registryID int64, rootParentID int64, rootIdentifier string) error } type service struct { rpmRegistryHelper rpm.RegistryHelper + locker *locker.Locker } func (s *service) RegenerateRpmRepoData( @@ -34,14 +40,24 @@ func (s *service) RegenerateRpmRepoData( rootParentID int64, rootIdentifier string, ) error { - // TODO: integrate with distributed lock + unlock, err := s.locker.LockRpmRepoData( + ctx, + registryID, + timeout, + ) + if err != nil { + return fmt.Errorf("failed to lock registry for RPM repo data regeneration: %w", err) + } + defer unlock() return s.rpmRegistryHelper.BuildRegistryFiles(ctx, registryID, rootParentID, rootIdentifier) } func NewService( rpmRegistryHelper rpm.RegistryHelper, + locker *locker.Locker, ) Service { return &service{ rpmRegistryHelper: rpmRegistryHelper, + locker: locker, } } diff --git a/registry/services/index/wire.go b/registry/services/index/wire.go index 7e4b25bb0..025094d55 100644 --- a/registry/services/index/wire.go +++ b/registry/services/index/wire.go @@ -15,6 +15,7 @@ package index import ( + "github.com/harness/gitness/app/services/locker" "github.com/harness/gitness/registry/app/utils/rpm" "github.com/google/wire" @@ -27,6 +28,7 @@ var WireSet = wire.NewSet( func ProvideService( rpmRegistryHelper rpm.RegistryHelper, + locker *locker.Locker, ) Service { - return NewService(rpmRegistryHelper) + return NewService(rpmRegistryHelper, locker) }