Commit 7afe1042 by 朱继来

调整保存、删除任务

parent a59f3f23
Showing with 73 additions and 65 deletions
...@@ -66,8 +66,6 @@ ERR: ...@@ -66,8 +66,6 @@ ERR:
func handleJobDelete(resp http.ResponseWriter, req *http.Request) { func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
var ( var (
err error // interface{} err error // interface{}
job_etcd_name string
node string
oldJob *common.Job oldJob *common.Job
bytes []byte bytes []byte
) )
...@@ -78,11 +76,11 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) { ...@@ -78,11 +76,11 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
} }
// 删除的任务名、节点 // 删除的任务名、节点
job_etcd_name = req.PostForm.Get("job_etcd_name") //job_etcd_name = req.PostForm.Get("job_etcd_name")
node = req.PostForm.Get("node") //node = req.PostForm.Get("node")
// 去删除任务 // 去删除任务
if oldJob, err = G_jobMgr.DeleteJob(job_etcd_name, node); err != nil { if oldJob, err = G_jobMgr.DeleteJob(req); err != nil {
goto ERR goto ERR
} }
......
...@@ -3,15 +3,14 @@ package master ...@@ -3,15 +3,14 @@ package master
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
_ "fmt" _ "fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common" "go-crontab/common"
"go.mongodb.org/mongo-driver/bson" _"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" _"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo" _"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options" _"go.mongodb.org/mongo-driver/mongo/options"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
...@@ -22,8 +21,8 @@ type JobMgr struct { ...@@ -22,8 +21,8 @@ type JobMgr struct {
client *clientv3.Client client *clientv3.Client
kv clientv3.KV kv clientv3.KV
lease clientv3.Lease lease clientv3.Lease
mongoClient *mongo.Client //mongoClient *mongo.Client
jobCollection *mongo.Collection //jobCollection *mongo.Collection
} }
var ( var (
...@@ -38,8 +37,8 @@ func InitJobMgr() (err error) { ...@@ -38,8 +37,8 @@ func InitJobMgr() (err error) {
client *clientv3.Client client *clientv3.Client
kv clientv3.KV kv clientv3.KV
lease clientv3.Lease lease clientv3.Lease
mongoClient *mongo.Client //mongoClient *mongo.Client
jobCollection *mongo.Collection //jobCollection *mongo.Collection
) )
// 初始化配置 // 初始化配置
...@@ -58,21 +57,21 @@ func InitJobMgr() (err error) { ...@@ -58,21 +57,21 @@ func InitJobMgr() (err error) {
lease = clientv3.NewLease(client) lease = clientv3.NewLease(client)
// 建立mongodb连接 // 建立mongodb连接
mongoOptions := options.Client().ApplyURI(G_config.MongodbUri) //mongoOptions := options.Client().ApplyURI(G_config.MongodbUri)
if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil { //if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil {
return // return
} //}
// 获取数据库和集合 // 获取数据库和集合
jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs") //jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs")
// 赋值单例 // 赋值单例
G_jobMgr = &JobMgr{ G_jobMgr = &JobMgr{
client: client, client: client,
kv: kv, kv: kv,
lease: lease, lease: lease,
mongoClient: mongoClient, //mongoClient: mongoClient,
jobCollection: jobCollection, //jobCollection: jobCollection,
} }
return return
} }
...@@ -86,7 +85,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -86,7 +85,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
jobValue []byte jobValue []byte
putResp *clientv3.PutResponse putResp *clientv3.PutResponse
oldJobObj common.Job oldJobObj common.Job
cronJob common.CronJobs //cronJob common.CronJobs
) )
// 赋值给job // 赋值给job
...@@ -123,60 +122,71 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) ...@@ -123,60 +122,71 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
oldJob = &oldJobObj oldJob = &oldJobObj
} }
// 同步保存到mongo //// 同步保存到mongo
cron_job_id := req.FormValue("id") //cron_job_id := req.FormValue("id")
cronJob.JobName = req.FormValue("job_name") //cronJob.JobName = req.FormValue("job_name")
cronJob.JobEtcdName = req.FormValue("job_etcd_name") //cronJob.JobEtcdName = req.FormValue("job_etcd_name")
cronJob.EtcdKey = jobKey //cronJob.EtcdKey = jobKey
cronJob.Node = req.FormValue("node") //cronJob.Node = req.FormValue("node")
cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64) //cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64)
cronJob.Command = req.FormValue("command") //cronJob.Command = req.FormValue("command")
cronJob.CronExpr = req.FormValue("cron_expr") //cronJob.CronExpr = req.FormValue("cron_expr")
cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64) //cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
cronJob.JobType = job_type //cronJob.JobType = job_type
cronJob.Status = 1 //cronJob.Status = 1
//
if cron_job_id == "" { // 新增 //if cron_job_id == "" { // 新增
cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64) // cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64)
cronJob.CreateTime = time.Now().Unix() // cronJob.CreateTime = time.Now().Unix()
cronJob.UpdateTime = time.Now().Unix() // cronJob.UpdateTime = time.Now().Unix()
//
_, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob) // _, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob)
} else { // 修改 //} else { // 修改
modifier := req.FormValue("modifier") // modifier := req.FormValue("modifier")
//
if modifier != "" { // if modifier != "" {
cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64) // cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64)
} // }
//
cronJob.UpdateTime = time.Now().Unix() // cronJob.UpdateTime = time.Now().Unix()
//
// 修改部分值 // // 修改部分值
update := bson.M{"$set": bson.M{"job_etcd_name": cronJob.JobEtcdName, "etcd_key": cronJob.EtcdKey, "node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command, // update := bson.M{"$set": bson.M{"job_etcd_name": cronJob.JobEtcdName, "etcd_key": cronJob.EtcdKey, "node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
"cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "modifier": cronJob.Modifier, "update_time": cronJob.UpdateTime}} // "cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "modifier": cronJob.Modifier, "update_time": cronJob.UpdateTime}}
objectId, err := primitive.ObjectIDFromHex(cron_job_id) // objectId, err := primitive.ObjectIDFromHex(cron_job_id)
//
_, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update) // _, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
fmt.Println(err) // fmt.Println(err)
} //}
//
if err != nil { //if err != nil {
return // return
} //}
return return
} }
// 删除任务 // 删除任务
func (jobMgr *JobMgr) DeleteJob(job_etcd_name string, node_ip string) (oldJob *common.Job, err error) { func (jobMgr *JobMgr) DeleteJob(req *http.Request) (oldJob *common.Job, err error) {
var ( var (
job_etcd_name string
node_ip string
jobKey string jobKey string
delResp *clientv3.DeleteResponse delResp *clientv3.DeleteResponse
oldJobObj common.Job oldJobObj common.Job
) )
// etcd中保存任务的key job_etcd_name = req.PostForm.Get("job_etcd_name")
jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_etcd_name node_ip = req.PostForm.Get("node")
// 任务类型:1-普通任务,2-一次性任务
job_type, err := strconv.ParseInt(req.PostForm.Get("job_type"), 10, 64)
if job_type == 1 {
jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_etcd_name // etcd的保存key: 目录 + IP + 任务名称
} else {
jobKey = common.JOB_ONCE_SAVE_DIR + node_ip + "/" + job_etcd_name // etcd的保存key: 目录 + IP + 任务名称
}
// 从etcd中删除它 // 从etcd中删除它
if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil { if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment