Commit febb2d80 by 朱继来

删除任务

parent de7b48d9
package common package common
const ( const (
// 定时任务保存目录
JOB_DIR = "/cron/"
// 定时任务任务保存目录 // 定时任务任务保存目录
JOB_SAVE_DIR = "/cron/jobs/" JOB_SAVE_DIR = "/cron/jobs/"
......
package master package master
import ( import (
_"encoding/json" _ "encoding/json"
_"fmt" _ "fmt"
"go-crontab/common" "go-crontab/common"
"net" "net"
"net/http" "net/http"
...@@ -66,7 +66,8 @@ ERR: ...@@ -66,7 +66,8 @@ ERR:
func handleJobDelete(resp http.ResponseWriter, req *http.Request) { func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
var ( var (
err error // interface{} err error // interface{}
name string job_name string
node string
oldJob *common.Job oldJob *common.Job
bytes []byte bytes []byte
) )
...@@ -76,11 +77,12 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) { ...@@ -76,11 +77,12 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
goto ERR goto ERR
} }
// 删除的任务名 // 删除的任务名、节点
name = req.PostForm.Get("name") job_name = req.PostForm.Get("job_name")
node = req.PostForm.Get("node")
// 去删除任务 // 去删除任务
if oldJob, err = G_jobMgr.DeleteJob(name); err != nil { if oldJob, err = G_jobMgr.DeleteJob(job_name, node); err != nil {
goto ERR goto ERR
} }
......
...@@ -4,17 +4,17 @@ import ( ...@@ -4,17 +4,17 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
_"fmt" _ "fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common" "go-crontab/common"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
// 任务管理器 // 任务管理器
...@@ -95,8 +95,15 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -95,8 +95,15 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
job.CronExpr = req.FormValue("cron_expr"); job.CronExpr = req.FormValue("cron_expr");
nodeIp := req.FormValue("node"); nodeIp := req.FormValue("node");
// etcd的保存key: 目录 + IP + 任务名称 // 任务类型:1-普通任务,2-一次性任务
jobKey = common.JOB_SAVE_DIR + nodeIp + "/" + job.Name job_type, err := strconv.ParseInt(req.FormValue("job_type"), 10, 64)
if job_type == 1 {
jobKey = common.JOB_SAVE_DIR + nodeIp + "/" + job.Name // etcd的保存key: 目录 + IP + 任务名称
} else {
jobKey = common.JOB_ONCE_SAVE_DIR + nodeIp + "/" + job.Name // etcd的保存key: 目录 + IP + 任务名称
}
// 任务信息json // 任务信息json
if jobValue, err = json.Marshal(job); err != nil { if jobValue, err = json.Marshal(job); err != nil {
return return
...@@ -124,7 +131,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -124,7 +131,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
cronJob.Command = req.FormValue("command") cronJob.Command = req.FormValue("command")
cronJob.CronExpr = req.FormValue("cron_expr") cronJob.CronExpr = req.FormValue("cron_expr")
cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64) cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
cronJob.JobType, err = strconv.ParseInt(req.FormValue("job_type"), 10, 64) cronJob.JobType = job_type
cronJob.Status = 1 cronJob.Status = 1
if cron_job_id == "" { // 新增 if cron_job_id == "" { // 新增
...@@ -144,7 +151,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -144,7 +151,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
// 修改部分值 // 修改部分值
update := bson.M{"$set": bson.M{"node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command, update := bson.M{"$set": bson.M{"node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
"cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "job_type": cronJob.JobType, "update_time": cronJob.UpdateTime}} "cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "update_time": cronJob.UpdateTime}}
objectId, err := primitive.ObjectIDFromHex(cron_job_id) objectId, err := primitive.ObjectIDFromHex(cron_job_id)
_, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update) _, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
...@@ -159,7 +166,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -159,7 +166,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
} }
// 删除任务 // 删除任务
func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) { func (jobMgr *JobMgr) DeleteJob(job_name string, node_ip string) (oldJob *common.Job, err error) {
var ( var (
jobKey string jobKey string
delResp *clientv3.DeleteResponse delResp *clientv3.DeleteResponse
...@@ -167,7 +174,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) { ...@@ -167,7 +174,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
) )
// etcd中保存任务的key // etcd中保存任务的key
jobKey = common.JOB_SAVE_DIR + name jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_name
// 从etcd中删除它 // 从etcd中删除它
if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil { if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil {
...@@ -183,6 +190,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) { ...@@ -183,6 +190,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
} }
oldJob = &oldJobObj oldJob = &oldJobObj
} }
return return
} }
...@@ -196,7 +204,7 @@ func (jobMgr *JobMgr) ListJobs() (jobList []*common.Job, err error) { ...@@ -196,7 +204,7 @@ func (jobMgr *JobMgr) ListJobs() (jobList []*common.Job, err error) {
) )
// 任务保存的目录 // 任务保存的目录
dirKey = common.JOB_SAVE_DIR dirKey = common.JOB_DIR
// 获取目录下所有任务信息 // 获取目录下所有任务信息
if getResp, err = jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix()); err != nil { if getResp, err = jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix()); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment