feat: [CODE-3514]: Track git writes per account (#3671)

This commit is contained in:
Johannes Batzill 2025-04-22 00:51:51 +00:00 committed by Harness
parent bdfe97f77c
commit dbb193e834
22 changed files with 312 additions and 80 deletions

View File

@ -17,7 +17,6 @@ package githook
import ( import (
"context" "context"
"fmt" "fmt"
"slices"
"strings" "strings"
"time" "time"
@ -70,7 +69,7 @@ func (c *Controller) PostReceive(
c.handleEmptyRepoPush(ctx, repo, in.PostReceiveInput, &out) c.handleEmptyRepoPush(ctx, repo, in.PostReceiveInput, &out)
// always update last git push time - best effort // always update last git push time - best effort
c.updateLastGITPushTime(ctx, repo, in) c.updateLastGITPushTime(ctx, repo)
// report ref events if repo is in an active state - best effort // report ref events if repo is in an active state - best effort
if repo.State == enum.RepoStateActive { if repo.State == enum.RepoStateActive {
@ -364,16 +363,7 @@ func (c *Controller) handleEmptyRepoPush(
func (c *Controller) updateLastGITPushTime( func (c *Controller) updateLastGITPushTime(
ctx context.Context, ctx context.Context,
repo *types.Repository, repo *types.Repository,
in types.GithookPostReceiveInput,
) { ) {
isNonePRRefFn := func(refUpdate hook.ReferenceUpdate) bool {
return !strings.HasPrefix(refUpdate.Ref, gitReferenceNamePullReq)
}
// ignore push that only contains pr refs for last git push time updates
if !slices.ContainsFunc(in.RefUpdates, isNonePRRefFn) {
return
}
newRepo, err := c.repoStore.UpdateOptLock(ctx, repo, func(r *types.Repository) error { newRepo, err := c.repoStore.UpdateOptLock(ctx, repo, func(r *types.Repository) error {
r.LastGITPush = time.Now().UnixMilli() r.LastGITPush = time.Now().UnixMilli()
return nil return nil

View File

@ -294,8 +294,14 @@ func (c *Controller) Merge(
}(in.Method) }(in.Method)
if checkMergeability { if checkMergeability {
// for merge-check we can skip git hooks explicitly (we don't update any refs anyway)
writeParams, err := controller.CreateRPCSystemReferencesWriteParams(ctx, c.urlProvider, session, targetRepo)
if err != nil {
return nil, nil, fmt.Errorf("failed to create RPC write params: %w", err)
}
mergeOutput, err = c.git.Merge(ctx, &git.MergeParams{ mergeOutput, err = c.git.Merge(ctx, &git.MergeParams{
WriteParams: targetWriteParams, WriteParams: writeParams,
BaseBranch: pr.TargetBranch, BaseBranch: pr.TargetBranch,
HeadRepoUID: sourceRepo.GitUID, HeadRepoUID: sourceRepo.GitUID,
HeadBranch: pr.SourceBranch, HeadBranch: pr.SourceBranch,

View File

@ -115,7 +115,7 @@ func (c *Controller) Create(
return nil, err return nil, err
} }
targetWriteParams, err := controller.CreateRPCInternalWriteParams( targetWriteParams, err := controller.CreateRPCSystemReferencesWriteParams(
ctx, c.urlProvider, session, targetRepo, ctx, c.urlProvider, session, targetRepo,
) )
if err != nil { if err != nil {

View File

@ -142,7 +142,7 @@ func (c *Controller) State(ctx context.Context,
stateChange = changeClose stateChange = changeClose
} }
targetWriteParams, err := controller.CreateRPCInternalWriteParams(ctx, c.urlProvider, session, targetRepo) targetWriteParams, err := controller.CreateRPCSystemReferencesWriteParams(ctx, c.urlProvider, session, targetRepo)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create RPC write params: %w", err) return nil, fmt.Errorf("failed to create RPC write params: %w", err)
} }

View File

@ -27,12 +27,13 @@ import (
) )
// createRPCWriteParams creates base write parameters for git write operations. // createRPCWriteParams creates base write parameters for git write operations.
// TODO: this function should be in git package and should accept params as interface (contract) // TODO: this function should be in git package and should accept params as interface (contract).
func createRPCWriteParams( func createRPCWriteParams(
ctx context.Context, ctx context.Context,
urlProvider url.Provider, urlProvider url.Provider,
session *auth.Session, session *auth.Session,
repo *types.RepositoryCore, repo *types.RepositoryCore,
disabled bool,
isInternal bool, isInternal bool,
) (git.WriteParams, error) { ) (git.WriteParams, error) {
// generate envars (add everything githook CLI needs for execution) // generate envars (add everything githook CLI needs for execution)
@ -41,7 +42,7 @@ func createRPCWriteParams(
urlProvider.GetInternalAPIURL(ctx), urlProvider.GetInternalAPIURL(ctx),
repo.ID, repo.ID,
session.Principal.ID, session.Principal.ID,
false, disabled,
isInternal, isInternal,
) )
if err != nil { if err != nil {
@ -66,7 +67,7 @@ func CreateRPCExternalWriteParams(
session *auth.Session, session *auth.Session,
repo *types.RepositoryCore, repo *types.RepositoryCore,
) (git.WriteParams, error) { ) (git.WriteParams, error) {
return createRPCWriteParams(ctx, urlProvider, session, repo, false) return createRPCWriteParams(ctx, urlProvider, session, repo, false, false)
} }
// CreateRPCInternalWriteParams creates base write parameters for git internal write operations. // CreateRPCInternalWriteParams creates base write parameters for git internal write operations.
@ -77,7 +78,18 @@ func CreateRPCInternalWriteParams(
session *auth.Session, session *auth.Session,
repo *types.RepositoryCore, repo *types.RepositoryCore,
) (git.WriteParams, error) { ) (git.WriteParams, error) {
return createRPCWriteParams(ctx, urlProvider, session, repo, true) return createRPCWriteParams(ctx, urlProvider, session, repo, false, true)
}
// CreateRPCSystemReferencesWriteParams creates base write parameters for write operations
// on system references (e.g. pullreq references).
func CreateRPCSystemReferencesWriteParams(
ctx context.Context,
urlProvider url.Provider,
session *auth.Session,
repo *types.RepositoryCore,
) (git.WriteParams, error) {
return createRPCWriteParams(ctx, urlProvider, session, repo, true, true)
} }
func MapBranch(b git.Branch) (types.Branch, error) { func MapBranch(b git.Branch) (types.Branch, error) {

View File

@ -577,7 +577,7 @@ func (r *Repository) createEnvVars(ctx context.Context,
r.urlProvider.GetInternalAPIURL(ctx), r.urlProvider.GetInternalAPIURL(ctx),
repoID, repoID,
principal.ID, principal.ID,
false, true,
true, true,
) )
if err != nil { if err != nil {

View File

@ -36,7 +36,7 @@ func (s *Service) updateHeadRefOnBranchUpdate(ctx context.Context,
return fmt.Errorf("failed to get repo git info: %w", err) return fmt.Errorf("failed to get repo git info: %w", err)
} }
writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, repoGit.ID, repoGit.GitUID) writeParams, err := createRPCSystemReferencesWriteParams(ctx, s.urlProvider, repoGit.ID, repoGit.GitUID)
if err != nil { if err != nil {
return fmt.Errorf("failed to generate rpc write params: %w", err) return fmt.Errorf("failed to generate rpc write params: %w", err)
} }

View File

@ -155,7 +155,7 @@ func (s *Service) updateMergeData(
} }
} }
writeParams, err := createSystemRPCWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID) writeParams, err := createRPCSystemReferencesWriteParams(ctx, s.urlProvider, targetRepo.ID, targetRepo.GitUID)
if err != nil { if err != nil {
return fmt.Errorf("failed to generate rpc write params: %w", err) return fmt.Errorf("failed to generate rpc write params: %w", err)
} }

View File

@ -248,8 +248,8 @@ func New(ctx context.Context,
return service, nil return service, nil
} }
// createSystemRPCWriteParams creates base write parameters for write operations. // createRPCSystemReferencesWriteParams creates base write parameters for write operations.
func createSystemRPCWriteParams( func createRPCSystemReferencesWriteParams(
ctx context.Context, ctx context.Context,
urlProvider url.Provider, urlProvider url.Provider,
repoID int64, repoID int64,
@ -257,13 +257,13 @@ func createSystemRPCWriteParams(
) (git.WriteParams, error) { ) (git.WriteParams, error) {
principal := bootstrap.NewSystemServiceSession().Principal principal := bootstrap.NewSystemServiceSession().Principal
// generate envars (add everything githook CLI needs for execution) // generate envars - skip githook execution since it's system references only
envVars, err := githook.GenerateEnvironmentVariables( envVars, err := githook.GenerateEnvironmentVariables(
ctx, ctx,
urlProvider.GetInternalAPIURL(ctx), urlProvider.GetInternalAPIURL(ctx),
repoID, repoID,
principal.ID, principal.ID,
false, true,
true, true,
) )
if err != nil { if err != nil {

View File

@ -0,0 +1,95 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package usage
import (
"context"
"fmt"
"time"
repoevents "github.com/harness/gitness/app/events/repo"
"github.com/harness/gitness/app/paths"
"github.com/harness/gitness/events"
"github.com/harness/gitness/stream"
"github.com/harness/gitness/types"
)
type RepoFinder interface {
FindByID(ctx context.Context, id int64) (*types.RepositoryCore, error)
}
func registerEventListeners(
ctx context.Context,
instanceID string,
sender Sender,
repoEvReaderFactory *events.ReaderFactory[*repoevents.Reader],
repoFinder RepoFinder,
) error {
// repo events
const groupRepo = "gitness:usage:repo"
_, err := repoEvReaderFactory.Launch(ctx, groupRepo, instanceID,
func(r *repoevents.Reader) error {
const idleTimeout = 10 * time.Second
r.Configure(
stream.WithConcurrency(1),
stream.WithHandlerOptions(
stream.WithIdleTimeout(idleTimeout),
stream.WithMaxRetries(2),
))
_ = r.RegisterCreated(repoCreateHandler(sender, repoFinder))
_ = r.RegisterPushed(repoPushHandler(sender, repoFinder))
return nil
})
if err != nil {
return fmt.Errorf("failed to launch repo event reader: %w", err)
}
return nil
}
func repoCreateHandler(sender Sender, repoFinder RepoFinder) events.HandlerFunc[*repoevents.CreatedPayload] {
return func(ctx context.Context, event *events.Event[*repoevents.CreatedPayload]) error {
return sendRepoPushUsage(ctx, sender, repoFinder, event.Payload.RepoID)
}
}
func repoPushHandler(sender Sender, repoFinder RepoFinder) events.HandlerFunc[*repoevents.PushedPayload] {
return func(ctx context.Context, event *events.Event[*repoevents.PushedPayload]) error {
return sendRepoPushUsage(ctx, sender, repoFinder, event.Payload.RepoID)
}
}
func sendRepoPushUsage(ctx context.Context, sender Sender, repoFinder RepoFinder, repoID int64) error {
repo, err := repoFinder.FindByID(ctx, repoID)
if err != nil {
return fmt.Errorf("failed to find repo with id %d: %w", repoID, err)
}
rootSpace, _, err := paths.DisectRoot(repo.Path)
if err != nil {
return fmt.Errorf("failed to disect repo path %q: %w", repo.Path, err)
}
m := Metric{
SpaceRef: rootSpace,
Pushes: 1,
}
if err := sender.Send(ctx, m); err != nil {
return fmt.Errorf("failed to send usage metric: %w", err)
}
return nil
}

View File

@ -54,6 +54,20 @@ func (s *SpaceFinderMock) FindByRef(
return s.FindByRefFn(ctx, spaceRef) return s.FindByRefFn(ctx, spaceRef)
} }
type RepoFinderMock struct {
FindByIDFn func(
ctx context.Context,
id int64,
) (*types.RepositoryCore, error)
}
func (r *RepoFinderMock) FindByID(
ctx context.Context,
id int64,
) (*types.RepositoryCore, error) {
return r.FindByIDFn(ctx, id)
}
type MetricsMock struct { type MetricsMock struct {
UpsertOptimisticFn func(ctx context.Context, in *types.UsageMetric) error UpsertOptimisticFn func(ctx context.Context, in *types.UsageMetric) error
GetMetricsFn func( GetMetricsFn func(

View File

@ -25,6 +25,10 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
var (
days30 = time.Duration(30*24) * time.Hour
)
type Bandwidth struct { type Bandwidth struct {
Out int64 Out int64
In int64 In int64
@ -33,6 +37,7 @@ type Bandwidth struct {
type Metric struct { type Metric struct {
SpaceRef string SpaceRef string
Bandwidth Bandwidth
Pushes int64
} }
type SpaceFinder interface { type SpaceFinder interface {
@ -54,7 +59,7 @@ type MetricStore interface {
) ([]types.UsageMetric, error) ) ([]types.UsageMetric, error)
} }
type Mediator struct { type mediator struct {
queue *queue queue *queue
workers []*worker workers []*worker
@ -67,13 +72,13 @@ type Mediator struct {
config Config config Config
} }
func NewMediator( func newMediator(
ctx context.Context, ctx context.Context,
spaceFinder SpaceFinder, spaceFinder SpaceFinder,
usageMetricsStore MetricStore, usageMetricsStore MetricStore,
config Config, config Config,
) *Mediator { ) *mediator {
m := &Mediator{ m := &mediator{
queue: newQueue(), queue: newQueue(),
spaceFinder: spaceFinder, spaceFinder: spaceFinder,
metricsStore: usageMetricsStore, metricsStore: usageMetricsStore,
@ -86,7 +91,7 @@ func NewMediator(
return m return m
} }
func (m *Mediator) Start(ctx context.Context) { func (m *mediator) Start(ctx context.Context) {
for i := range m.workers { for i := range m.workers {
w := newWorker(i, m.queue) w := newWorker(i, m.queue)
go w.start(ctx, m.process) go w.start(ctx, m.process)
@ -94,29 +99,29 @@ func (m *Mediator) Start(ctx context.Context) {
} }
} }
func (m *Mediator) Stop() { func (m *mediator) Stop() {
for i := range m.workers { for i := range m.workers {
m.workers[i].stop() m.workers[i].stop()
} }
} }
func (m *Mediator) Send(ctx context.Context, payload Metric) error { func (m *mediator) Send(ctx context.Context, payload Metric) error {
m.wg.Add(1) m.wg.Add(1)
m.queue.Add(ctx, payload) m.queue.Add(ctx, payload)
return nil return nil
} }
func (m *Mediator) Wait() { func (m *mediator) Wait() {
m.wg.Wait() m.wg.Wait()
} }
func (m *Mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) { func (m *mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) {
space, err := m.spaceFinder.FindByRef(ctx, spaceRef) space, err := m.spaceFinder.FindByRef(ctx, spaceRef)
if err != nil { if err != nil {
return Bandwidth{}, fmt.Errorf("could not find space: %w", err) return Bandwidth{}, fmt.Errorf("could not find space: %w", err)
} }
now := time.Now() now := time.Now()
metric, err := m.metricsStore.GetMetrics(ctx, space.ID, now.Add(-m.days30()).UnixMilli(), now.UnixMilli()) metric, err := m.metricsStore.GetMetrics(ctx, space.ID, now.Add(-days30).UnixMilli(), now.UnixMilli())
if err != nil { if err != nil {
return Bandwidth{}, err return Bandwidth{}, err
} }
@ -126,11 +131,7 @@ func (m *Mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error)
}, nil }, nil
} }
func (m *Mediator) days30() time.Duration { func (m *mediator) process(ctx context.Context, payload *Metric) {
return time.Duration(30*24) * time.Hour
}
func (m *Mediator) process(ctx context.Context, payload *Metric) {
defer m.wg.Done() defer m.wg.Done()
space, err := m.spaceFinder.FindByRef(ctx, payload.SpaceRef) space, err := m.spaceFinder.FindByRef(ctx, payload.SpaceRef)
@ -143,6 +144,7 @@ func (m *Mediator) process(ctx context.Context, payload *Metric) {
RootSpaceID: space.ID, RootSpaceID: space.ID,
Bandwidth: payload.Out, Bandwidth: payload.Out,
Storage: payload.In, Storage: payload.In,
Pushes: payload.Pushes,
}); err != nil { }); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to upsert usage metrics") log.Ctx(ctx).Err(err).Msg("failed to upsert usage metrics")
} }

View File

@ -20,7 +20,10 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
repoevents "github.com/harness/gitness/app/events/repo"
"github.com/harness/gitness/events"
"github.com/harness/gitness/types" "github.com/harness/gitness/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -36,10 +39,38 @@ func TestMediator_basic(t *testing.T) {
return space, nil return space, nil
}, },
} }
repo := &types.RepositoryCore{
ID: 2,
Path: "space/repo",
}
repoFinderMock := &RepoFinderMock{
FindByIDFn: func(_ context.Context, id int64) (*types.RepositoryCore, error) {
if id != repo.ID {
return nil, fmt.Errorf("expected id to be %d, got %d", repo.ID, id)
}
return repo, nil
},
}
eventSystem, err := events.ProvideSystem(events.Config{
Mode: events.ModeInMemory,
MaxStreamLength: 100,
}, nil)
if err != nil {
t.Fatalf("failed to create event system: %v", err)
}
repoEvReaderFactory, err := repoevents.NewReaderFactory(eventSystem)
if err != nil {
t.Fatalf("failed to create repo event reader factory: %v", err)
}
repoEvReporter, err := repoevents.NewReporter(eventSystem)
if err != nil {
t.Fatalf("failed to create repo event reporter: %v", err)
}
out := atomic.Int64{} out := atomic.Int64{}
in := atomic.Int64{} in := atomic.Int64{}
counter := atomic.Int64{} pushes := atomic.Int64{}
usageMock := &MetricsMock{ usageMock := &MetricsMock{
UpsertOptimisticFn: func(_ context.Context, metric *types.UsageMetric) error { UpsertOptimisticFn: func(_ context.Context, metric *types.UsageMetric) error {
@ -48,7 +79,7 @@ func TestMediator_basic(t *testing.T) {
} }
out.Add(metric.Bandwidth) out.Add(metric.Bandwidth)
in.Add(metric.Storage) in.Add(metric.Storage)
counter.Add(1) pushes.Add(metric.Pushes)
return nil return nil
}, },
GetMetricsFn: func( GetMetricsFn: func(
@ -67,9 +98,11 @@ func TestMediator_basic(t *testing.T) {
}, },
} }
numRoutines := 10 numBandwidthRoutines := 10
numEventsCreated := 4
numEventsPushed := 5
defaultSize := 512 defaultSize := 512
mediator := NewMediator( mediator := newMediator(
context.Background(), context.Background(),
spaceFinderMock, spaceFinderMock,
usageMock, usageMock,
@ -77,8 +110,13 @@ func TestMediator_basic(t *testing.T) {
MaxWorkers: 5, MaxWorkers: 5,
}, },
) )
err = registerEventListeners(context.Background(), "test", mediator, repoEvReaderFactory, repoFinderMock)
if err != nil {
t.Fatalf("failed to register event listeners: %v", err)
}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for range numRoutines { for range numBandwidthRoutines {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -93,8 +131,29 @@ func TestMediator_basic(t *testing.T) {
} }
wg.Wait() wg.Wait()
for range numEventsCreated {
repoEvReporter.Created(context.Background(), &repoevents.CreatedPayload{
Base: repoevents.Base{
RepoID: repo.ID,
},
})
}
for range numEventsPushed {
repoEvReporter.Pushed(context.Background(), &repoevents.PushedPayload{
Base: repoevents.Base{
RepoID: repo.ID,
},
})
}
// todo: add ability to wait for event system to complete
time.Sleep(200 * time.Millisecond)
mediator.Wait() mediator.Wait()
require.Equal(t, int64(numRoutines*defaultSize), out.Load()) require.Equal(t, int64(numBandwidthRoutines*defaultSize), out.Load())
require.Equal(t, int64(numRoutines*defaultSize), in.Load()) require.Equal(t, int64(numBandwidthRoutines*defaultSize), in.Load())
require.Equal(t, int64(numEventsCreated+numEventsPushed), pushes.Load())
} }

View File

@ -16,9 +16,12 @@ package usage
import ( import (
"context" "context"
"fmt"
repoevents "github.com/harness/gitness/app/events/repo"
"github.com/harness/gitness/app/services/refcache" "github.com/harness/gitness/app/services/refcache"
"github.com/harness/gitness/app/store" "github.com/harness/gitness/app/store"
"github.com/harness/gitness/events"
"github.com/harness/gitness/types" "github.com/harness/gitness/types"
"github.com/google/wire" "github.com/google/wire"
@ -32,15 +35,24 @@ func ProvideMediator(
ctx context.Context, ctx context.Context,
config *types.Config, config *types.Config,
spaceFinder refcache.SpaceFinder, spaceFinder refcache.SpaceFinder,
repoFinder refcache.RepoFinder,
metricsStore store.UsageMetricStore, metricsStore store.UsageMetricStore,
) Sender { repoEvReaderFactory *events.ReaderFactory[*repoevents.Reader],
) (Sender, error) {
if !config.UsageMetrics.Enabled { if !config.UsageMetrics.Enabled {
return &Noop{} return &Noop{}, nil
} }
return NewMediator(
m := newMediator(
ctx, ctx,
spaceFinder, spaceFinder,
metricsStore, metricsStore,
NewConfig(config), NewConfig(config),
) )
if err := registerEventListeners(ctx, config.InstanceID, m, repoEvReaderFactory, repoFinder); err != nil {
return nil, fmt.Errorf("failed to register event listeners: %w", err)
}
return m, nil
} }

View File

@ -0,0 +1,2 @@
ALTER TABLE usage_metrics
DROP COLUMN usage_metric_pushes;

View File

@ -0,0 +1,2 @@
ALTER TABLE usage_metrics
ADD COLUMN usage_metric_pushes INTEGER NOT NULL DEFAULT 0;

View File

@ -0,0 +1,2 @@
ALTER TABLE usage_metrics
DROP COLUMN usage_metric_pushes;

View File

@ -0,0 +1,2 @@
ALTER TABLE usage_metrics
ADD COLUMN usage_metric_pushes INTEGER NOT NULL DEFAULT 0;

View File

@ -30,6 +30,17 @@ import (
var _ store.UsageMetricStore = (*UsageMetricsStore)(nil) var _ store.UsageMetricStore = (*UsageMetricsStore)(nil)
type usageMetric struct {
RootSpaceID int64 `db:"usage_metric_space_id"`
Date int64 `db:"usage_metric_date"`
Created int64 `db:"usage_metric_created"`
Updated int64 `db:"usage_metric_updated"`
Bandwidth int64 `db:"usage_metric_bandwidth"`
Storage int64 `db:"usage_metric_storage"`
Pushes int64 `db:"usage_metric_pushes"`
Version int64 `db:"usage_metric_version"`
}
// NewUsageMetricsStore returns a new UsageMetricsStore. // NewUsageMetricsStore returns a new UsageMetricsStore.
func NewUsageMetricsStore(db *sqlx.DB) *UsageMetricsStore { func NewUsageMetricsStore(db *sqlx.DB) *UsageMetricsStore {
return &UsageMetricsStore{ return &UsageMetricsStore{
@ -48,9 +59,9 @@ func (s *UsageMetricsStore) getVersion(
date int64, date int64,
) int64 { ) int64 {
const sqlQuery = ` const sqlQuery = `
SELECT SELECT
usage_metric_version usage_metric_version
FROM usage_metrics FROM usage_metrics
WHERE usage_metric_space_id = $1 AND usage_metric_date = $2 WHERE usage_metric_space_id = $1 AND usage_metric_date = $2
` `
var version int64 var version int64
@ -70,6 +81,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e
,usage_metric_updated ,usage_metric_updated
,usage_metric_bandwidth ,usage_metric_bandwidth
,usage_metric_storage ,usage_metric_storage
,usage_metric_pushes
,usage_metric_version ,usage_metric_version
) VALUES ( ) VALUES (
:usage_metric_space_id :usage_metric_space_id
@ -78,8 +90,9 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e
,:usage_metric_updated ,:usage_metric_updated
,:usage_metric_bandwidth ,:usage_metric_bandwidth
,:usage_metric_storage ,:usage_metric_storage
,:usage_metric_pushes
,:usage_metric_version ,:usage_metric_version
) )
ON CONFLICT (usage_metric_space_id, usage_metric_date) ON CONFLICT (usage_metric_space_id, usage_metric_date)
DO UPDATE DO UPDATE
SET SET
@ -87,6 +100,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e
,usage_metric_updated = EXCLUDED.usage_metric_updated ,usage_metric_updated = EXCLUDED.usage_metric_updated
,usage_metric_bandwidth = usage_metrics.usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth ,usage_metric_bandwidth = usage_metrics.usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth
,usage_metric_storage = usage_metrics.usage_metric_storage + EXCLUDED.usage_metric_storage ,usage_metric_storage = usage_metrics.usage_metric_storage + EXCLUDED.usage_metric_storage
,usage_metric_pushes = usage_metrics.usage_metric_pushes + EXCLUDED.usage_metric_pushes
WHERE usage_metrics.usage_metric_version = EXCLUDED.usage_metric_version - 1` WHERE usage_metrics.usage_metric_version = EXCLUDED.usage_metric_version - 1`
db := dbtx.GetAccessor(ctx, s.db) db := dbtx.GetAccessor(ctx, s.db)
@ -98,6 +112,7 @@ func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) e
Updated: time.Now().UnixMilli(), Updated: time.Now().UnixMilli(),
Bandwidth: in.Bandwidth, Bandwidth: in.Bandwidth,
Storage: in.Storage, Storage: in.Storage,
Pushes: in.Pushes,
Version: s.getVersion(ctx, in.RootSpaceID, today) + 1, Version: s.getVersion(ctx, in.RootSpaceID, today) + 1,
}) })
if err != nil { if err != nil {
@ -143,9 +158,10 @@ func (s *UsageMetricsStore) GetMetrics(
const sqlQuery = ` const sqlQuery = `
SELECT SELECT
COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth,
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage,
COALESCE(SUM(usage_metric_pushes), 0) AS usage_metric_pushes
FROM usage_metrics FROM usage_metrics
WHERE WHERE
usage_metric_space_id = $1 AND usage_metric_space_id = $1 AND
usage_metric_date BETWEEN $2 AND $3` usage_metric_date BETWEEN $2 AND $3`
@ -165,6 +181,7 @@ func (s *UsageMetricsStore) GetMetrics(
).Scan( ).Scan(
&result.Bandwidth, &result.Bandwidth,
&result.Storage, &result.Storage,
&result.Pushes,
) )
if err != nil { if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to get metric") return nil, database.ProcessSQLErrorf(ctx, err, "failed to get metric")
@ -182,12 +199,13 @@ func (s *UsageMetricsStore) List(
SELECT SELECT
usage_metric_space_id, usage_metric_space_id,
COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth,
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage,
COALESCE(SUM(usage_metric_pushes), 0) AS usage_metric_pushes
FROM usage_metrics FROM usage_metrics
WHERE WHERE
usage_metric_date BETWEEN $1 AND $2 usage_metric_date BETWEEN $1 AND $2
GROUP BY usage_metric_space_id GROUP BY usage_metric_space_id
ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC` ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC, usage_metric_pushes DESC`
startTime := time.UnixMilli(start) startTime := time.UnixMilli(start)
endTime := time.UnixMilli(end) endTime := time.UnixMilli(end)
@ -195,7 +213,7 @@ func (s *UsageMetricsStore) List(
db := dbtx.GetAccessor(ctx, s.db) db := dbtx.GetAccessor(ctx, s.db)
rows, err := db.QueryContext(ctx, sqlQuery, s.Date(startTime), s.Date(endTime)) rows, err := db.QueryContext(ctx, sqlQuery, s.Date(startTime), s.Date(endTime))
if err != nil { if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics") return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage metrics")
} }
defer rows.Close() defer rows.Close()
@ -206,6 +224,7 @@ func (s *UsageMetricsStore) List(
&metric.RootSpaceID, &metric.RootSpaceID,
&metric.Bandwidth, &metric.Bandwidth,
&metric.Storage, &metric.Storage,
&metric.Pushes,
) )
if err != nil { if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to scan usage_metrics") return nil, database.ProcessSQLErrorf(ctx, err, "failed to scan usage_metrics")
@ -223,13 +242,3 @@ func (s *UsageMetricsStore) Date(t time.Time) int64 {
year, month, day := t.Date() year, month, day := t.Date()
return time.Date(year, month, day, 0, 0, 0, 0, time.UTC).UnixMilli() return time.Date(year, month, day, 0, 0, 0, 0, time.UTC).UnixMilli()
} }
type usageMetric struct {
RootSpaceID int64 `db:"usage_metric_space_id"`
Date int64 `db:"usage_metric_date"`
Created int64 `db:"usage_metric_created"`
Updated int64 `db:"usage_metric_updated"`
Bandwidth int64 `db:"usage_metric_bandwidth"`
Storage int64 `db:"usage_metric_storage"`
Version int64 `db:"usage_metric_version"`
}

View File

@ -43,6 +43,7 @@ func TestUsageMetricsStore_Upsert(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 100, Bandwidth: 100,
Storage: 100, Storage: 100,
Pushes: 21,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -51,6 +52,7 @@ func TestUsageMetricsStore_Upsert(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 100, Bandwidth: 100,
Storage: 0, Storage: 0,
Pushes: 3,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -60,7 +62,8 @@ func TestUsageMetricsStore_Upsert(t *testing.T) {
usage_metric_space_id, usage_metric_space_id,
usage_metric_date, usage_metric_date,
usage_metric_bandwidth, usage_metric_bandwidth,
usage_metric_storage usage_metric_storage,
usage_metric_pushes
FROM usage_metrics FROM usage_metrics
WHERE usage_metric_space_id = ? WHERE usage_metric_space_id = ?
LIMIT 1`, LIMIT 1`,
@ -73,12 +76,14 @@ func TestUsageMetricsStore_Upsert(t *testing.T) {
&date, &date,
&metric.Bandwidth, &metric.Bandwidth,
&metric.Storage, &metric.Storage,
&metric.Pushes,
) )
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(1), metric.RootSpaceID) require.Equal(t, int64(1), metric.RootSpaceID)
require.Equal(t, metricsStore.Date(time.Now()), date) require.Equal(t, metricsStore.Date(time.Now()), date)
require.Equal(t, int64(200), metric.Bandwidth) require.Equal(t, int64(200), metric.Bandwidth)
require.Equal(t, int64(100), metric.Storage) require.Equal(t, int64(100), metric.Storage)
require.Equal(t, int64(24), metric.Pushes)
} }
func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) { func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) {
@ -102,6 +107,7 @@ func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 100, Bandwidth: 100,
Storage: 100, Storage: 100,
Pushes: 21,
}) })
}) })
} }
@ -115,6 +121,7 @@ func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) {
require.Equal(t, int64(100*100), metric.Bandwidth) require.Equal(t, int64(100*100), metric.Bandwidth)
require.Equal(t, int64(100*100), metric.Storage) require.Equal(t, int64(100*100), metric.Storage)
require.Equal(t, int64(21*100), metric.Pushes)
} }
func TestUsageMetricsStore_GetMetrics(t *testing.T) { func TestUsageMetricsStore_GetMetrics(t *testing.T) {
@ -134,6 +141,7 @@ func TestUsageMetricsStore_GetMetrics(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 100, Bandwidth: 100,
Storage: 100, Storage: 100,
Pushes: 21,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -145,6 +153,7 @@ func TestUsageMetricsStore_GetMetrics(t *testing.T) {
require.Equal(t, int64(1), metric.RootSpaceID, "expected spaceID = %d, got %d", 1, metric.RootSpaceID) require.Equal(t, int64(1), metric.RootSpaceID, "expected spaceID = %d, got %d", 1, metric.RootSpaceID)
require.Equal(t, int64(100), metric.Bandwidth, "expected bandwidth = %d, got %d", 100, metric.Bandwidth) require.Equal(t, int64(100), metric.Bandwidth, "expected bandwidth = %d, got %d", 100, metric.Bandwidth)
require.Equal(t, int64(100), metric.Storage, "expected storage = %d, got %d", 100, metric.Storage) require.Equal(t, int64(100), metric.Storage, "expected storage = %d, got %d", 100, metric.Storage)
require.Equal(t, int64(21), metric.Pushes, "expected pushes = %d, got %d", 21, metric.Pushes)
} }
func TestUsageMetricsStore_List(t *testing.T) { func TestUsageMetricsStore_List(t *testing.T) {
@ -164,6 +173,7 @@ func TestUsageMetricsStore_List(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 100, Bandwidth: 100,
Storage: 100, Storage: 100,
Pushes: 21,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -171,6 +181,7 @@ func TestUsageMetricsStore_List(t *testing.T) {
RootSpaceID: 1, RootSpaceID: 1,
Bandwidth: 50, Bandwidth: 50,
Storage: 50, Storage: 50,
Pushes: 21,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -178,6 +189,7 @@ func TestUsageMetricsStore_List(t *testing.T) {
RootSpaceID: 2, RootSpaceID: 2,
Bandwidth: 200, Bandwidth: 200,
Storage: 200, Storage: 200,
Pushes: 21,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -188,4 +200,13 @@ func TestUsageMetricsStore_List(t *testing.T) {
// list use desc order so first row should be spaceID = 2 // list use desc order so first row should be spaceID = 2
require.Equal(t, int64(2), metrics[0].RootSpaceID) require.Equal(t, int64(2), metrics[0].RootSpaceID)
require.Equal(t, int64(200), metrics[0].Bandwidth)
require.Equal(t, int64(200), metrics[0].Storage)
require.Equal(t, int64(21), metrics[0].Pushes)
// second row should be spaceID = 1
require.Equal(t, int64(1), metrics[1].RootSpaceID)
require.Equal(t, int64(150), metrics[1].Bandwidth)
require.Equal(t, int64(150), metrics[1].Storage)
require.Equal(t, int64(42), metrics[1].Pushes)
} }

View File

@ -557,7 +557,14 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
rpmHandler := api2.NewRpmHandlerProvider(rpmController, packagesHandler) rpmHandler := api2.NewRpmHandlerProvider(rpmController, packagesHandler)
handler4 := router.PackageHandlerProvider(packagesHandler, mavenHandler, genericHandler, pythonHandler, nugetHandler, npmHandler, rpmHandler) handler4 := router.PackageHandlerProvider(packagesHandler, mavenHandler, genericHandler, pythonHandler, nugetHandler, npmHandler, rpmHandler)
appRouter := router.AppRouterProvider(registryOCIHandler, apiHandler, handler2, handler3, handler4) appRouter := router.AppRouterProvider(registryOCIHandler, apiHandler, handler2, handler3, handler4)
sender := usage.ProvideMediator(ctx, config, spaceFinder, usageMetricStore) readerFactory3, err := events3.ProvideReaderFactory(eventsSystem)
if err != nil {
return nil, err
}
sender, err := usage.ProvideMediator(ctx, config, spaceFinder, repoFinder, usageMetricStore, readerFactory3)
if err != nil {
return nil, err
}
routerRouter := router2.ProvideRouter(ctx, config, authenticator, repoController, reposettingsController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, gitInterface, serviceaccountController, controller, principalController, usergroupController, checkController, systemController, uploadController, keywordsearchController, infraproviderController, gitspaceController, migrateController, provider, openapiService, appRouter, sender, lfsController) routerRouter := router2.ProvideRouter(ctx, config, authenticator, repoController, reposettingsController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, gitInterface, serviceaccountController, controller, principalController, usergroupController, checkController, systemController, uploadController, keywordsearchController, infraproviderController, gitspaceController, migrateController, provider, openapiService, appRouter, sender, lfsController)
serverServer := server2.ProvideServer(config, routerRouter) serverServer := server2.ProvideServer(config, routerRouter)
publickeyService := publickey.ProvidePublicKey(publicKeyStore, principalInfoCache) publickeyService := publickey.ProvidePublicKey(publicKeyStore, principalInfoCache)
@ -579,11 +586,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
readerFactory3, err := events2.ProvideReaderFactory(eventsSystem) readerFactory4, err := events2.ProvideReaderFactory(eventsSystem)
if err != nil {
return nil, err
}
readerFactory4, err := events3.ProvideReaderFactory(eventsSystem)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -591,7 +594,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
submitter, err := metric.ProvideSubmitter(ctx, config, values, principalStore, principalInfoCache, pullReqStore, ruleStore, readerFactory3, readerFactory4, eventsReaderFactory, readerFactory5, publicaccessService, spaceFinder, repoFinder) submitter, err := metric.ProvideSubmitter(ctx, config, values, principalStore, principalInfoCache, pullReqStore, ruleStore, readerFactory4, readerFactory3, eventsReaderFactory, readerFactory5, publicaccessService, spaceFinder, repoFinder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -603,7 +606,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
repoService, err := repo2.ProvideService(ctx, config, eventsReporter, readerFactory4, repoStore, provider, gitInterface, lockerLocker) repoService, err := repo2.ProvideService(ctx, config, eventsReporter, readerFactory3, repoStore, provider, gitInterface, lockerLocker)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -620,7 +623,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err return nil, err
} }
keywordsearchConfig := server.ProvideKeywordSearchConfig(config) keywordsearchConfig := server.ProvideKeywordSearchConfig(config)
keywordsearchService, err := keywordsearch.ProvideService(ctx, keywordsearchConfig, readerFactory, readerFactory4, repoStore, indexer) keywordsearchService, err := keywordsearch.ProvideService(ctx, keywordsearchConfig, readerFactory, readerFactory3, repoStore, indexer)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -18,4 +18,5 @@ type UsageMetric struct {
RootSpaceID int64 `json:"root_space_id"` RootSpaceID int64 `json:"root_space_id"`
Bandwidth int64 `json:"bandwidth"` Bandwidth int64 `json:"bandwidth"`
Storage int64 `json:"storage"` Storage int64 `json:"storage"`
Pushes int64 `json:"pushes"`
} }