diff --git a/app/api/controller/githook/post_receive.go b/app/api/controller/githook/post_receive.go index 472df25b0..c9cda0381 100644 --- a/app/api/controller/githook/post_receive.go +++ b/app/api/controller/githook/post_receive.go @@ -17,7 +17,6 @@ package githook import ( "context" "fmt" - "slices" "strings" "time" @@ -70,7 +69,7 @@ func (c *Controller) PostReceive( c.handleEmptyRepoPush(ctx, repo, in.PostReceiveInput, &out) // always update last git push time - best effort - c.updateLastGITPushTime(ctx, repo, in) + c.updateLastGITPushTime(ctx, repo) // report ref events if repo is in an active state - best effort if repo.State == enum.RepoStateActive { @@ -364,16 +363,7 @@ func (c *Controller) handleEmptyRepoPush( func (c *Controller) updateLastGITPushTime( ctx context.Context, repo *types.Repository, - in types.GithookPostReceiveInput, ) { - isNonePRRefFn := func(refUpdate hook.ReferenceUpdate) bool { - return !strings.HasPrefix(refUpdate.Ref, gitReferenceNamePullReq) - } - // ignore push that only contains pr refs for last git push time updates - if !slices.ContainsFunc(in.RefUpdates, isNonePRRefFn) { - return - } - newRepo, err := c.repoStore.UpdateOptLock(ctx, repo, func(r *types.Repository) error { r.LastGITPush = time.Now().UnixMilli() return nil diff --git a/app/api/controller/pullreq/merge.go b/app/api/controller/pullreq/merge.go index 4d5a56123..e90165d95 100644 --- a/app/api/controller/pullreq/merge.go +++ b/app/api/controller/pullreq/merge.go @@ -294,8 +294,14 @@ func (c *Controller) Merge( }(in.Method) if checkMergeability { + // for merge-check we can skip git hooks explicitly (we don't update any refs anyway) + writeParams, err := controller.CreateRPCSystemReferencesWriteParams(ctx, c.urlProvider, session, targetRepo) + if err != nil { + return nil, nil, fmt.Errorf("failed to create RPC write params: %w", err) + } + mergeOutput, err = c.git.Merge(ctx, &git.MergeParams{ - WriteParams: targetWriteParams, + WriteParams: writeParams, BaseBranch: pr.TargetBranch, HeadRepoUID: sourceRepo.GitUID, HeadBranch: pr.SourceBranch, diff --git a/app/api/controller/pullreq/pr_create.go b/app/api/controller/pullreq/pr_create.go index 1d1292518..f629b0980 100644 --- a/app/api/controller/pullreq/pr_create.go +++ b/app/api/controller/pullreq/pr_create.go @@ -115,7 +115,7 @@ func (c *Controller) Create( return nil, err } - targetWriteParams, err := controller.CreateRPCInternalWriteParams( + targetWriteParams, err := controller.CreateRPCSystemReferencesWriteParams( ctx, c.urlProvider, session, targetRepo, ) if err != nil { diff --git a/app/api/controller/pullreq/pr_state.go b/app/api/controller/pullreq/pr_state.go index ab7696162..98e6bb158 100644 --- a/app/api/controller/pullreq/pr_state.go +++ b/app/api/controller/pullreq/pr_state.go @@ -142,7 +142,7 @@ func (c *Controller) State(ctx context.Context, stateChange = changeClose } - targetWriteParams, err := controller.CreateRPCInternalWriteParams(ctx, c.urlProvider, session, targetRepo) + targetWriteParams, err := controller.CreateRPCSystemReferencesWriteParams(ctx, c.urlProvider, session, targetRepo) if err != nil { return nil, fmt.Errorf("failed to create RPC write params: %w", err) } diff --git a/app/api/controller/util.go b/app/api/controller/util.go index 2ec992596..eb1cd498d 100644 --- a/app/api/controller/util.go +++ b/app/api/controller/util.go @@ -27,12 +27,13 @@ import ( ) // createRPCWriteParams creates base write parameters for git write operations. -// TODO: this function should be in git package and should accept params as interface (contract) +// TODO: this function should be in git package and should accept params as interface (contract). func createRPCWriteParams( ctx context.Context, urlProvider url.Provider, session *auth.Session, repo *types.RepositoryCore, + disabled bool, isInternal bool, ) (git.WriteParams, error) { // generate envars (add everything githook CLI needs for execution) @@ -41,7 +42,7 @@ func createRPCWriteParams( urlProvider.GetInternalAPIURL(ctx), repo.ID, session.Principal.ID, - false, + disabled, isInternal, ) if err != nil { @@ -66,7 +67,7 @@ func CreateRPCExternalWriteParams( session *auth.Session, repo *types.RepositoryCore, ) (git.WriteParams, error) { - return createRPCWriteParams(ctx, urlProvider, session, repo, false) + return createRPCWriteParams(ctx, urlProvider, session, repo, false, false) } // CreateRPCInternalWriteParams creates base write parameters for git internal write operations. @@ -77,7 +78,18 @@ func CreateRPCInternalWriteParams( session *auth.Session, repo *types.RepositoryCore, ) (git.WriteParams, error) { - return createRPCWriteParams(ctx, urlProvider, session, repo, true) + return createRPCWriteParams(ctx, urlProvider, session, repo, false, true) +} + +// CreateRPCSystemReferencesWriteParams creates base write parameters for write operations +// on system references (e.g. pullreq references). +func CreateRPCSystemReferencesWriteParams( + ctx context.Context, + urlProvider url.Provider, + session *auth.Session, + repo *types.RepositoryCore, +) (git.WriteParams, error) { + return createRPCWriteParams(ctx, urlProvider, session, repo, true, true) } func MapBranch(b git.Branch) (types.Branch, error) { diff --git a/app/services/importer/repository.go b/app/services/importer/repository.go index f9a203496..090b10ded 100644 --- a/app/services/importer/repository.go +++ b/app/services/importer/repository.go @@ -577,7 +577,7 @@ func (r *Repository) createEnvVars(ctx context.Context, r.urlProvider.GetInternalAPIURL(ctx), repoID, principal.ID, - false, + true, true, ) if err != nil { diff --git a/app/services/pullreq/handlers_head_ref.go b/app/services/pullreq/handlers_head_ref.go index 1391cc49c..df188f05b 100644 --- a/app/services/pullreq/handlers_head_ref.go +++ b/app/services/pullreq/handlers_head_ref.go @@ -36,7 +36,7 @@ func (s *Service) updateHeadRefOnBranchUpdate(ctx context.Context, return fmt.Errorf("failed to get repo git info: %w", err) } - writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, repoGit.ID, repoGit.GitUID) + writeParams, err := createRPCSystemReferencesWriteParams(ctx, s.urlProvider, repoGit.ID, repoGit.GitUID) if err != nil { return fmt.Errorf("failed to generate rpc write params: %w", err) } diff --git a/app/services/pullreq/handlers_mergeable.go b/app/services/pullreq/handlers_mergeable.go index e01062e05..8a9ceb083 100644 --- a/app/services/pullreq/handlers_mergeable.go +++ b/app/services/pullreq/handlers_mergeable.go @@ -155,7 +155,7 @@ func (s *Service) updateMergeData( } } - writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID) + writeParams, err := createRPCSystemReferencesWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID) if err != nil { return fmt.Errorf("failed to generate rpc write params: %w", err) } diff --git a/app/services/pullreq/service.go b/app/services/pullreq/service.go index a9fd9dab3..cd9035b0e 100644 --- a/app/services/pullreq/service.go +++ b/app/services/pullreq/service.go @@ -248,8 +248,8 @@ func New(ctx context.Context, return service, nil } -// createSystemRPCWriteParams creates base write parameters for write operations. -func createSystemRPCWriteParams( +// createRPCSystemReferencesWriteParams creates base write parameters for write operations. +func createRPCSystemReferencesWriteParams( ctx context.Context, urlProvider url.Provider, repoID int64, @@ -257,13 +257,13 @@ func createSystemRPCWriteParams( ) (git.WriteParams, error) { principal := bootstrap.NewSystemServiceSession().Principal - // generate envars (add everything githook CLI needs for execution) + // generate envars - skip githook execution since it's system references only envVars, err := githook.GenerateEnvironmentVariables( ctx, urlProvider.GetInternalAPIURL(ctx), repoID, principal.ID, - false, + true, true, ) if err != nil { diff --git a/app/services/usage/event_handlers.go b/app/services/usage/event_handlers.go new file mode 100644 index 000000000..8700cbc02 --- /dev/null +++ b/app/services/usage/event_handlers.go @@ -0,0 +1,95 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package usage + +import ( + "context" + "fmt" + "time" + + repoevents "github.com/harness/gitness/app/events/repo" + "github.com/harness/gitness/app/paths" + "github.com/harness/gitness/events" + "github.com/harness/gitness/stream" + "github.com/harness/gitness/types" +) + +type RepoFinder interface { + FindByID(ctx context.Context, id int64) (*types.RepositoryCore, error) +} + +func registerEventListeners( + ctx context.Context, + instanceID string, + sender Sender, + repoEvReaderFactory *events.ReaderFactory[*repoevents.Reader], + repoFinder RepoFinder, +) error { + // repo events + const groupRepo = "gitness:usage:repo" + _, err := repoEvReaderFactory.Launch(ctx, groupRepo, instanceID, + func(r *repoevents.Reader) error { + const idleTimeout = 10 * time.Second + r.Configure( + stream.WithConcurrency(1), + stream.WithHandlerOptions( + stream.WithIdleTimeout(idleTimeout), + stream.WithMaxRetries(2), + )) + _ = r.RegisterCreated(repoCreateHandler(sender, repoFinder)) + _ = r.RegisterPushed(repoPushHandler(sender, repoFinder)) + + return nil + }) + if err != nil { + return fmt.Errorf("failed to launch repo event reader: %w", err) + } + + return nil +} + +func repoCreateHandler(sender Sender, repoFinder RepoFinder) events.HandlerFunc[*repoevents.CreatedPayload] { + return func(ctx context.Context, event *events.Event[*repoevents.CreatedPayload]) error { + return sendRepoPushUsage(ctx, sender, repoFinder, event.Payload.RepoID) + } +} + +func repoPushHandler(sender Sender, repoFinder RepoFinder) events.HandlerFunc[*repoevents.PushedPayload] { + return func(ctx context.Context, event *events.Event[*repoevents.PushedPayload]) error { + return sendRepoPushUsage(ctx, sender, repoFinder, event.Payload.RepoID) + } +} + +func sendRepoPushUsage(ctx context.Context, sender Sender, repoFinder RepoFinder, repoID int64) error { + repo, err := repoFinder.FindByID(ctx, repoID) + if err != nil { + return fmt.Errorf("failed to find repo with id %d: %w", repoID, err) + } + + rootSpace, _, err := paths.DisectRoot(repo.Path) + if err != nil { + return fmt.Errorf("failed to disect repo path %q: %w", repo.Path, err) + } + + m := Metric{ + SpaceRef: rootSpace, + Pushes: 1, + } + if err := sender.Send(ctx, m); err != nil { + return fmt.Errorf("failed to send usage metric: %w", err) + } + + return nil +} diff --git a/app/services/usage/mocks.go b/app/services/usage/mocks.go index 27476123a..28d289657 100644 --- a/app/services/usage/mocks.go +++ b/app/services/usage/mocks.go @@ -54,6 +54,20 @@ func (s *SpaceFinderMock) FindByRef( return s.FindByRefFn(ctx, spaceRef) } +type RepoFinderMock struct { + FindByIDFn func( + ctx context.Context, + id int64, + ) (*types.RepositoryCore, error) +} + +func (r *RepoFinderMock) FindByID( + ctx context.Context, + id int64, +) (*types.RepositoryCore, error) { + return r.FindByIDFn(ctx, id) +} + type MetricsMock struct { UpsertOptimisticFn func(ctx context.Context, in *types.UsageMetric) error GetMetricsFn func( diff --git a/app/services/usage/usage.go b/app/services/usage/usage.go index a66122d71..2d036d835 100644 --- a/app/services/usage/usage.go +++ b/app/services/usage/usage.go @@ -25,6 +25,10 @@ import ( "github.com/rs/zerolog/log" ) +var ( + days30 = time.Duration(30*24) * time.Hour +) + type Bandwidth struct { Out int64 In int64 @@ -33,6 +37,7 @@ type Bandwidth struct { type Metric struct { SpaceRef string Bandwidth + Pushes int64 } type SpaceFinder interface { @@ -54,7 +59,7 @@ type MetricStore interface { ) ([]types.UsageMetric, error) } -type Mediator struct { +type mediator struct { queue *queue workers []*worker @@ -67,13 +72,13 @@ type Mediator struct { config Config } -func NewMediator( +func newMediator( ctx context.Context, spaceFinder SpaceFinder, usageMetricsStore MetricStore, config Config, -) *Mediator { - m := &Mediator{ +) *mediator { + m := &mediator{ queue: newQueue(), spaceFinder: spaceFinder, metricsStore: usageMetricsStore, @@ -86,7 +91,7 @@ func NewMediator( return m } -func (m *Mediator) Start(ctx context.Context) { +func (m *mediator) Start(ctx context.Context) { for i := range m.workers { w := newWorker(i, m.queue) go w.start(ctx, m.process) @@ -94,29 +99,29 @@ func (m *Mediator) Start(ctx context.Context) { } } -func (m *Mediator) Stop() { +func (m *mediator) Stop() { for i := range m.workers { m.workers[i].stop() } } -func (m *Mediator) Send(ctx context.Context, payload Metric) error { +func (m *mediator) Send(ctx context.Context, payload Metric) error { m.wg.Add(1) m.queue.Add(ctx, payload) return nil } -func (m *Mediator) Wait() { +func (m *mediator) Wait() { m.wg.Wait() } -func (m *Mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) { +func (m *mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) { space, err := m.spaceFinder.FindByRef(ctx, spaceRef) if err != nil { return Bandwidth{}, fmt.Errorf("could not find space: %w", err) } now := time.Now() - metric, err := m.metricsStore.GetMetrics(ctx, space.ID, now.Add(-m.days30()).UnixMilli(), now.UnixMilli()) + metric, err := m.metricsStore.GetMetrics(ctx, space.ID, now.Add(-days30).UnixMilli(), now.UnixMilli()) if err != nil { return Bandwidth{}, err } @@ -126,11 +131,7 @@ func (m *Mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) }, nil } -func (m *Mediator) days30() time.Duration { - return time.Duration(30*24) * time.Hour -} - -func (m *Mediator) process(ctx context.Context, payload *Metric) { +func (m *mediator) process(ctx context.Context, payload *Metric) { defer m.wg.Done() space, err := m.spaceFinder.FindByRef(ctx, payload.SpaceRef) @@ -143,6 +144,7 @@ func (m *Mediator) process(ctx context.Context, payload *Metric) { RootSpaceID: space.ID, Bandwidth: payload.Out, Storage: payload.In, + Pushes: payload.Pushes, }); err != nil { log.Ctx(ctx).Err(err).Msg("failed to upsert usage metrics") } diff --git a/app/services/usage/usage_test.go b/app/services/usage/usage_test.go index 6be943d4f..5228d5b03 100644 --- a/app/services/usage/usage_test.go +++ b/app/services/usage/usage_test.go @@ -20,7 +20,10 @@ import ( "sync" "sync/atomic" "testing" + "time" + repoevents "github.com/harness/gitness/app/events/repo" + "github.com/harness/gitness/events" "github.com/harness/gitness/types" "github.com/stretchr/testify/require" @@ -36,10 +39,38 @@ func TestMediator_basic(t *testing.T) { return space, nil }, } + repo := &types.RepositoryCore{ + ID: 2, + Path: "space/repo", + } + repoFinderMock := &RepoFinderMock{ + FindByIDFn: func(_ context.Context, id int64) (*types.RepositoryCore, error) { + if id != repo.ID { + return nil, fmt.Errorf("expected id to be %d, got %d", repo.ID, id) + } + return repo, nil + }, + } + + eventSystem, err := events.ProvideSystem(events.Config{ + Mode: events.ModeInMemory, + MaxStreamLength: 100, + }, nil) + if err != nil { + t.Fatalf("failed to create event system: %v", err) + } + repoEvReaderFactory, err := repoevents.NewReaderFactory(eventSystem) + if err != nil { + t.Fatalf("failed to create repo event reader factory: %v", err) + } + repoEvReporter, err := repoevents.NewReporter(eventSystem) + if err != nil { + t.Fatalf("failed to create repo event reporter: %v", err) + } out := atomic.Int64{} in := atomic.Int64{} - counter := atomic.Int64{} + pushes := atomic.Int64{} usageMock := &MetricsMock{ UpsertOptimisticFn: func(_ context.Context, metric *types.UsageMetric) error { @@ -48,7 +79,7 @@ func TestMediator_basic(t *testing.T) { } out.Add(metric.Bandwidth) in.Add(metric.Storage) - counter.Add(1) + pushes.Add(metric.Pushes) return nil }, GetMetricsFn: func( @@ -67,9 +98,11 @@ func TestMediator_basic(t *testing.T) { }, } - numRoutines := 10 + numBandwidthRoutines := 10 + numEventsCreated := 4 + numEventsPushed := 5 defaultSize := 512 - mediator := NewMediator( + mediator := newMediator( context.Background(), spaceFinderMock, usageMock, @@ -77,8 +110,13 @@ func TestMediator_basic(t *testing.T) { MaxWorkers: 5, }, ) + err = registerEventListeners(context.Background(), "test", mediator, repoEvReaderFactory, repoFinderMock) + if err != nil { + t.Fatalf("failed to register event listeners: %v", err) + } + wg := sync.WaitGroup{} - for range numRoutines { + for range numBandwidthRoutines { wg.Add(1) go func() { defer wg.Done() @@ -93,8 +131,29 @@ func TestMediator_basic(t *testing.T) { } wg.Wait() + + for range numEventsCreated { + repoEvReporter.Created(context.Background(), &repoevents.CreatedPayload{ + Base: repoevents.Base{ + RepoID: repo.ID, + }, + }) + } + + for range numEventsPushed { + repoEvReporter.Pushed(context.Background(), &repoevents.PushedPayload{ + Base: repoevents.Base{ + RepoID: repo.ID, + }, + }) + } + + // todo: add ability to wait for event system to complete + time.Sleep(200 * time.Millisecond) + mediator.Wait() - require.Equal(t, int64(numRoutines*defaultSize), out.Load()) - require.Equal(t, int64(numRoutines*defaultSize), in.Load()) + require.Equal(t, int64(numBandwidthRoutines*defaultSize), out.Load()) + require.Equal(t, int64(numBandwidthRoutines*defaultSize), in.Load()) + require.Equal(t, int64(numEventsCreated+numEventsPushed), pushes.Load()) } diff --git a/app/services/usage/wire.go b/app/services/usage/wire.go index 0f7c576c0..423eb55e3 100644 --- a/app/services/usage/wire.go +++ b/app/services/usage/wire.go @@ -16,9 +16,12 @@ package usage import ( "context" + "fmt" + repoevents "github.com/harness/gitness/app/events/repo" "github.com/harness/gitness/app/services/refcache" "github.com/harness/gitness/app/store" + "github.com/harness/gitness/events" "github.com/harness/gitness/types" "github.com/google/wire" @@ -32,15 +35,24 @@ func ProvideMediator( ctx context.Context, config *types.Config, spaceFinder refcache.SpaceFinder, + repoFinder refcache.RepoFinder, metricsStore store.UsageMetricStore, -) Sender { + repoEvReaderFactory *events.ReaderFactory[*repoevents.Reader], +) (Sender, error) { if !config.UsageMetrics.Enabled { - return &Noop{} + return &Noop{}, nil } - return NewMediator( + + m := newMediator( ctx, spaceFinder, metricsStore, NewConfig(config), ) + + if err := registerEventListeners(ctx, config.InstanceID, m, repoEvReaderFactory, repoFinder); err != nil { + return nil, fmt.Errorf("failed to register event listeners: %w", err) + } + + return m, nil } diff --git a/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.down.sql b/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.down.sql new file mode 100644 index 000000000..3c940860c --- /dev/null +++ b/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE usage_metrics +DROP COLUMN usage_metric_pushes; diff --git a/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.up.sql b/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.up.sql new file mode 100644 index 000000000..f4d03ebe4 --- /dev/null +++ b/app/store/database/migrate/postgres/0109_alter_usage_table_add_pushes.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE usage_metrics +ADD COLUMN usage_metric_pushes INTEGER NOT NULL DEFAULT 0; diff --git a/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.down.sql b/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.down.sql new file mode 100644 index 000000000..3c940860c --- /dev/null +++ b/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE usage_metrics +DROP COLUMN usage_metric_pushes; diff --git a/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.up.sql b/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.up.sql new file mode 100644 index 000000000..f4d03ebe4 --- /dev/null +++ b/app/store/database/migrate/sqlite/0109_alter_usage_table_add_pushes.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE usage_metrics +ADD COLUMN usage_metric_pushes INTEGER NOT NULL DEFAULT 0; diff --git a/app/store/database/usage_metrics.go b/app/store/database/usage_metrics.go index 5f54c9858..a4b8b1271 100644 --- a/app/store/database/usage_metrics.go +++ b/app/store/database/usage_metrics.go @@ -30,6 +30,17 @@ import ( var _ store.UsageMetricStore = (*UsageMetricsStore)(nil) +type usageMetric struct { + RootSpaceID int64 `db:"usage_metric_space_id"` + Date int64 `db:"usage_metric_date"` + Created int64 `db:"usage_metric_created"` + Updated int64 `db:"usage_metric_updated"` + Bandwidth int64 `db:"usage_metric_bandwidth"` + Storage int64 `db:"usage_metric_storage"` + Pushes int64 `db:"usage_metric_pushes"` + Version int64 `db:"usage_metric_version"` +} + // NewUsageMetricsStore returns a new UsageMetricsStore. func NewUsageMetricsStore(db *sqlx.DB) *UsageMetricsStore { return &UsageMetricsStore{ @@ -48,9 +59,9 @@ func (s *UsageMetricsStore) getVersion( date int64, ) int64 { const sqlQuery = ` - SELECT - usage_metric_version - FROM usage_metrics + SELECT + usage_metric_version + FROM usage_metrics WHERE usage_metric_space_id = $1 AND usage_metric_date = $2 ` var version int64 @@ -70,6 +81,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e ,usage_metric_updated ,usage_metric_bandwidth ,usage_metric_storage + ,usage_metric_pushes ,usage_metric_version ) VALUES ( :usage_metric_space_id @@ -78,8 +90,9 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e ,:usage_metric_updated ,:usage_metric_bandwidth ,:usage_metric_storage + ,:usage_metric_pushes ,:usage_metric_version - ) + ) ON CONFLICT (usage_metric_space_id, usage_metric_date) DO UPDATE SET @@ -87,6 +100,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e ,usage_metric_updated = EXCLUDED.usage_metric_updated ,usage_metric_bandwidth = usage_metrics.usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth ,usage_metric_storage = usage_metrics.usage_metric_storage + EXCLUDED.usage_metric_storage + ,usage_metric_pushes = usage_metrics.usage_metric_pushes + EXCLUDED.usage_metric_pushes WHERE usage_metrics.usage_metric_version = EXCLUDED.usage_metric_version - 1` db := dbtx.GetAccessor(ctx, s.db) @@ -98,6 +112,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e Updated: time.Now().UnixMilli(), Bandwidth: in.Bandwidth, Storage: in.Storage, + Pushes: in.Pushes, Version: s.getVersion(ctx, in.RootSpaceID, today) + 1, }) if err != nil { @@ -143,9 +158,10 @@ func (s *UsageMetricsStore) GetMetrics( const sqlQuery = ` SELECT COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, - COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage + COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage, + COALESCE(SUM(usage_metric_pushes), 0) AS usage_metric_pushes FROM usage_metrics - WHERE + WHERE usage_metric_space_id = $1 AND usage_metric_date BETWEEN $2 AND $3` @@ -165,6 +181,7 @@ func (s *UsageMetricsStore) GetMetrics( ).Scan( &result.Bandwidth, &result.Storage, + &result.Pushes, ) if err != nil { return nil, database.ProcessSQLErrorf(ctx, err, "failed to get metric") @@ -182,12 +199,13 @@ func (s *UsageMetricsStore) List( SELECT usage_metric_space_id, COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, - COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage + COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage, + COALESCE(SUM(usage_metric_pushes), 0) AS usage_metric_pushes FROM usage_metrics - WHERE + WHERE usage_metric_date BETWEEN $1 AND $2 GROUP BY usage_metric_space_id - ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC` + ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC, usage_metric_pushes DESC` startTime := time.UnixMilli(start) endTime := time.UnixMilli(end) @@ -195,7 +213,7 @@ func (s *UsageMetricsStore) List( db := dbtx.GetAccessor(ctx, s.db) rows, err := db.QueryContext(ctx, sqlQuery, s.Date(startTime), s.Date(endTime)) if err != nil { - return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics") + return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage metrics") } defer rows.Close() @@ -206,6 +224,7 @@ func (s *UsageMetricsStore) List( &metric.RootSpaceID, &metric.Bandwidth, &metric.Storage, + &metric.Pushes, ) if err != nil { return nil, database.ProcessSQLErrorf(ctx, err, "failed to scan usage_metrics") @@ -223,13 +242,3 @@ func (s *UsageMetricsStore) Date(t time.Time) int64 { year, month, day := t.Date() return time.Date(year, month, day, 0, 0, 0, 0, time.UTC).UnixMilli() } - -type usageMetric struct { - RootSpaceID int64 `db:"usage_metric_space_id"` - Date int64 `db:"usage_metric_date"` - Created int64 `db:"usage_metric_created"` - Updated int64 `db:"usage_metric_updated"` - Bandwidth int64 `db:"usage_metric_bandwidth"` - Storage int64 `db:"usage_metric_storage"` - Version int64 `db:"usage_metric_version"` -} diff --git a/app/store/database/usage_metrics_test.go b/app/store/database/usage_metrics_test.go index 2c428a20b..5b3dd609b 100644 --- a/app/store/database/usage_metrics_test.go +++ b/app/store/database/usage_metrics_test.go @@ -43,6 +43,7 @@ func TestUsageMetricsStore_Upsert(t *testing.T) { RootSpaceID: 1, Bandwidth: 100, Storage: 100, + Pushes: 21, }) require.NoError(t, err) @@ -51,6 +52,7 @@ func TestUsageMetricsStore_Upsert(t *testing.T) { RootSpaceID: 1, Bandwidth: 100, Storage: 0, + Pushes: 3, }) require.NoError(t, err) @@ -60,7 +62,8 @@ func TestUsageMetricsStore_Upsert(t *testing.T) { usage_metric_space_id, usage_metric_date, usage_metric_bandwidth, - usage_metric_storage + usage_metric_storage, + usage_metric_pushes FROM usage_metrics WHERE usage_metric_space_id = ? LIMIT 1`, @@ -73,12 +76,14 @@ func TestUsageMetricsStore_Upsert(t *testing.T) { &date, &metric.Bandwidth, &metric.Storage, + &metric.Pushes, ) require.NoError(t, err) require.Equal(t, int64(1), metric.RootSpaceID) require.Equal(t, metricsStore.Date(time.Now()), date) require.Equal(t, int64(200), metric.Bandwidth) require.Equal(t, int64(100), metric.Storage) + require.Equal(t, int64(24), metric.Pushes) } func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) { @@ -102,6 +107,7 @@ func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) { RootSpaceID: 1, Bandwidth: 100, Storage: 100, + Pushes: 21, }) }) } @@ -115,6 +121,7 @@ func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) { require.Equal(t, int64(100*100), metric.Bandwidth) require.Equal(t, int64(100*100), metric.Storage) + require.Equal(t, int64(21*100), metric.Pushes) } func TestUsageMetricsStore_GetMetrics(t *testing.T) { @@ -134,6 +141,7 @@ func TestUsageMetricsStore_GetMetrics(t *testing.T) { RootSpaceID: 1, Bandwidth: 100, Storage: 100, + Pushes: 21, }) require.NoError(t, err) @@ -145,6 +153,7 @@ func TestUsageMetricsStore_GetMetrics(t *testing.T) { require.Equal(t, int64(1), metric.RootSpaceID, "expected spaceID = %d, got %d", 1, metric.RootSpaceID) require.Equal(t, int64(100), metric.Bandwidth, "expected bandwidth = %d, got %d", 100, metric.Bandwidth) require.Equal(t, int64(100), metric.Storage, "expected storage = %d, got %d", 100, metric.Storage) + require.Equal(t, int64(21), metric.Pushes, "expected pushes = %d, got %d", 21, metric.Pushes) } func TestUsageMetricsStore_List(t *testing.T) { @@ -164,6 +173,7 @@ func TestUsageMetricsStore_List(t *testing.T) { RootSpaceID: 1, Bandwidth: 100, Storage: 100, + Pushes: 21, }) require.NoError(t, err) @@ -171,6 +181,7 @@ func TestUsageMetricsStore_List(t *testing.T) { RootSpaceID: 1, Bandwidth: 50, Storage: 50, + Pushes: 21, }) require.NoError(t, err) @@ -178,6 +189,7 @@ func TestUsageMetricsStore_List(t *testing.T) { RootSpaceID: 2, Bandwidth: 200, Storage: 200, + Pushes: 21, }) require.NoError(t, err) @@ -188,4 +200,13 @@ func TestUsageMetricsStore_List(t *testing.T) { // list use desc order so first row should be spaceID = 2 require.Equal(t, int64(2), metrics[0].RootSpaceID) + require.Equal(t, int64(200), metrics[0].Bandwidth) + require.Equal(t, int64(200), metrics[0].Storage) + require.Equal(t, int64(21), metrics[0].Pushes) + + // second row should be spaceID = 1 + require.Equal(t, int64(1), metrics[1].RootSpaceID) + require.Equal(t, int64(150), metrics[1].Bandwidth) + require.Equal(t, int64(150), metrics[1].Storage) + require.Equal(t, int64(42), metrics[1].Pushes) } diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 5caa6afe9..926984aa8 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -557,7 +557,14 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro rpmHandler := api2.NewRpmHandlerProvider(rpmController, packagesHandler) handler4 := router.PackageHandlerProvider(packagesHandler, mavenHandler, genericHandler, pythonHandler, nugetHandler, npmHandler, rpmHandler) appRouter := router.AppRouterProvider(registryOCIHandler, apiHandler, handler2, handler3, handler4) - sender := usage.ProvideMediator(ctx, config, spaceFinder, usageMetricStore) + readerFactory3, err := events3.ProvideReaderFactory(eventsSystem) + if err != nil { + return nil, err + } + sender, err := usage.ProvideMediator(ctx, config, spaceFinder, repoFinder, usageMetricStore, readerFactory3) + if err != nil { + return nil, err + } routerRouter := router2.ProvideRouter(ctx, config, authenticator, repoController, reposettingsController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, gitInterface, serviceaccountController, controller, principalController, usergroupController, checkController, systemController, uploadController, keywordsearchController, infraproviderController, gitspaceController, migrateController, provider, openapiService, appRouter, sender, lfsController) serverServer := server2.ProvideServer(config, routerRouter) publickeyService := publickey.ProvidePublicKey(publicKeyStore, principalInfoCache) @@ -579,11 +586,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - readerFactory3, err := events2.ProvideReaderFactory(eventsSystem) - if err != nil { - return nil, err - } - readerFactory4, err := events3.ProvideReaderFactory(eventsSystem) + readerFactory4, err := events2.ProvideReaderFactory(eventsSystem) if err != nil { return nil, err } @@ -591,7 +594,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - submitter, err := metric.ProvideSubmitter(ctx, config, values, principalStore, principalInfoCache, pullReqStore, ruleStore, readerFactory3, readerFactory4, eventsReaderFactory, readerFactory5, publicaccessService, spaceFinder, repoFinder) + submitter, err := metric.ProvideSubmitter(ctx, config, values, principalStore, principalInfoCache, pullReqStore, ruleStore, readerFactory4, readerFactory3, eventsReaderFactory, readerFactory5, publicaccessService, spaceFinder, repoFinder) if err != nil { return nil, err } @@ -603,7 +606,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - repoService, err := repo2.ProvideService(ctx, config, eventsReporter, readerFactory4, repoStore, provider, gitInterface, lockerLocker) + repoService, err := repo2.ProvideService(ctx, config, eventsReporter, readerFactory3, repoStore, provider, gitInterface, lockerLocker) if err != nil { return nil, err } @@ -620,7 +623,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro return nil, err } keywordsearchConfig := server.ProvideKeywordSearchConfig(config) - keywordsearchService, err := keywordsearch.ProvideService(ctx, keywordsearchConfig, readerFactory, readerFactory4, repoStore, indexer) + keywordsearchService, err := keywordsearch.ProvideService(ctx, keywordsearchConfig, readerFactory, readerFactory3, repoStore, indexer) if err != nil { return nil, err } diff --git a/types/usage_metric.go b/types/usage_metric.go index 530cc5319..a2b22be98 100644 --- a/types/usage_metric.go +++ b/types/usage_metric.go @@ -18,4 +18,5 @@ type UsageMetric struct { RootSpaceID int64 `json:"root_space_id"` Bandwidth int64 `json:"bandwidth"` Storage int64 `json:"storage"` + Pushes int64 `json:"pushes"` }