mirror of
https://github.com/goharbor/harbor
synced 2024-09-20 14:15:31 +00:00
Refresh the status of execution for every status changing of task
Refresh the status of execution for every status changing of task to support filtering executions by status directly Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
parent
2de10700d8
commit
0fd230c2d6
|
@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS execution (
|
||||||
extra_attrs JSON,
|
extra_attrs JSON,
|
||||||
start_time timestamp DEFAULT CURRENT_TIMESTAMP,
|
start_time timestamp DEFAULT CURRENT_TIMESTAMP,
|
||||||
end_time timestamp,
|
end_time timestamp,
|
||||||
|
revision int,
|
||||||
PRIMARY KEY (id)
|
PRIMARY KEY (id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,10 @@ package dao
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/lib/log"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/orm"
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
@ -36,14 +39,23 @@ type ExecutionDAO interface {
|
||||||
Update(ctx context.Context, execution *Execution, props ...string) (err error)
|
Update(ctx context.Context, execution *Execution, props ...string) (err error)
|
||||||
// Delete the specified execution
|
// Delete the specified execution
|
||||||
Delete(ctx context.Context, id int64) (err error)
|
Delete(ctx context.Context, id int64) (err error)
|
||||||
|
// GetMetrics returns the task metrics for the specified execution
|
||||||
|
GetMetrics(ctx context.Context, id int64) (metrics *Metrics, err error)
|
||||||
|
// RefreshStatus refreshes the status of the specified execution according to it's tasks. If it's status
|
||||||
|
// is final, update the end time as well
|
||||||
|
RefreshStatus(ctx context.Context, id int64) (err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExecutionDAO returns an instance of ExecutionDAO
|
// NewExecutionDAO returns an instance of ExecutionDAO
|
||||||
func NewExecutionDAO() ExecutionDAO {
|
func NewExecutionDAO() ExecutionDAO {
|
||||||
return &executionDAO{}
|
return &executionDAO{
|
||||||
|
taskDAO: NewTaskDAO(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type executionDAO struct{}
|
type executionDAO struct {
|
||||||
|
taskDAO TaskDAO
|
||||||
|
}
|
||||||
|
|
||||||
func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
|
func (e *executionDAO) Count(ctx context.Context, query *q.Query) (int64, error) {
|
||||||
if query != nil {
|
if query != nil {
|
||||||
|
@ -132,3 +144,151 @@ func (e *executionDAO) Delete(ctx context.Context, id int64) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *executionDAO) GetMetrics(ctx context.Context, id int64) (*Metrics, error) {
|
||||||
|
scs, err := e.taskDAO.ListStatusCount(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
metrics := &Metrics{}
|
||||||
|
if len(scs) == 0 {
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sc := range scs {
|
||||||
|
switch sc.Status {
|
||||||
|
case job.SuccessStatus.String():
|
||||||
|
metrics.SuccessTaskCount = sc.Count
|
||||||
|
case job.ErrorStatus.String():
|
||||||
|
metrics.ErrorTaskCount = sc.Count
|
||||||
|
case job.PendingStatus.String():
|
||||||
|
metrics.PendingTaskCount = sc.Count
|
||||||
|
case job.RunningStatus.String():
|
||||||
|
metrics.RunningTaskCount = sc.Count
|
||||||
|
case job.ScheduledStatus.String():
|
||||||
|
metrics.ScheduledTaskCount = sc.Count
|
||||||
|
case job.StoppedStatus.String():
|
||||||
|
metrics.StoppedTaskCount = sc.Count
|
||||||
|
default:
|
||||||
|
log.Errorf("unknown task status: %s", sc.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount +
|
||||||
|
metrics.PendingTaskCount + metrics.RunningTaskCount +
|
||||||
|
metrics.ScheduledTaskCount + metrics.StoppedTaskCount
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
func (e *executionDAO) RefreshStatus(ctx context.Context, id int64) error {
|
||||||
|
// as the status of the execution can be refreshed by multiple operators concurrently
|
||||||
|
// we use the optimistic locking to avoid the conflict and retry 5 times at most
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
retry, err := e.refreshStatus(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !retry {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to refresh the status of the execution %d after %d retries", id, 5)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, error) {
|
||||||
|
execution, err := e.Get(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
metrics, err := e.GetMetrics(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
// no task, return directly
|
||||||
|
if metrics.TaskCount == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var status string
|
||||||
|
if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 {
|
||||||
|
status = job.RunningStatus.String()
|
||||||
|
} else if metrics.ErrorTaskCount > 0 {
|
||||||
|
status = job.ErrorStatus.String()
|
||||||
|
} else if metrics.StoppedTaskCount > 0 {
|
||||||
|
status = job.StoppedStatus.String()
|
||||||
|
} else if metrics.SuccessTaskCount > 0 {
|
||||||
|
status = job.SuccessStatus.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
ormer, err := orm.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?`
|
||||||
|
result, err := ormer.Raw(sql, status, id, execution.Revision).Exec()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
n, err := result.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
// if the count of affected rows is 0, that means the execution is updating by others, retry
|
||||||
|
if n == 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/* this is another solution to solve the concurrency issue for refreshing the execution status
|
||||||
|
// set a score for each status:
|
||||||
|
// pending, running, scheduled - 4
|
||||||
|
// error - 3
|
||||||
|
// stopped - 2
|
||||||
|
// success - 1
|
||||||
|
// and set the status of record with highest score as the status of execution
|
||||||
|
sql := `with status_score as (
|
||||||
|
select status,
|
||||||
|
case
|
||||||
|
when status='%s' or status='%s' or status='%s' then 4
|
||||||
|
when status='%s' then 3
|
||||||
|
when status='%s' then 2
|
||||||
|
when status='%s' then 1
|
||||||
|
else 0
|
||||||
|
end as score
|
||||||
|
from task
|
||||||
|
where execution_id=?
|
||||||
|
group by status
|
||||||
|
)
|
||||||
|
update execution
|
||||||
|
set status=(
|
||||||
|
select
|
||||||
|
case
|
||||||
|
when max(score)=4 then '%s'
|
||||||
|
when max(score)=3 then '%s'
|
||||||
|
when max(score)=2 then '%s'
|
||||||
|
when max(score)=1 then '%s'
|
||||||
|
when max(score)=0 then ''
|
||||||
|
end as status
|
||||||
|
from status_score)
|
||||||
|
where id = ?`
|
||||||
|
sql = fmt.Sprintf(sql, job.PendingStatus.String(), job.RunningStatus.String(), job.ScheduledStatus.String(),
|
||||||
|
job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String(),
|
||||||
|
job.RunningStatus.String(), job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String())
|
||||||
|
if _, err = ormer.Raw(sql, id, id).Exec(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
// update the end time if the status is final, otherwise set the end time as NULL, this is useful
|
||||||
|
// for retrying jobs
|
||||||
|
sql = `update execution
|
||||||
|
set end_time = (
|
||||||
|
case
|
||||||
|
when status='%s' or status='%s' or status='%s' then (
|
||||||
|
select max(end_time)
|
||||||
|
from task
|
||||||
|
where execution_id=?)
|
||||||
|
else NULL
|
||||||
|
end)
|
||||||
|
where id=?`
|
||||||
|
sql = fmt.Sprintf(sql, job.ErrorStatus.String(), job.StoppedStatus.String(), job.SuccessStatus.String())
|
||||||
|
_, err = ormer.Raw(sql, id, id).Exec()
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
|
@ -17,8 +17,10 @@ package dao
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/common/dao"
|
"github.com/goharbor/harbor/src/common/dao"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
"github.com/goharbor/harbor/src/lib/orm"
|
"github.com/goharbor/harbor/src/lib/orm"
|
||||||
"github.com/goharbor/harbor/src/lib/q"
|
"github.com/goharbor/harbor/src/lib/q"
|
||||||
|
@ -29,13 +31,17 @@ type executionDAOTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
executionDAO *executionDAO
|
executionDAO *executionDAO
|
||||||
|
taskDao *taskDAO
|
||||||
executionID int64
|
executionID int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionDAOTestSuite) SetupSuite() {
|
func (e *executionDAOTestSuite) SetupSuite() {
|
||||||
dao.PrepareTestForPostgresSQL()
|
dao.PrepareTestForPostgresSQL()
|
||||||
e.ctx = orm.Context()
|
e.ctx = orm.Context()
|
||||||
e.executionDAO = &executionDAO{}
|
e.taskDao = &taskDAO{}
|
||||||
|
e.executionDAO = &executionDAO{
|
||||||
|
taskDAO: e.taskDao,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionDAOTestSuite) SetupTest() {
|
func (e *executionDAOTestSuite) SetupTest() {
|
||||||
|
@ -116,6 +122,163 @@ func (e *executionDAOTestSuite) TestDelete() {
|
||||||
// happy pass is covered by TearDownTest
|
// happy pass is covered by TearDownTest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *executionDAOTestSuite) TestGetMetrics() {
|
||||||
|
taskID01, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.SuccessStatus.String(),
|
||||||
|
StatusCode: job.SuccessStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID01)
|
||||||
|
|
||||||
|
taskID02, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.StoppedStatus.String(),
|
||||||
|
StatusCode: job.StoppedStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID02)
|
||||||
|
|
||||||
|
taskID03, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.ErrorStatus.String(),
|
||||||
|
StatusCode: job.ErrorStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID03)
|
||||||
|
|
||||||
|
taskID04, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.PendingStatus.String(),
|
||||||
|
StatusCode: job.PendingStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID04)
|
||||||
|
|
||||||
|
taskID05, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.RunningStatus.String(),
|
||||||
|
StatusCode: job.RunningStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID05)
|
||||||
|
|
||||||
|
taskID06, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.ScheduledStatus.String(),
|
||||||
|
StatusCode: job.ScheduledStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID06)
|
||||||
|
|
||||||
|
metrics, err := e.executionDAO.GetMetrics(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
e.Equal(int64(6), metrics.TaskCount)
|
||||||
|
e.Equal(int64(1), metrics.SuccessTaskCount)
|
||||||
|
e.Equal(int64(1), metrics.StoppedTaskCount)
|
||||||
|
e.Equal(int64(1), metrics.ErrorTaskCount)
|
||||||
|
e.Equal(int64(1), metrics.PendingTaskCount)
|
||||||
|
e.Equal(int64(1), metrics.RunningTaskCount)
|
||||||
|
e.Equal(int64(1), metrics.ScheduledTaskCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executionDAOTestSuite) TestRefreshStatus() {
|
||||||
|
// contains tasks with status: success
|
||||||
|
taskID01, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.SuccessStatus.String(),
|
||||||
|
StatusCode: job.SuccessStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
EndTime: time.Now(),
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID01)
|
||||||
|
|
||||||
|
err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
execution, err := e.executionDAO.Get(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
e.Equal(job.SuccessStatus.String(), execution.Status)
|
||||||
|
e.NotEmpty(execution.EndTime)
|
||||||
|
|
||||||
|
// contains tasks with status: stopped
|
||||||
|
taskID02, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.StoppedStatus.String(),
|
||||||
|
StatusCode: job.StoppedStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
EndTime: time.Now(),
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID02)
|
||||||
|
|
||||||
|
err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
execution, err = e.executionDAO.Get(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
e.Equal(job.StoppedStatus.String(), execution.Status)
|
||||||
|
e.NotEmpty(execution.EndTime)
|
||||||
|
|
||||||
|
// contains tasks with status: error
|
||||||
|
taskID03, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.ErrorStatus.String(),
|
||||||
|
StatusCode: job.ErrorStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
EndTime: time.Now(),
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID03)
|
||||||
|
|
||||||
|
err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
execution, err = e.executionDAO.Get(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
e.Equal(job.ErrorStatus.String(), execution.Status)
|
||||||
|
e.NotEmpty(execution.EndTime)
|
||||||
|
|
||||||
|
// contains tasks with status: pending, running, scheduled
|
||||||
|
taskID04, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.PendingStatus.String(),
|
||||||
|
StatusCode: job.PendingStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID04)
|
||||||
|
|
||||||
|
taskID05, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.RunningStatus.String(),
|
||||||
|
StatusCode: job.RunningStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID05)
|
||||||
|
|
||||||
|
taskID06, err := e.taskDao.Create(e.ctx, &Task{
|
||||||
|
ExecutionID: e.executionID,
|
||||||
|
Status: job.ScheduledStatus.String(),
|
||||||
|
StatusCode: job.ScheduledStatus.Code(),
|
||||||
|
ExtraAttrs: "{}",
|
||||||
|
})
|
||||||
|
e.Require().Nil(err)
|
||||||
|
defer e.taskDao.Delete(e.ctx, taskID06)
|
||||||
|
|
||||||
|
err = e.executionDAO.RefreshStatus(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
execution, err = e.executionDAO.Get(e.ctx, e.executionID)
|
||||||
|
e.Require().Nil(err)
|
||||||
|
e.Equal(job.RunningStatus.String(), execution.Status)
|
||||||
|
e.Empty(execution.EndTime)
|
||||||
|
}
|
||||||
|
|
||||||
func TestExecutionDAOSuite(t *testing.T) {
|
func TestExecutionDAOSuite(t *testing.T) {
|
||||||
suite.Run(t, &executionDAOTestSuite{})
|
suite.Run(t, &executionDAOTestSuite{})
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,18 @@ type Execution struct {
|
||||||
ExtraAttrs string `orm:"column(extra_attrs)"` // json string
|
ExtraAttrs string `orm:"column(extra_attrs)"` // json string
|
||||||
StartTime time.Time `orm:"column(start_time)"`
|
StartTime time.Time `orm:"column(start_time)"`
|
||||||
EndTime time.Time `orm:"column(end_time)"`
|
EndTime time.Time `orm:"column(end_time)"`
|
||||||
|
Revision int64 `orm:"column(revision)"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics is the task metrics for one execution
|
||||||
|
type Metrics struct {
|
||||||
|
TaskCount int64 `json:"task_count"`
|
||||||
|
SuccessTaskCount int64 `json:"success_task_count"`
|
||||||
|
ErrorTaskCount int64 `json:"error_task_count"`
|
||||||
|
PendingTaskCount int64 `json:"pending_task_count"`
|
||||||
|
RunningTaskCount int64 `json:"running_task_count"`
|
||||||
|
ScheduledTaskCount int64 `json:"scheduled_task_count"`
|
||||||
|
StoppedTaskCount int64 `json:"stopped_task_count"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task database model
|
// Task database model
|
||||||
|
|
|
@ -203,88 +203,13 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the status isn't null which means the status is set manually, return directly
|
|
||||||
if len(exec.Status) > 0 {
|
|
||||||
return exec
|
|
||||||
}
|
|
||||||
|
|
||||||
// populate task metrics
|
// populate task metrics
|
||||||
e.populateExecutionMetrics(ctx, exec)
|
metrics, err := e.executionDAO.GetMetrics(ctx, execution.ID)
|
||||||
// populate status
|
if err != nil {
|
||||||
e.populateExecutionStatus(exec)
|
log.Errorf("failed to get metrics of the execution %d: %v", execution.ID, err)
|
||||||
// populate the end time
|
} else {
|
||||||
e.populateExecutionEndTime(ctx, exec)
|
exec.Metrics = metrics
|
||||||
|
}
|
||||||
|
|
||||||
return exec
|
return exec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManager) populateExecutionMetrics(ctx context.Context, execution *Execution) {
|
|
||||||
scs, err := e.taskDAO.ListStatusCount(ctx, execution.ID)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to list status count of execution %d: %v", execution.ID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(scs) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics := &Metrics{}
|
|
||||||
for _, sc := range scs {
|
|
||||||
switch sc.Status {
|
|
||||||
case job.SuccessStatus.String():
|
|
||||||
metrics.SuccessTaskCount = sc.Count
|
|
||||||
case job.ErrorStatus.String():
|
|
||||||
metrics.ErrorTaskCount = sc.Count
|
|
||||||
case job.PendingStatus.String():
|
|
||||||
metrics.PendingTaskCount = sc.Count
|
|
||||||
case job.RunningStatus.String():
|
|
||||||
metrics.RunningTaskCount = sc.Count
|
|
||||||
case job.ScheduledStatus.String():
|
|
||||||
metrics.ScheduledTaskCount = sc.Count
|
|
||||||
case job.StoppedStatus.String():
|
|
||||||
metrics.StoppedTaskCount = sc.Count
|
|
||||||
default:
|
|
||||||
log.Errorf("unknown task status: %s", sc.Status)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metrics.TaskCount = metrics.SuccessTaskCount + metrics.ErrorTaskCount +
|
|
||||||
metrics.PendingTaskCount + metrics.RunningTaskCount +
|
|
||||||
metrics.ScheduledTaskCount + metrics.StoppedTaskCount
|
|
||||||
execution.Metrics = metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *executionManager) populateExecutionStatus(execution *Execution) {
|
|
||||||
metrics := execution.Metrics
|
|
||||||
if metrics == nil {
|
|
||||||
execution.Status = job.RunningStatus.String()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if metrics.PendingTaskCount > 0 || metrics.RunningTaskCount > 0 || metrics.ScheduledTaskCount > 0 {
|
|
||||||
execution.Status = job.RunningStatus.String()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if metrics.ErrorTaskCount > 0 {
|
|
||||||
execution.Status = job.ErrorStatus.String()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if metrics.StoppedTaskCount > 0 {
|
|
||||||
execution.Status = job.StoppedStatus.String()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if metrics.SuccessTaskCount > 0 {
|
|
||||||
execution.Status = job.SuccessStatus.String()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *executionManager) populateExecutionEndTime(ctx context.Context, execution *Execution) {
|
|
||||||
if !job.Status(execution.Status).Final() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
endTime, err := e.taskDAO.GetMaxEndTime(ctx, execution.ID)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to get the max end time of the execution %d: %v", execution.ID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
execution.EndTime = endTime
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,7 +16,6 @@ package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/lib/errors"
|
"github.com/goharbor/harbor/src/lib/errors"
|
||||||
|
@ -127,10 +126,16 @@ func (e *executionManagerTestSuite) TestGet() {
|
||||||
ID: 1,
|
ID: 1,
|
||||||
Status: job.SuccessStatus.String(),
|
Status: job.SuccessStatus.String(),
|
||||||
}, nil)
|
}, nil)
|
||||||
|
e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{
|
||||||
|
TaskCount: 1,
|
||||||
|
SuccessTaskCount: 1,
|
||||||
|
}, nil)
|
||||||
exec, err := e.execMgr.Get(nil, 1)
|
exec, err := e.execMgr.Get(nil, 1)
|
||||||
e.Require().Nil(err)
|
e.Require().Nil(err)
|
||||||
e.Equal(int64(1), exec.ID)
|
e.Equal(int64(1), exec.ID)
|
||||||
e.Equal(job.SuccessStatus.String(), exec.Status)
|
e.Equal(job.SuccessStatus.String(), exec.Status)
|
||||||
|
e.Equal(int64(1), exec.Metrics.TaskCount)
|
||||||
|
e.Equal(int64(1), exec.Metrics.SuccessTaskCount)
|
||||||
e.execDAO.AssertExpectations(e.T())
|
e.execDAO.AssertExpectations(e.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,136 +146,20 @@ func (e *executionManagerTestSuite) TestList() {
|
||||||
Status: job.SuccessStatus.String(),
|
Status: job.SuccessStatus.String(),
|
||||||
},
|
},
|
||||||
}, nil)
|
}, nil)
|
||||||
|
e.execDAO.On("GetMetrics", mock.Anything, mock.Anything).Return(&dao.Metrics{
|
||||||
|
TaskCount: 1,
|
||||||
|
SuccessTaskCount: 1,
|
||||||
|
}, nil)
|
||||||
execs, err := e.execMgr.List(nil, nil)
|
execs, err := e.execMgr.List(nil, nil)
|
||||||
e.Require().Nil(err)
|
e.Require().Nil(err)
|
||||||
e.Require().Len(execs, 1)
|
e.Require().Len(execs, 1)
|
||||||
e.Equal(int64(1), execs[0].ID)
|
e.Equal(int64(1), execs[0].ID)
|
||||||
e.Equal(job.SuccessStatus.String(), execs[0].Status)
|
e.Equal(job.SuccessStatus.String(), execs[0].Status)
|
||||||
|
e.Equal(int64(1), execs[0].Metrics.TaskCount)
|
||||||
|
e.Equal(int64(1), execs[0].Metrics.SuccessTaskCount)
|
||||||
e.execDAO.AssertExpectations(e.T())
|
e.execDAO.AssertExpectations(e.T())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *executionManagerTestSuite) TestPopulateExecutionMetrics() {
|
|
||||||
e.taskDAO.On("ListStatusCount", mock.Anything, mock.Anything).Return([]*dao.StatusCount{
|
|
||||||
{
|
|
||||||
Status: job.SuccessStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Status: job.ErrorStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Status: job.StoppedStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Status: job.RunningStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Status: job.PendingStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Status: job.ScheduledStatus.String(),
|
|
||||||
Count: 1,
|
|
||||||
},
|
|
||||||
}, nil)
|
|
||||||
exec := &Execution{}
|
|
||||||
e.execMgr.populateExecutionMetrics(nil, exec)
|
|
||||||
e.Require().NotNil(exec.Metrics)
|
|
||||||
e.Equal(int64(6), exec.Metrics.TaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.SuccessTaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.ErrorTaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.StoppedTaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.PendingTaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.RunningTaskCount)
|
|
||||||
e.Equal(int64(1), exec.Metrics.ScheduledTaskCount)
|
|
||||||
e.taskDAO.AssertExpectations(e.T())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *executionManagerTestSuite) TestPopulateExecutionStatus() {
|
|
||||||
// running
|
|
||||||
exec := &Execution{}
|
|
||||||
e.execMgr.populateExecutionStatus(exec)
|
|
||||||
e.Equal(job.RunningStatus.String(), exec.Status)
|
|
||||||
|
|
||||||
// running
|
|
||||||
exec = &Execution{
|
|
||||||
Metrics: &Metrics{
|
|
||||||
SuccessTaskCount: 1,
|
|
||||||
ErrorTaskCount: 1,
|
|
||||||
PendingTaskCount: 1,
|
|
||||||
RunningTaskCount: 1,
|
|
||||||
ScheduledTaskCount: 1,
|
|
||||||
StoppedTaskCount: 1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
e.execMgr.populateExecutionStatus(exec)
|
|
||||||
e.Equal(job.RunningStatus.String(), exec.Status)
|
|
||||||
|
|
||||||
// error
|
|
||||||
exec = &Execution{
|
|
||||||
Metrics: &Metrics{
|
|
||||||
SuccessTaskCount: 1,
|
|
||||||
ErrorTaskCount: 1,
|
|
||||||
PendingTaskCount: 0,
|
|
||||||
RunningTaskCount: 0,
|
|
||||||
ScheduledTaskCount: 0,
|
|
||||||
StoppedTaskCount: 1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
e.execMgr.populateExecutionStatus(exec)
|
|
||||||
e.Equal(job.ErrorStatus.String(), exec.Status)
|
|
||||||
|
|
||||||
// stopped
|
|
||||||
exec = &Execution{
|
|
||||||
Metrics: &Metrics{
|
|
||||||
SuccessTaskCount: 1,
|
|
||||||
ErrorTaskCount: 0,
|
|
||||||
PendingTaskCount: 0,
|
|
||||||
RunningTaskCount: 0,
|
|
||||||
ScheduledTaskCount: 0,
|
|
||||||
StoppedTaskCount: 1,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
e.execMgr.populateExecutionStatus(exec)
|
|
||||||
e.Equal(job.StoppedStatus.String(), exec.Status)
|
|
||||||
|
|
||||||
// success
|
|
||||||
exec = &Execution{
|
|
||||||
Metrics: &Metrics{
|
|
||||||
SuccessTaskCount: 1,
|
|
||||||
ErrorTaskCount: 0,
|
|
||||||
PendingTaskCount: 0,
|
|
||||||
RunningTaskCount: 0,
|
|
||||||
ScheduledTaskCount: 0,
|
|
||||||
StoppedTaskCount: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
e.execMgr.populateExecutionStatus(exec)
|
|
||||||
e.Equal(job.SuccessStatus.String(), exec.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *executionManagerTestSuite) TestPopulateExecutionEndTime() {
|
|
||||||
// isn't final status
|
|
||||||
exec := &Execution{
|
|
||||||
Status: job.RunningStatus.String(),
|
|
||||||
}
|
|
||||||
e.execMgr.populateExecutionEndTime(nil, exec)
|
|
||||||
e.Equal(time.Time{}, exec.EndTime)
|
|
||||||
|
|
||||||
// final status
|
|
||||||
now := time.Now()
|
|
||||||
exec = &Execution{
|
|
||||||
Status: job.SuccessStatus.String(),
|
|
||||||
}
|
|
||||||
e.taskDAO.On("GetMaxEndTime", mock.Anything, mock.Anything).Return(now, nil)
|
|
||||||
e.execMgr.populateExecutionEndTime(nil, exec)
|
|
||||||
e.Equal(now, exec.EndTime)
|
|
||||||
e.taskDAO.AssertExpectations(e.T())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestExecutionManagerSuite(t *testing.T) {
|
func TestExecutionManagerSuite(t *testing.T) {
|
||||||
suite.Run(t, &executionManagerTestSuite{})
|
suite.Run(t, &executionManagerTestSuite{})
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,12 +38,12 @@ type HookHandler struct {
|
||||||
|
|
||||||
// Handle the job status changing webhook
|
// Handle the job status changing webhook
|
||||||
func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusChange) error {
|
func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusChange) error {
|
||||||
|
task, err := h.taskDAO.Get(ctx, taskID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
// process check in data
|
// process check in data
|
||||||
if len(sc.CheckIn) > 0 {
|
if len(sc.CheckIn) > 0 {
|
||||||
task, err := h.taskDAO.Get(ctx, taskID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
execution, err := h.executionDAO.Get(ctx, task.ExecutionID)
|
execution, err := h.executionDAO.Get(ctx, task.ExecutionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -58,6 +58,10 @@ func (h *HookHandler) Handle(ctx context.Context, taskID int64, sc *job.StatusCh
|
||||||
return processor(ctx, t, sc)
|
return processor(ctx, t, sc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update status
|
// update task status
|
||||||
return h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision)
|
if err = h.taskDAO.UpdateStatus(ctx, taskID, sc.Status, sc.Metadata.Revision); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// update execution status
|
||||||
|
return h.executionDAO.RefreshStatus(ctx, task.ExecutionID)
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,7 +64,12 @@ func (h *hookHandlerTestSuite) TestHandle() {
|
||||||
h.SetupTest()
|
h.SetupTest()
|
||||||
|
|
||||||
// handle status changing
|
// handle status changing
|
||||||
|
h.taskDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Task{
|
||||||
|
ID: 1,
|
||||||
|
ExecutionID: 1,
|
||||||
|
}, nil)
|
||||||
h.taskDAO.On("UpdateStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
h.taskDAO.On("UpdateStatus", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
h.execDAO.On("RefreshStatus", mock.Anything, mock.Anything).Return(nil)
|
||||||
sc = &job.StatusChange{
|
sc = &job.StatusChange{
|
||||||
Status: job.SuccessStatus.String(),
|
Status: job.SuccessStatus.String(),
|
||||||
Metadata: &job.StatsInfo{
|
Metadata: &job.StatsInfo{
|
||||||
|
|
|
@ -95,6 +95,29 @@ func (_m *mockExecutionDAO) Get(ctx context.Context, id int64) (*dao.Execution,
|
||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMetrics provides a mock function with given fields: ctx, id
|
||||||
|
func (_m *mockExecutionDAO) GetMetrics(ctx context.Context, id int64) (*dao.Metrics, error) {
|
||||||
|
ret := _m.Called(ctx, id)
|
||||||
|
|
||||||
|
var r0 *dao.Metrics
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64) *dao.Metrics); ok {
|
||||||
|
r0 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
if ret.Get(0) != nil {
|
||||||
|
r0 = ret.Get(0).(*dao.Metrics)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var r1 error
|
||||||
|
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||||
|
r1 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
r1 = ret.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0, r1
|
||||||
|
}
|
||||||
|
|
||||||
// List provides a mock function with given fields: ctx, query
|
// List provides a mock function with given fields: ctx, query
|
||||||
func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Execution, error) {
|
func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Execution, error) {
|
||||||
ret := _m.Called(ctx, query)
|
ret := _m.Called(ctx, query)
|
||||||
|
@ -118,6 +141,20 @@ func (_m *mockExecutionDAO) List(ctx context.Context, query *q.Query) ([]*dao.Ex
|
||||||
return r0, r1
|
return r0, r1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RefreshStatus provides a mock function with given fields: ctx, id
|
||||||
|
func (_m *mockExecutionDAO) RefreshStatus(ctx context.Context, id int64) error {
|
||||||
|
ret := _m.Called(ctx, id)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok {
|
||||||
|
r0 = rf(ctx, id)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
// Update provides a mock function with given fields: ctx, execution, props
|
// Update provides a mock function with given fields: ctx, execution, props
|
||||||
func (_m *mockExecutionDAO) Update(ctx context.Context, execution *dao.Execution, props ...string) error {
|
func (_m *mockExecutionDAO) Update(ctx context.Context, execution *dao.Execution, props ...string) error {
|
||||||
_va := make([]interface{}, len(props))
|
_va := make([]interface{}, len(props))
|
||||||
|
|
|
@ -49,8 +49,8 @@ type Execution struct {
|
||||||
// 1. After creating the execution, there may be some errors before creating tasks, the
|
// 1. After creating the execution, there may be some errors before creating tasks, the
|
||||||
// "StatusMessage" can contain the error message
|
// "StatusMessage" can contain the error message
|
||||||
// 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case
|
// 2. The execution may contain no tasks, "StatusMessage" can be used to explain the case
|
||||||
StatusMessage string `json:"status_message"`
|
StatusMessage string `json:"status_message"`
|
||||||
Metrics *Metrics `json:"metrics"`
|
Metrics *dao.Metrics `json:"metrics"`
|
||||||
// trigger type: manual/schedule/event
|
// trigger type: manual/schedule/event
|
||||||
Trigger string `json:"trigger"`
|
Trigger string `json:"trigger"`
|
||||||
// the customized attributes for different kinds of consumers
|
// the customized attributes for different kinds of consumers
|
||||||
|
@ -59,17 +59,6 @@ type Execution struct {
|
||||||
EndTime time.Time `json:"end_time"`
|
EndTime time.Time `json:"end_time"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metrics for tasks
|
|
||||||
type Metrics struct {
|
|
||||||
TaskCount int64 `json:"task_count"`
|
|
||||||
SuccessTaskCount int64 `json:"success_task_count"`
|
|
||||||
ErrorTaskCount int64 `json:"error_task_count"`
|
|
||||||
PendingTaskCount int64 `json:"pending_task_count"`
|
|
||||||
RunningTaskCount int64 `json:"running_task_count"`
|
|
||||||
ScheduledTaskCount int64 `json:"scheduled_task_count"`
|
|
||||||
StoppedTaskCount int64 `json:"stopped_task_count"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Task is the unit for running. It stores the jobservice job records and related information
|
// Task is the unit for running. It stores the jobservice job records and related information
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID int64 `json:"id"`
|
ID int64 `json:"id"`
|
||||||
|
|
|
@ -56,7 +56,7 @@ func (j *jobStatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
if err = j.handler.Handle(r.Context(), taskID, sc); err != nil {
|
if err = j.handler.Handle(r.Context(), taskID, sc); err != nil {
|
||||||
// ignore the not found error to avoid the jobservice re-sending the hook
|
// ignore the not found error to avoid the jobservice re-sending the hook
|
||||||
if errors.IsNotFoundErr(err) {
|
if errors.IsNotFoundErr(err) {
|
||||||
log.Warningf("got the status change hook for a non existing task %d", taskID)
|
log.Warningf("task %d does not exist, ignore the not found error to avoid subsequent retrying webhooks from jobservice", taskID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
libhttp.SendError(w, err)
|
libhttp.SendError(w, err)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user