feat: [CODE-3514]: make mediator public (#3709)

This commit is contained in:
Johannes Batzill 2025-04-22 01:33:04 +00:00 committed by Harness
parent dbb193e834
commit 952a6d852f
4 changed files with 15 additions and 15 deletions

View File

@ -30,7 +30,7 @@ type RepoFinder interface {
FindByID(ctx context.Context, id int64) (*types.RepositoryCore, error) FindByID(ctx context.Context, id int64) (*types.RepositoryCore, error)
} }
func registerEventListeners( func RegisterEventListeners(
ctx context.Context, ctx context.Context,
instanceID string, instanceID string,
sender Sender, sender Sender,

View File

@ -59,7 +59,7 @@ type MetricStore interface {
) ([]types.UsageMetric, error) ) ([]types.UsageMetric, error)
} }
type mediator struct { type Mediator struct {
queue *queue queue *queue
workers []*worker workers []*worker
@ -72,13 +72,13 @@ type mediator struct {
config Config config Config
} }
func newMediator( func NewMediator(
ctx context.Context, ctx context.Context,
spaceFinder SpaceFinder, spaceFinder SpaceFinder,
usageMetricsStore MetricStore, usageMetricsStore MetricStore,
config Config, config Config,
) *mediator { ) *Mediator {
m := &mediator{ m := &Mediator{
queue: newQueue(), queue: newQueue(),
spaceFinder: spaceFinder, spaceFinder: spaceFinder,
metricsStore: usageMetricsStore, metricsStore: usageMetricsStore,
@ -91,7 +91,7 @@ func newMediator(
return m return m
} }
func (m *mediator) Start(ctx context.Context) { func (m *Mediator) Start(ctx context.Context) {
for i := range m.workers { for i := range m.workers {
w := newWorker(i, m.queue) w := newWorker(i, m.queue)
go w.start(ctx, m.process) go w.start(ctx, m.process)
@ -99,23 +99,23 @@ func (m *mediator) Start(ctx context.Context) {
} }
} }
func (m *mediator) Stop() { func (m *Mediator) Stop() {
for i := range m.workers { for i := range m.workers {
m.workers[i].stop() m.workers[i].stop()
} }
} }
func (m *mediator) Send(ctx context.Context, payload Metric) error { func (m *Mediator) Send(ctx context.Context, payload Metric) error {
m.wg.Add(1) m.wg.Add(1)
m.queue.Add(ctx, payload) m.queue.Add(ctx, payload)
return nil return nil
} }
func (m *mediator) Wait() { func (m *Mediator) Wait() {
m.wg.Wait() m.wg.Wait()
} }
func (m *mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) { func (m *Mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error) {
space, err := m.spaceFinder.FindByRef(ctx, spaceRef) space, err := m.spaceFinder.FindByRef(ctx, spaceRef)
if err != nil { if err != nil {
return Bandwidth{}, fmt.Errorf("could not find space: %w", err) return Bandwidth{}, fmt.Errorf("could not find space: %w", err)
@ -131,7 +131,7 @@ func (m *mediator) Size(ctx context.Context, spaceRef string) (Bandwidth, error)
}, nil }, nil
} }
func (m *mediator) process(ctx context.Context, payload *Metric) { func (m *Mediator) process(ctx context.Context, payload *Metric) {
defer m.wg.Done() defer m.wg.Done()
space, err := m.spaceFinder.FindByRef(ctx, payload.SpaceRef) space, err := m.spaceFinder.FindByRef(ctx, payload.SpaceRef)

View File

@ -102,7 +102,7 @@ func TestMediator_basic(t *testing.T) {
numEventsCreated := 4 numEventsCreated := 4
numEventsPushed := 5 numEventsPushed := 5
defaultSize := 512 defaultSize := 512
mediator := newMediator( mediator := NewMediator(
context.Background(), context.Background(),
spaceFinderMock, spaceFinderMock,
usageMock, usageMock,
@ -110,7 +110,7 @@ func TestMediator_basic(t *testing.T) {
MaxWorkers: 5, MaxWorkers: 5,
}, },
) )
err = registerEventListeners(context.Background(), "test", mediator, repoEvReaderFactory, repoFinderMock) err = RegisterEventListeners(context.Background(), "test", mediator, repoEvReaderFactory, repoFinderMock)
if err != nil { if err != nil {
t.Fatalf("failed to register event listeners: %v", err) t.Fatalf("failed to register event listeners: %v", err)
} }

View File

@ -43,14 +43,14 @@ func ProvideMediator(
return &Noop{}, nil return &Noop{}, nil
} }
m := newMediator( m := NewMediator(
ctx, ctx,
spaceFinder, spaceFinder,
metricsStore, metricsStore,
NewConfig(config), NewConfig(config),
) )
if err := registerEventListeners(ctx, config.InstanceID, m, repoEvReaderFactory, repoFinder); err != nil { if err := RegisterEventListeners(ctx, config.InstanceID, m, repoEvReaderFactory, repoFinder); err != nil {
return nil, fmt.Errorf("failed to register event listeners: %w", err) return nil, fmt.Errorf("failed to register event listeners: %w", err)
} }