move job package to the top level (#883)

Atefeh Mohseni-Ejiyeh 2023-12-08 17:06:44 +00:00 committed by Harness
parent b1bb7f0a0f
commit 5aec7bf37f
33 changed files with 315 additions and 223 deletions

View File

@ -23,7 +23,7 @@ import (
"github.com/harness/gitness/app/api/usererror"
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/app/services/importer"
"github.com/harness/gitness/types"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types/enum"
)
@ -31,23 +31,23 @@ import (
func (c *Controller) ImportProgress(ctx context.Context,
session *auth.Session,
repoRef string,
) (types.JobProgress, error) {
) (job.Progress, error) {
// note: can't use c.getRepoCheckAccess because this needs to fetch a repo being imported.
repo, err := c.repoStore.FindByRef(ctx, repoRef)
if err != nil {
return types.JobProgress{}, err
return job.Progress{}, err
}
if err = apiauth.CheckRepo(ctx, c.authorizer, session, repo, enum.PermissionRepoView, false); err != nil {
return types.JobProgress{}, err
return job.Progress{}, err
}
progress, err := c.importer.GetProgress(ctx, repo)
if errors.Is(err, importer.ErrNotFound) {
return types.JobProgress{}, usererror.NotFound("No recent or ongoing import found for repository.")
return job.Progress{}, usererror.NotFound("No recent or ongoing import found for repository.")
}
if err != nil {
return types.JobProgress{}, fmt.Errorf("failed to retrieve import progress: %w", err)
return job.Progress{}, fmt.Errorf("failed to retrieve import progress: %w", err)
}
return progress, err
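
With the package move, ImportProgress now returns job.Progress instead of types.JobProgress. A minimal sketch of a caller polling it until the import settles; the polling loop, interval, and helper name are illustrative, while the controller and session types come from the imports shown above:

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/harness/gitness/app/api/controller/repo"
	"github.com/harness/gitness/app/auth"
)

// waitForImport polls the controller until the import job reaches a terminal state.
func waitForImport(ctx context.Context, c *repo.Controller, session *auth.Session, repoRef string) error {
	for {
		p, err := c.ImportProgress(ctx, session, repoRef)
		if err != nil {
			return err
		}
		if p.State.IsCompleted() { // finished, failed, or canceled
			fmt.Printf("import ended: state=%s failure=%q\n", p.State, p.Failure)
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
}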

View File

@ -22,14 +22,14 @@ import (
"github.com/harness/gitness/app/api/usererror"
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/app/services/exporter"
"github.com/harness/gitness/types"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types/enum"
"github.com/pkg/errors"
)
type ExportProgressOutput struct {
Repos []types.JobProgress `json:"repos"`
Repos []job.Progress `json:"repos"`
}
// ExportProgress returns progress of the export job.

View File

@ -20,8 +20,8 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
)
type Config struct {

View File

@ -19,8 +19,8 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"

View File

@ -19,8 +19,8 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
"github.com/rs/zerolog/log"
)

View File

@ -15,8 +15,8 @@
package cleanup
import (
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
"github.com/google/wire"
)

View File

@ -25,12 +25,12 @@ import (
"time"
"github.com/harness/gitness/app/api/controller/repo"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
gitnessurl "github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/git"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
@ -146,7 +146,7 @@ func (r *Repository) RunManyForSpace(
return r.scheduler.RunJobs(ctx, jobGroupID, jobDefinitions)
}
func checkJobAlreadyRunning(jobs []types.JobProgress) error {
func checkJobAlreadyRunning(jobs []job.Progress) error {
if jobs == nil {
return nil
}
@ -251,7 +251,7 @@ func (r *Repository) getJobInput(data string) (Input, error) {
return input, nil
}
func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]types.JobProgress, error) {
func (r *Repository) GetProgressForSpace(ctx context.Context, spaceID int64) ([]job.Progress, error) {
groupID := getJobGroupID(spaceID)
progress, err := r.scheduler.GetJobProgressForGroup(ctx, groupID)
if err != nil {
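
GetProgressForSpace now returns []job.Progress, so callers can inspect per-repo export state through the shared State helpers. A small sketch under that assumption; the helper name and counting logic are illustrative:

package example

import (
	"context"

	"github.com/harness/gitness/app/services/exporter"
)

// activeExports counts export jobs in a space that are still scheduled or running,
// using the exporter's GetProgressForSpace (which now returns []job.Progress).
func activeExports(ctx context.Context, r *exporter.Repository, spaceID int64) (int, error) {
	progresses, err := r.GetProgressForSpace(ctx, spaceID)
	if err != nil {
		return 0, err
	}
	active := 0
	for _, p := range progresses {
		if !p.State.IsCompleted() {
			active++
		}
	}
	return active, nil
}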

View File

@ -15,12 +15,12 @@
package exporter
import (
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/git"
"github.com/harness/gitness/job"
"github.com/google/wire"
)

View File

@ -26,13 +26,13 @@ import (
"github.com/harness/gitness/app/bootstrap"
"github.com/harness/gitness/app/githook"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/keywordsearch"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
gitnessurl "github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/git"
"github.com/harness/gitness/job"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"
@ -328,7 +328,7 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return "", nil
}
func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) {
func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (job.Progress, error) {
progress, err := r.scheduler.GetJobProgress(ctx, JobIDFromRepoID(repo.ID))
if errors.Is(err, gitness_store.ErrResourceNotFound) {
if repo.Importing {
@ -337,10 +337,10 @@ func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (t
}
// otherwise there either was no import, or it completed a long time ago (job cleaned up by now)
return types.JobProgress{}, ErrNotFound
return job.Progress{}, ErrNotFound
}
if err != nil {
return types.JobProgress{}, fmt.Errorf("failed to get job progress: %w", err)
return job.Progress{}, fmt.Errorf("failed to get job progress: %w", err)
}
return progress, nil

View File

@ -15,13 +15,13 @@
package importer
import (
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/keywordsearch"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/url"
"github.com/harness/gitness/encrypt"
"github.com/harness/gitness/git"
"github.com/harness/gitness/job"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"

View File

@ -22,8 +22,8 @@ import (
"net/http"
"time"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types"
"github.com/harness/gitness/version"
)

View File

@ -15,8 +15,8 @@
package metric
import (
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
"github.com/harness/gitness/types"
"github.com/google/wire"

View File

@ -16,13 +16,13 @@ package services
import (
"github.com/harness/gitness/app/services/cleanup"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/keywordsearch"
"github.com/harness/gitness/app/services/metric"
"github.com/harness/gitness/app/services/notification"
"github.com/harness/gitness/app/services/pullreq"
"github.com/harness/gitness/app/services/trigger"
"github.com/harness/gitness/app/services/webhook"
"github.com/harness/gitness/job"
"github.com/google/wire"
)

View File

@ -478,49 +478,6 @@ type (
// Delete removes a required status checks for a repo.
Delete(ctx context.Context, repoID, reqCheckID int64) error
}
JobStore interface {
// Find fetches a job by its unique identifier.
Find(ctx context.Context, uid string) (*types.Job, error)
// ListByGroupID fetches all jobs for a group id
ListByGroupID(ctx context.Context, groupID string) ([]*types.Job, error)
// DeleteByGroupID deletes all jobs for a group id
DeleteByGroupID(ctx context.Context, groupID string) (int64, error)
// Create is used to create a new job.
Create(ctx context.Context, job *types.Job) error
// Upsert will insert the job in the database if the job didn't already exist,
// or it will update the existing one but only if its definition has changed.
Upsert(ctx context.Context, job *types.Job) error
// UpdateDefinition is used to update a job definition.
UpdateDefinition(ctx context.Context, job *types.Job) error
// UpdateExecution is used to update a job before and after execution.
UpdateExecution(ctx context.Context, job *types.Job) error
// UpdateProgress is used to update a job progress data.
UpdateProgress(ctx context.Context, job *types.Job) error
// CountRunning returns number of jobs that are currently being run.
CountRunning(ctx context.Context) (int, error)
// ListReady returns a list of jobs that are ready for execution.
ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error)
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error)
// NextScheduledTime returns a scheduled time of the next ready job.
NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error)
// DeleteOld removes non-recurring jobs that have finished execution or have failed.
DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)
}
PipelineStore interface {
// Find returns a pipeline given a pipeline ID from the datastore.
Find(ctx context.Context, id int64) (*types.Pipeline, error)

View File

@ -21,17 +21,16 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/job"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/jmoiron/sqlx"
)
var _ store.JobStore = (*JobStore)(nil)
var _ job.Store = (*JobStore)(nil)
func NewJobStore(db *sqlx.DB) *JobStore {
return &JobStore{
@ -73,13 +72,13 @@ const (
)
// Find fetches a job by its unique identifier.
func (s *JobStore) Find(ctx context.Context, uid string) (*types.Job, error) {
func (s *JobStore) Find(ctx context.Context, uid string) (*job.Job, error) {
const sqlQuery = jobSelectBase + `
WHERE job_uid = $1`
db := dbtx.GetAccessor(ctx, s.db)
result := &types.Job{}
result := &job.Job{}
if err := db.GetContext(ctx, result, sqlQuery, uid); err != nil {
return nil, database.ProcessSQLErrorf(err, "Failed to find job by uid")
}
@ -114,13 +113,13 @@ func (s *JobStore) DeleteByGroupID(ctx context.Context, groupID string) (int64,
}
// ListByGroupID fetches all jobs for a group id.
func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*types.Job, error) {
func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*job.Job, error) {
const sqlQuery = jobSelectBase + `
WHERE job_group_id = $1`
db := dbtx.GetAccessor(ctx, s.db)
dst := make([]*types.Job, 0)
dst := make([]*job.Job, 0)
if err := db.SelectContext(ctx, &dst, sqlQuery, groupID); err != nil {
return nil, database.ProcessSQLErrorf(err, "Failed to find job by group id")
}
@ -129,7 +128,7 @@ func (s *JobStore) ListByGroupID(ctx context.Context, groupID string) ([]*types.
}
// Create creates a new job.
func (s *JobStore) Create(ctx context.Context, job *types.Job) error {
func (s *JobStore) Create(ctx context.Context, job *job.Job) error {
const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
@ -172,7 +171,7 @@ func (s *JobStore) Create(ctx context.Context, job *types.Job) error {
// Upsert creates or updates a job. If the job didn't exist it will insert it in the database,
// otherwise it will update it but only if its definition has changed.
func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error {
func (s *JobStore) Upsert(ctx context.Context, job *job.Job) error {
const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
@ -235,7 +234,7 @@ func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error {
}
// UpdateDefinition is used to update a job definition.
func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error {
func (s *JobStore) UpdateDefinition(ctx context.Context, job *job.Job) error {
const sqlQuery = `
UPDATE jobs
SET
@ -278,7 +277,7 @@ func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error {
}
// UpdateExecution is used to update a job before and after execution.
func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error {
func (s *JobStore) UpdateExecution(ctx context.Context, job *job.Job) error {
const sqlQuery = `
UPDATE jobs
SET
@ -318,7 +317,7 @@ func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error {
return nil
}
func (s *JobStore) UpdateProgress(ctx context.Context, job *types.Job) error {
func (s *JobStore) UpdateProgress(ctx context.Context, job *job.Job) error {
const sqlQuery = `
UPDATE jobs
SET
@ -376,7 +375,7 @@ func (s *JobStore) CountRunning(ctx context.Context) (int, error) {
// ListReady returns a list of jobs that are ready for execution:
// The jobs with state="scheduled" and scheduled time in the past.
func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) {
func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*job.Job, error) {
stmt := database.Builder.
Select(jobColumns).
From("jobs").
@ -390,7 +389,7 @@ func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*
return nil, fmt.Errorf("failed to convert list scheduled jobs query to sql: %w", err)
}
result := make([]*types.Job, 0)
result := make([]*job.Job, 0)
db := dbtx.GetAccessor(ctx, s.db)
@ -402,7 +401,7 @@ func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*
}
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) {
func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*job.Job, error) {
stmt := database.Builder.
Select(jobColumns).
From("jobs").
@ -415,7 +414,7 @@ func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*
return nil, fmt.Errorf("failed to convert list overdue jobs query to sql: %w", err)
}
result := make([]*types.Job, 0)
result := make([]*job.Job, 0)
db := dbtx.GetAccessor(ctx, s.db)

View File

@ -19,6 +19,7 @@ import (
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/app/store/database/migrate"
"github.com/harness/gitness/job"
"github.com/harness/gitness/store/database"
"github.com/google/wire"
@ -120,7 +121,7 @@ func ProvideRuleStore(
}
// ProvideJobStore provides a job store.
func ProvideJobStore(db *sqlx.DB) store.JobStore {
func ProvideJobStore(db *sqlx.DB) job.Store {
return NewJobStore(db)
}

View File

@ -52,7 +52,6 @@ import (
"github.com/harness/gitness/app/services/codeowners"
"github.com/harness/gitness/app/services/exporter"
"github.com/harness/gitness/app/services/importer"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/keywordsearch"
"github.com/harness/gitness/app/services/metric"
"github.com/harness/gitness/app/services/notification"
@ -75,6 +74,7 @@ import (
"github.com/harness/gitness/git"
"github.com/harness/gitness/git/adapter"
"github.com/harness/gitness/git/storage"
"github.com/harness/gitness/job"
"github.com/harness/gitness/livelog"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"

View File

@ -52,7 +52,6 @@ import (
"github.com/harness/gitness/app/services/codeowners"
"github.com/harness/gitness/app/services/exporter"
"github.com/harness/gitness/app/services/importer"
"github.com/harness/gitness/app/services/job"
"github.com/harness/gitness/app/services/keywordsearch"
"github.com/harness/gitness/app/services/metric"
"github.com/harness/gitness/app/services/notification"
@ -75,6 +74,7 @@ import (
"github.com/harness/gitness/git"
"github.com/harness/gitness/git/adapter"
"github.com/harness/gitness/git/storage"
"github.com/harness/gitness/job"
"github.com/harness/gitness/livelog"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"

View File

@ -17,9 +17,6 @@ package job
import (
"errors"
"time"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
)
type Definition struct {
@ -50,19 +47,19 @@ func (def *Definition) Validate() error {
return nil
}
func (def *Definition) toNewJob() *types.Job {
func (def *Definition) toNewJob() *Job {
nowMilli := time.Now().UnixMilli()
return &types.Job{
return &Job{
UID: def.UID,
Created: nowMilli,
Updated: nowMilli,
Type: def.Type,
Priority: enum.JobPriorityNormal,
Priority: JobPriorityNormal,
Data: def.Data,
Result: "",
MaxDurationSeconds: int(def.Timeout / time.Second),
MaxRetries: def.MaxRetries,
State: enum.JobStateScheduled,
State: JobStateScheduled,
Scheduled: nowMilli,
TotalExecutions: 0,
RunBy: "",
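
A Definition carries the user-facing job description and toNewJob converts it into the persisted Job. A hedged sketch of defining and submitting a one-off job through Scheduler.RunJobs (shown later in this commit); the UID, job type, group ID, and data payload are made-up example values, and the conversion to a stored Job is presumed to go through toNewJob:

package example

import (
	"context"
	"time"

	"github.com/harness/gitness/job"
)

// scheduleImport queues a one-off job; all identifiers here are illustrative.
func scheduleImport(ctx context.Context, scheduler *job.Scheduler) error {
	def := job.Definition{
		UID:        "example-import-repo-42",
		Type:       "example_repository_import",
		Data:       `{"repo_id": 42}`,
		Timeout:    30 * time.Minute,
		MaxRetries: 2,
	}
	// RunJobs validates each definition (Definition.Validate) before scheduling.
	return scheduler.RunJobs(ctx, "example-group", []job.Definition{def})
}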

job/enum.go (new file, 89 lines)
View File

@ -0,0 +1,89 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"
)
// State represents state of a background job.
type State string
// State enumeration.
const (
JobStateScheduled State = "scheduled"
JobStateRunning State = "running"
JobStateFinished State = "finished"
JobStateFailed State = "failed"
JobStateCanceled State = "canceled"
)
var jobStates = sortEnum([]State{
JobStateScheduled,
JobStateRunning,
JobStateFinished,
JobStateFailed,
JobStateCanceled,
})
func (State) Enum() []interface{} { return toInterfaceSlice(jobStates) }
func (s State) Sanitize() (State, bool) {
return Sanitize(s, GetAllJobStates)
}
func GetAllJobStates() ([]State, State) {
return jobStates, ""
}
// Priority represents priority of a background job.
type Priority int
// JobPriority enumeration.
const (
JobPriorityNormal Priority = 0
JobPriorityElevated Priority = 1
)
func (s State) IsCompleted() bool {
return s == JobStateFinished || s == JobStateFailed || s == JobStateCanceled
}
func sortEnum[T constraints.Ordered](slice []T) []T {
slices.Sort(slice)
return slice
}
func toInterfaceSlice[T interface{}](vals []T) []interface{} {
res := make([]interface{}, len(vals))
for i := range vals {
res[i] = vals[i]
}
return res
}
func Sanitize[E constraints.Ordered](element E, all func() ([]E, E)) (E, bool) {
allValues, defValue := all()
var empty E
if element == empty && defValue != empty {
return defValue, true
}
idx, exists := slices.BinarySearch(allValues, element)
if exists {
return allValues[idx], true
}
return defValue, false
}
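
These helpers replace the enum-package variants: the state values are kept sorted so Sanitize can validate arbitrary input with a binary search. A short usage sketch:

package main

import (
	"fmt"

	"github.com/harness/gitness/job"
)

func main() {
	// Sanitize checks an externally supplied value against the known states
	// (binary search over the sorted slice built by sortEnum).
	if s, ok := job.State("running").Sanitize(); ok {
		fmt.Println(s, s.IsCompleted()) // running false
	}
	if _, ok := job.State("bogus").Sanitize(); !ok {
		fmt.Println("unknown job state")
	}
	states, _ := job.GetAllJobStates()
	fmt.Println(states) // [canceled failed finished running scheduled]
}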

View File

@ -21,10 +21,7 @@ import (
"runtime/debug"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
@ -34,7 +31,7 @@ import (
type Executor struct {
handlerMap map[string]Handler
handlerComplete bool
store store.JobStore
store Store
publisher pubsub.Publisher
}
@ -56,11 +53,11 @@ type Handler interface {
var errNoHandlerDefined = errors.New("no handler registered for the job type")
// NewExecutor creates new Executor.
func NewExecutor(jobStore store.JobStore, publisher pubsub.Publisher) *Executor {
func NewExecutor(store Store, publisher pubsub.Publisher) *Executor {
return &Executor{
handlerMap: make(map[string]Handler),
handlerComplete: false,
store: jobStore,
store: store,
publisher: publisher,
}
}
@ -124,19 +121,19 @@ func (e *Executor) exec(
return errors.New("progress must be between 0 and 100")
}
jobDummy := &types.Job{
jobDummy := &Job{
UID: jobUID,
Type: jobType,
Updated: time.Now().UnixMilli(),
Result: result,
State: enum.JobStateRunning,
State: JobStateRunning,
RunProgress: progress,
}
// This doesn't need to be behind the global lock because it only updates the single row.
// While a job is running no other process should touch it.
// Even this call will fail if the context deadline has been exceeded.
// The job parameter is a dummy types.Job object that just holds fields that should be updated.
// The job parameter is a dummy Job object that just holds fields that should be updated.
if err := e.store.UpdateProgress(ctx, jobDummy); err != nil {
return err
}
@ -152,9 +149,9 @@ func (e *Executor) exec(
return exec.Handle(ctx, input, progressReporter) // runs the job
}
func FailProgress() types.JobProgress {
return types.JobProgress{
State: enum.JobStateFailed,
func FailProgress() Progress {
return Progress{
State: JobStateFailed,
Progress: ProgressMax,
Result: "",
Failure: "",
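
Handlers plug into the Executor by implementing Handle with the signature used by the importer's Repository elsewhere in this commit. A minimal sketch of a hypothetical handler; registration with the Executor (e.g. via its registration method) is assumed to happen during service wiring and is not shown here:

package example

import (
	"context"
	"encoding/json"

	"github.com/harness/gitness/job"
)

// cleanupHandler is a hypothetical handler; the Handle signature mirrors the
// importer's Repository handler in this commit.
type cleanupHandler struct{}

type cleanupInput struct {
	Path string `json:"path"`
}

func (h *cleanupHandler) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) {
	var in cleanupInput
	if err := json.Unmarshal([]byte(data), &in); err != nil {
		return "", err
	}
	// ... do the work, optionally reporting progress through the reporter ...
	return "cleaned " + in.Path, nil
}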

View File

@ -19,9 +19,7 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
@ -33,14 +31,14 @@ const (
)
type jobOverdue struct {
store store.JobStore
store Store
mxManager lock.MutexManager
scheduler *Scheduler
}
func newJobOverdue(jobStore store.JobStore, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue {
func newJobOverdue(store Store, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue {
return &jobOverdue{
store: jobStore,
store: store,
mxManager: mxManager,
scheduler: scheduler,
}
@ -81,7 +79,7 @@ func (j *jobOverdue) Handle(ctx context.Context, _ string, _ ProgressReporter) (
return "", fmt.Errorf("failed update overdue job")
}
if job.State == enum.JobStateScheduled {
if job.State == JobStateScheduled {
scheduled := time.UnixMilli(job.Scheduled)
if minScheduled.IsZero() || minScheduled.After(scheduled) {
minScheduled = scheduled

View File

@ -19,7 +19,6 @@ import (
"fmt"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/lock"
"github.com/rs/zerolog/log"
@ -32,18 +31,18 @@ const (
)
type jobPurge struct {
store store.JobStore
store Store
mxManager lock.MutexManager
minOldAge time.Duration
}
func newJobPurge(jobStore store.JobStore, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge {
func newJobPurge(store Store, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge {
if minOldAge < 0 {
minOldAge = 0
}
return &jobPurge{
store: jobStore,
store: store,
mxManager: mxManager,
minOldAge: minOldAge,
}

View File

@ -21,7 +21,6 @@ import (
"fmt"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
)
const (
@ -29,8 +28,8 @@ const (
PubSubTopicStateChange = "gitness:job:state_change"
)
func encodeStateChange(job *types.Job) ([]byte, error) {
stateChange := &types.JobStateChange{
func encodeStateChange(job *Job) ([]byte, error) {
stateChange := &StateChange{
UID: job.UID,
Type: job.Type,
State: job.State,
@ -47,8 +46,8 @@ func encodeStateChange(job *types.Job) ([]byte, error) {
return buffer.Bytes(), nil
}
func DecodeStateChange(payload []byte) (*types.JobStateChange, error) {
stateChange := &types.JobStateChange{}
func DecodeStateChange(payload []byte) (*StateChange, error) {
stateChange := &StateChange{}
if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(stateChange); err != nil {
return nil, err
}
@ -56,15 +55,15 @@ func DecodeStateChange(payload []byte) (*types.JobStateChange, error) {
return stateChange, nil
}
func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *types.Job) error {
func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *Job) error {
payload, err := encodeStateChange(job)
if err != nil {
return fmt.Errorf("failed to gob encode JobStateChange: %w", err)
return fmt.Errorf("failed to gob encode StateChange: %w", err)
}
err = publisher.Publish(ctx, PubSubTopicStateChange, payload)
if err != nil {
return fmt.Errorf("failed to publish JobStateChange: %w", err)
return fmt.Errorf("failed to publish StateChange: %w", err)
}
return nil
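
On the consuming side, the payload published to PubSubTopicStateChange is decoded back with DecodeStateChange. A sketch of a subscriber callback, assuming the pubsub service hands the raw payload bytes to a handler function:

package example

import (
	"fmt"

	"github.com/harness/gitness/job"
)

// onStateChange is a hypothetical callback for job.PubSubTopicStateChange.
func onStateChange(payload []byte) error {
	sc, err := job.DecodeStateChange(payload)
	if err != nil {
		return fmt.Errorf("failed to decode job state change: %w", err)
	}
	fmt.Printf("job %s (%s) is now %s at %d%%\n", sc.UID, sc.Type, sc.State, sc.Progress)
	return nil
}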

View File

@ -22,11 +22,8 @@ import (
"sync"
"time"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/gorhill/cronexpr"
"github.com/rs/zerolog/log"
@ -35,7 +32,7 @@ import (
// Scheduler controls execution of background jobs.
type Scheduler struct {
// dependencies
store store.JobStore
store Store
executor *Executor
mxManager lock.MutexManager
pubsubService pubsub.PubSub
@ -54,7 +51,7 @@ type Scheduler struct {
}
func NewScheduler(
jobStore store.JobStore,
store Store,
executor *Executor,
mxManager lock.MutexManager,
pubsubService pubsub.PubSub,
@ -66,7 +63,7 @@ func NewScheduler(
maxRunning = 1
}
return &Scheduler{
store: jobStore,
store: store,
executor: executor,
mxManager: mxManager,
pubsubService: pubsubService,
@ -210,14 +207,14 @@ func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error {
return errors.New("can't cancel recurring jobs")
}
if job.State != enum.JobStateScheduled && job.State != enum.JobStateRunning {
if job.State != JobStateScheduled && job.State != JobStateRunning {
return nil // return no error if the job is already canceled or has finished or failed.
}
// first we update the job in the database...
job.Updated = time.Now().UnixMilli()
job.State = enum.JobStateCanceled
job.State = JobStateCanceled
err = s.store.UpdateExecution(ctx, job)
if err != nil {
@ -299,7 +296,7 @@ func (s *Scheduler) RunJobs(ctx context.Context, groupID string, defs []Definiti
return nil
}
jobs := make([]*types.Job, len(defs))
jobs := make([]*Job, len(defs))
for i, def := range defs {
if err := def.Validate(); err != nil {
return err
@ -416,7 +413,7 @@ func (s *Scheduler) availableSlots(ctx context.Context) (int, error) {
// runJob updates the job in the database and starts it in a separate goroutine.
// The function will also log the execution.
func (s *Scheduler) runJob(ctx context.Context, j *types.Job) {
func (s *Scheduler) runJob(ctx context.Context, j *Job) {
s.wgRunning.Add(1)
go func(ctx context.Context,
jobUID, jobType, jobData string,
@ -474,19 +471,19 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) {
}
switch job.State {
case enum.JobStateFinished:
case JobStateFinished:
logInfo.Msg("job successfully finished")
s.scheduleIfHaveMoreJobs()
case enum.JobStateFailed:
case JobStateFailed:
logInfo.Msg("job failed")
s.scheduleIfHaveMoreJobs()
case enum.JobStateCanceled:
case JobStateCanceled:
log.Ctx(ctx).Error().Msg("job canceled")
s.scheduleIfHaveMoreJobs()
case enum.JobStateScheduled:
case JobStateScheduled:
scheduledTime := time.UnixMilli(job.Scheduled)
logInfo.
Str("job.Scheduled", scheduledTime.Format(time.RFC3339Nano)).
@ -494,7 +491,7 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) {
s.scheduleProcessing(scheduledTime)
case enum.JobStateRunning:
case JobStateRunning:
log.Ctx(ctx).Error().Msg("should not happen; job still has state=running after finishing")
}
@ -505,8 +502,8 @@ func (s *Scheduler) runJob(ctx context.Context, j *types.Job) {
}(ctx, j.UID, j.Type, j.Data, j.RunDeadline)
}
// preExec updates the provided types.Job before execution.
func (s *Scheduler) preExec(job *types.Job) {
// preExec updates the provided Job before execution.
func (s *Scheduler) preExec(job *Job) {
if job.MaxDurationSeconds < 1 {
job.MaxDurationSeconds = 1
}
@ -519,7 +516,7 @@ func (s *Scheduler) preExec(job *types.Job) {
job.Updated = nowMilli
job.LastExecuted = nowMilli
job.State = enum.JobStateRunning
job.State = JobStateRunning
job.RunDeadline = execDeadline.UnixMilli()
job.RunBy = s.instanceID
job.RunProgress = ProgressMin
@ -528,7 +525,7 @@ func (s *Scheduler) preExec(job *types.Job) {
job.LastFailureError = ""
}
// doExec executes the provided types.Job.
// doExec executes the provided Job.
func (s *Scheduler) doExec(ctx context.Context,
jobUID, jobType, jobData string,
jobRunDeadline int64,
@ -561,14 +558,14 @@ func (s *Scheduler) doExec(ctx context.Context,
return
}
// postExec updates the provided types.Job after execution and reschedules it if necessary.
// postExec updates the provided Job after execution and reschedules it if necessary.
//
//nolint:gocognit // refactor if needed.
func postExec(job *types.Job, resultData, resultErr string) {
func postExec(job *Job, resultData, resultErr string) {
// Proceed with the update of the job if it's in the running state or
// if it's marked as canceled but has succeeded nonetheless.
// Other states should not happen, but if they do, just leave the job as it is.
if job.State != enum.JobStateRunning && (job.State != enum.JobStateCanceled || resultErr != "") {
if job.State != JobStateRunning && (job.State != JobStateCanceled || resultErr != "") {
return
}
@ -581,10 +578,10 @@ func postExec(job *types.Job, resultData, resultErr string) {
if resultErr != "" {
job.ConsecutiveFailures++
job.State = enum.JobStateFailed
job.State = JobStateFailed
job.LastFailureError = resultErr
} else {
job.State = enum.JobStateFinished
job.State = JobStateFinished
job.RunProgress = ProgressMax
}
@ -597,7 +594,7 @@ func postExec(job *types.Job, resultData, resultErr string) {
exp, err := cronexpr.Parse(job.RecurringCron)
if err != nil {
job.State = enum.JobStateFailed
job.State = JobStateFailed
messages := fmt.Sprintf("failed to parse cron string: %s", err.Error())
if job.LastFailureError != "" {
@ -606,7 +603,7 @@ func postExec(job *types.Job, resultData, resultErr string) {
job.LastFailureError = messages
} else {
job.State = enum.JobStateScheduled
job.State = JobStateScheduled
job.Scheduled = exp.Next(now).UnixMilli()
}
@ -614,24 +611,24 @@ func postExec(job *types.Job, resultData, resultErr string) {
}
// Reschedule the failed job if retrying is allowed
if job.State == enum.JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries {
if job.State == JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries {
const retryDelay = 15 * time.Second
job.State = enum.JobStateScheduled
job.State = JobStateScheduled
job.Scheduled = now.Add(retryDelay).UnixMilli()
job.RunProgress = ProgressMin
}
}
func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (types.JobProgress, error) {
func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (Progress, error) {
job, err := s.store.Find(ctx, jobUID)
if err != nil {
return types.JobProgress{}, err
return Progress{}, err
}
return mapToProgress(job), nil
}
func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]types.JobProgress, error) {
func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID string) ([]Progress, error) {
job, err := s.store.ListByGroupID(ctx, jobGroupUID)
if err != nil {
return nil, err
@ -647,19 +644,19 @@ func (s *Scheduler) PurgeJobsByGroupID(ctx context.Context, jobGroupID string) (
return n, nil
}
func mapToProgressMany(jobs []*types.Job) []types.JobProgress {
func mapToProgressMany(jobs []*Job) []Progress {
if jobs == nil {
return nil
}
j := make([]types.JobProgress, len(jobs))
j := make([]Progress, len(jobs))
for i, job := range jobs {
j[i] = mapToProgress(job)
}
return j
}
func mapToProgress(job *types.Job) types.JobProgress {
return types.JobProgress{
func mapToProgress(job *Job) Progress {
return Progress{
State: job.State,
Progress: job.RunProgress,
Result: job.Result,
@ -684,17 +681,17 @@ func (s *Scheduler) AddRecurring(
nextExec := cronExp.Next(now)
job := &types.Job{
job := &Job{
UID: jobUID,
Created: nowMilli,
Updated: nowMilli,
Type: jobType,
Priority: enum.JobPriorityElevated,
Priority: JobPriorityElevated,
Data: "",
Result: "",
MaxDurationSeconds: int(maxDur / time.Second),
MaxRetries: 0,
State: enum.JobStateScheduled,
State: JobStateScheduled,
Scheduled: nextExec.UnixMilli(),
TotalExecutions: 0,
RunBy: "",
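
For grouped jobs (such as the space import/export jobs above), the Scheduler exposes group-level progress and purging. A sketch that polls a group until every job completes and then purges it; the poll interval and helper shape are illustrative:

package example

import (
	"context"
	"time"

	"github.com/harness/gitness/job"
)

// waitForGroup polls until every job in the group has completed, then purges the group.
func waitForGroup(ctx context.Context, s *job.Scheduler, groupID string) error {
	for {
		progresses, err := s.GetJobProgressForGroup(ctx, groupID)
		if err != nil {
			return err
		}
		done := true
		for _, p := range progresses {
			if !p.State.IsCompleted() {
				done = false
				break
			}
		}
		if done {
			_, err = s.PurgeJobsByGroupID(ctx, groupID)
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}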

job/store.go (new file, 62 lines)
View File

@ -0,0 +1,62 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
import (
"context"
"time"
)
type Store interface {
// Find fetches a job by its unique identifier.
Find(ctx context.Context, uid string) (*Job, error)
// ListByGroupID fetches all jobs for a group id
ListByGroupID(ctx context.Context, groupID string) ([]*Job, error)
// DeleteByGroupID deletes all jobs for a group id
DeleteByGroupID(ctx context.Context, groupID string) (int64, error)
// Create is used to create a new job.
Create(ctx context.Context, job *Job) error
// Upsert will insert the job in the database if the job didn't already exist,
// or it will update the existing one but only if its definition has changed.
Upsert(ctx context.Context, job *Job) error
// UpdateDefinition is used to update a job definition.
UpdateDefinition(ctx context.Context, job *Job) error
// UpdateExecution is used to update a job before and after execution.
UpdateExecution(ctx context.Context, job *Job) error
// UpdateProgress is used to update a job progress data.
UpdateProgress(ctx context.Context, job *Job) error
// CountRunning returns number of jobs that are currently being run.
CountRunning(ctx context.Context) (int, error)
// ListReady returns a list of jobs that are ready for execution.
ListReady(ctx context.Context, now time.Time, limit int) ([]*Job, error)
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*Job, error)
// NextScheduledTime returns a scheduled time of the next ready job.
NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error)
// DeleteOld removes non-recurring jobs that have finished execution or have failed.
DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)
}
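
Since consumers now depend on this small interface instead of the app-level store package, helpers (and test fakes) only need job.Store. A brief sketch using ListDeadlineExceeded:

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/harness/gitness/job"
)

// overdueCount reports how many jobs have exceeded their run deadline,
// using only the job.Store interface.
func overdueCount(ctx context.Context, s job.Store) (int, error) {
	jobs, err := s.ListDeadlineExceeded(ctx, time.Now())
	if err != nil {
		return 0, fmt.Errorf("failed to list overdue jobs: %w", err)
	}
	return len(jobs), nil
}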

job/types.go (new file, 55 lines)
View File

@ -0,0 +1,55 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job
type Job struct {
UID string `db:"job_uid"`
Created int64 `db:"job_created"`
Updated int64 `db:"job_updated"`
Type string `db:"job_type"`
Priority Priority `db:"job_priority"`
Data string `db:"job_data"`
Result string `db:"job_result"`
MaxDurationSeconds int `db:"job_max_duration_seconds"`
MaxRetries int `db:"job_max_retries"`
State State `db:"job_state"`
Scheduled int64 `db:"job_scheduled"`
TotalExecutions int `db:"job_total_executions"`
RunBy string `db:"job_run_by"`
RunDeadline int64 `db:"job_run_deadline"`
RunProgress int `db:"job_run_progress"`
LastExecuted int64 `db:"job_last_executed"`
IsRecurring bool `db:"job_is_recurring"`
RecurringCron string `db:"job_recurring_cron"`
ConsecutiveFailures int `db:"job_consecutive_failures"`
LastFailureError string `db:"job_last_failure_error"`
GroupID string `db:"job_group_id"`
}
type StateChange struct {
UID string `json:"uid"`
Type string `json:"type"`
State State `json:"state"`
Progress int `json:"progress"`
Result string `json:"result"`
Failure string `json:"failure"`
}
type Progress struct {
State State `json:"state"`
Progress int `json:"progress"`
Result string `json:"result,omitempty"`
Failure string `json:"failure,omitempty"`
}
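
The JSON tags match the old types.JobProgress, so API responses keep their shape, with result and failure omitted when empty. A quick sketch of the encoded form:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/harness/gitness/job"
)

func main() {
	p := job.Progress{State: job.JobStateRunning, Progress: 42}
	b, err := json.Marshal(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"state":"running","progress":42}
}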

View File

@ -15,7 +15,6 @@
package job
import (
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
@ -29,24 +28,24 @@ var WireSet = wire.NewSet(
)
func ProvideExecutor(
jobStore store.JobStore,
store Store,
pubsubService pubsub.PubSub,
) *Executor {
return NewExecutor(
jobStore,
store,
pubsubService,
)
}
func ProvideScheduler(
jobStore store.JobStore,
store Store,
executor *Executor,
mutexManager lock.MutexManager,
pubsubService pubsub.PubSub,
config *types.Config,
) (*Scheduler, error) {
return NewScheduler(
jobStore,
store,
executor,
mutexManager,
pubsubService,

View File

@ -1,57 +0,0 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import "github.com/harness/gitness/types/enum"
type Job struct {
UID string `db:"job_uid"`
Created int64 `db:"job_created"`
Updated int64 `db:"job_updated"`
Type string `db:"job_type"`
Priority enum.JobPriority `db:"job_priority"`
Data string `db:"job_data"`
Result string `db:"job_result"`
MaxDurationSeconds int `db:"job_max_duration_seconds"`
MaxRetries int `db:"job_max_retries"`
State enum.JobState `db:"job_state"`
Scheduled int64 `db:"job_scheduled"`
TotalExecutions int `db:"job_total_executions"`
RunBy string `db:"job_run_by"`
RunDeadline int64 `db:"job_run_deadline"`
RunProgress int `db:"job_run_progress"`
LastExecuted int64 `db:"job_last_executed"`
IsRecurring bool `db:"job_is_recurring"`
RecurringCron string `db:"job_recurring_cron"`
ConsecutiveFailures int `db:"job_consecutive_failures"`
LastFailureError string `db:"job_last_failure_error"`
GroupID string `db:"job_group_id"`
}
type JobStateChange struct {
UID string `json:"uid"`
Type string `json:"type"`
State enum.JobState `json:"state"`
Progress int `json:"progress"`
Result string `json:"result"`
Failure string `json:"failure"`
}
type JobProgress struct {
State enum.JobState `json:"state"`
Progress int `json:"progress"`
Result string `json:"result,omitempty"`
Failure string `json:"failure,omitempty"`
}