Commit 424fcb28 by 孙龙

Merge branch 'master' of http://119.23.72.7/sunlong_v5/go-crontab

parents 306ed9fe 7afe1042
Showing with 73 additions and 65 deletions
......@@ -66,8 +66,6 @@ ERR:
func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
var (
err error // interface{}
job_etcd_name string
node string
oldJob *common.Job
bytes []byte
)
......@@ -78,11 +76,11 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
}
// 删除的任务名、节点
job_etcd_name = req.PostForm.Get("job_etcd_name")
node = req.PostForm.Get("node")
//job_etcd_name = req.PostForm.Get("job_etcd_name")
//node = req.PostForm.Get("node")
// 去删除任务
if oldJob, err = G_jobMgr.DeleteJob(job_etcd_name, node); err != nil {
if oldJob, err = G_jobMgr.DeleteJob(req); err != nil {
goto ERR
}
......
......@@ -3,15 +3,14 @@ package master
import (
"context"
"encoding/json"
"fmt"
_ "fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
_"go.mongodb.org/mongo-driver/bson"
_"go.mongodb.org/mongo-driver/bson/primitive"
_"go.mongodb.org/mongo-driver/mongo"
_"go.mongodb.org/mongo-driver/mongo/options"
"net/http"
"strconv"
"time"
......@@ -22,8 +21,8 @@ type JobMgr struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
//mongoClient *mongo.Client
//jobCollection *mongo.Collection
}
var (
......@@ -38,8 +37,8 @@ func InitJobMgr() (err error) {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
//mongoClient *mongo.Client
//jobCollection *mongo.Collection
)
// 初始化配置
......@@ -58,21 +57,21 @@ func InitJobMgr() (err error) {
lease = clientv3.NewLease(client)
// 建立mongodb连接
mongoOptions := options.Client().ApplyURI(G_config.MongodbUri)
if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil {
return
}
//mongoOptions := options.Client().ApplyURI(G_config.MongodbUri)
//if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil {
// return
//}
// 获取数据库和集合
jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs")
//jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs")
// 赋值单例
G_jobMgr = &JobMgr{
client: client,
kv: kv,
lease: lease,
mongoClient: mongoClient,
jobCollection: jobCollection,
//mongoClient: mongoClient,
//jobCollection: jobCollection,
}
return
}
......@@ -86,7 +85,7 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
jobValue []byte
putResp *clientv3.PutResponse
oldJobObj common.Job
cronJob common.CronJobs
//cronJob common.CronJobs
)
// 赋值给job
......@@ -123,60 +122,71 @@ func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error)
oldJob = &oldJobObj
}
// 同步保存到mongo
cron_job_id := req.FormValue("id")
cronJob.JobName = req.FormValue("job_name")
cronJob.JobEtcdName = req.FormValue("job_etcd_name")
cronJob.EtcdKey = jobKey
cronJob.Node = req.FormValue("node")
cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64)
cronJob.Command = req.FormValue("command")
cronJob.CronExpr = req.FormValue("cron_expr")
cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
cronJob.JobType = job_type
cronJob.Status = 1
if cron_job_id == "" { // 新增
cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64)
cronJob.CreateTime = time.Now().Unix()
cronJob.UpdateTime = time.Now().Unix()
_, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob)
} else { // 修改
modifier := req.FormValue("modifier")
if modifier != "" {
cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64)
}
cronJob.UpdateTime = time.Now().Unix()
// 修改部分值
update := bson.M{"$set": bson.M{"job_etcd_name": cronJob.JobEtcdName, "etcd_key": cronJob.EtcdKey, "node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
"cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "modifier": cronJob.Modifier, "update_time": cronJob.UpdateTime}}
objectId, err := primitive.ObjectIDFromHex(cron_job_id)
_, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
fmt.Println(err)
}
if err != nil {
return
}
//// 同步保存到mongo
//cron_job_id := req.FormValue("id")
//cronJob.JobName = req.FormValue("job_name")
//cronJob.JobEtcdName = req.FormValue("job_etcd_name")
//cronJob.EtcdKey = jobKey
//cronJob.Node = req.FormValue("node")
//cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64)
//cronJob.Command = req.FormValue("command")
//cronJob.CronExpr = req.FormValue("cron_expr")
//cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
//cronJob.JobType = job_type
//cronJob.Status = 1
//
//if cron_job_id == "" { // 新增
// cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64)
// cronJob.CreateTime = time.Now().Unix()
// cronJob.UpdateTime = time.Now().Unix()
//
// _, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob)
//} else { // 修改
// modifier := req.FormValue("modifier")
//
// if modifier != "" {
// cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64)
// }
//
// cronJob.UpdateTime = time.Now().Unix()
//
// // 修改部分值
// update := bson.M{"$set": bson.M{"job_etcd_name": cronJob.JobEtcdName, "etcd_key": cronJob.EtcdKey, "node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
// "cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "modifier": cronJob.Modifier, "update_time": cronJob.UpdateTime}}
// objectId, err := primitive.ObjectIDFromHex(cron_job_id)
//
// _, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
// fmt.Println(err)
//}
//
//if err != nil {
// return
//}
return
}
// 删除任务
func (jobMgr *JobMgr) DeleteJob(job_etcd_name string, node_ip string) (oldJob *common.Job, err error) {
func (jobMgr *JobMgr) DeleteJob(req *http.Request) (oldJob *common.Job, err error) {
var (
job_etcd_name string
node_ip string
jobKey string
delResp *clientv3.DeleteResponse
oldJobObj common.Job
)
// etcd中保存任务的key
jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_etcd_name
job_etcd_name = req.PostForm.Get("job_etcd_name")
node_ip = req.PostForm.Get("node")
// 任务类型:1-普通任务,2-一次性任务
job_type, err := strconv.ParseInt(req.PostForm.Get("job_type"), 10, 64)
if job_type == 1 {
jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_etcd_name // etcd的保存key: 目录 + IP + 任务名称
} else {
jobKey = common.JOB_ONCE_SAVE_DIR + node_ip + "/" + job_etcd_name // etcd的保存key: 目录 + IP + 任务名称
}
// 从etcd中删除它
if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment