Commit de7b48d9 by 朱继来

新增/修改任务

parent 1efdb899
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/gorhill/cronexpr" "github.com/gorhill/cronexpr"
"strings" "strings"
"time" "time"
_"go.mongodb.org/mongo-driver/bson/primitive"
) )
// 定时任务 // 定时任务
...@@ -13,7 +14,6 @@ type Job struct { ...@@ -13,7 +14,6 @@ type Job struct {
Name string `json:"name"` // 任务名 Name string `json:"name"` // 任务名
Command string `json:"command"` // shell命令 Command string `json:"command"` // shell命令
CronExpr string `json:"cronExpr"` // cron表达式 CronExpr string `json:"cronExpr"` // cron表达式
NodeIp string `json:"nodeIp"`
} }
// 任务调度计划 // 任务调度计划
...@@ -81,6 +81,24 @@ type SortLogByStartTime struct { ...@@ -81,6 +81,24 @@ type SortLogByStartTime struct {
SortOrder int `bson:"startTime"` // {startTime: -1} SortOrder int `bson:"startTime"` // {startTime: -1}
} }
// 定时任务
type CronJobs struct {
//Id primitive.ObjectID `bson:"_id"`
JobName string `bson:"job_name"` // 任务名字
EtcdKey string `bson:"etcd_key"` // etcd key
Node string `bson:"node"` // 所属节点
Group int64 `bson:"group"` // 所属分组
Command string `bson:"command"` // 脚本命令
CronExpr string `bson:"cron_expr"` // cron表达式
ConcurrencyNum int64 `bson:"concurrency_num"` // 并发数
JobType int64 `bson:"job_type"` // 任务类型,1-普通任务,2-一次性任务
Status int64 `bson:"status"` // 状态,1-启用,-1-禁用
Creator int64 `bson:"creator"` // 创建人
Modifier int64 `bson:"modifier"` // 修改人
CreateTime int64 `bson:"create_time"` // 创建时间
UpdateTime int64 `bson:"update_time"` // 修改时间
}
// 应答方法 // 应答方法
func BuildResponse(errno int, msg string, data interface{}) (resp []byte, err error) { func BuildResponse(errno int, msg string, data interface{}) (resp []byte, err error) {
// 1, 定义一个response // 1, 定义一个response
......
...@@ -26,7 +26,7 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) { ...@@ -26,7 +26,7 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) {
var ( var (
err error err error
//postJob string //postJob string
job common.Job //job common.Job
oldJob *common.Job oldJob *common.Job
bytes []byte bytes []byte
) )
...@@ -44,14 +44,8 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) { ...@@ -44,14 +44,8 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) {
// goto ERR // goto ERR
//} //}
// 调整2、3步骤,直接赋值给job
job.Name = req.FormValue("job_name");
job.Command = req.FormValue("command");
job.CronExpr = req.FormValue("cron_expr");
job.NodeIp = req.FormValue("node_ip");
// 4, 保存到etcd // 4, 保存到etcd
if oldJob, err = G_jobMgr.SaveJob(&job); err != nil { if oldJob, err = G_jobMgr.SaveJob(req); err != nil {
goto ERR goto ERR
} }
......
package master package master
import ( import (
"github.com/coreos/etcd/clientv3"
"time"
"go-crontab/common"
"encoding/json"
"context" "context"
"encoding/json"
"fmt"
_"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
"net/http"
"strconv"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
// 任务管理器 // 任务管理器
...@@ -14,6 +22,8 @@ type JobMgr struct { ...@@ -14,6 +22,8 @@ type JobMgr struct {
client *clientv3.Client client *clientv3.Client
kv clientv3.KV kv clientv3.KV
lease clientv3.Lease lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
} }
var ( var (
...@@ -28,6 +38,8 @@ func InitJobMgr() (err error) { ...@@ -28,6 +38,8 @@ func InitJobMgr() (err error) {
client *clientv3.Client client *clientv3.Client
kv clientv3.KV kv clientv3.KV
lease clientv3.Lease lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
) )
// 初始化配置 // 初始化配置
...@@ -45,27 +57,46 @@ func InitJobMgr() (err error) { ...@@ -45,27 +57,46 @@ func InitJobMgr() (err error) {
kv = clientv3.NewKV(client) kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client) lease = clientv3.NewLease(client)
// 建立mongodb连接
mongoOptions := options.Client().ApplyURI(G_config.MongodbUri)
if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil {
return
}
// 获取数据库和集合
jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs")
// 赋值单例 // 赋值单例
G_jobMgr = &JobMgr{ G_jobMgr = &JobMgr{
client: client, client: client,
kv: kv, kv: kv,
lease: lease, lease: lease,
mongoClient: mongoClient,
jobCollection: jobCollection,
} }
return return
} }
// 保存任务 // 保存任务
func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) { func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) {
// 把任务保存到/cron/jobs/任务名 -> json // 把任务保存到/cron/jobs/任务名 -> json
var ( var (
job common.Job
jobKey string jobKey string
jobValue []byte jobValue []byte
putResp *clientv3.PutResponse putResp *clientv3.PutResponse
oldJobObj common.Job oldJobObj common.Job
cronJob common.CronJobs
) )
// etcd的保存key // 赋值给job
jobKey = common.JOB_SAVE_DIR + job.NodeIp + "/" + job.Name job.Name = req.FormValue("job_name");
job.Command = req.FormValue("command");
job.CronExpr = req.FormValue("cron_expr");
nodeIp := req.FormValue("node");
// etcd的保存key: 目录 + IP + 任务名称
jobKey = common.JOB_SAVE_DIR + nodeIp + "/" + job.Name
// 任务信息json // 任务信息json
if jobValue, err = json.Marshal(job); err != nil { if jobValue, err = json.Marshal(job); err != nil {
return return
...@@ -83,6 +114,47 @@ func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) { ...@@ -83,6 +114,47 @@ func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) {
} }
oldJob = &oldJobObj oldJob = &oldJobObj
} }
// 同步保存到mongo
cron_job_id := req.FormValue("id")
cronJob.JobName = req.FormValue("job_name")
cronJob.EtcdKey = jobKey
cronJob.Node = req.FormValue("node")
cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64)
cronJob.Command = req.FormValue("command")
cronJob.CronExpr = req.FormValue("cron_expr")
cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
cronJob.JobType, err = strconv.ParseInt(req.FormValue("job_type"), 10, 64)
cronJob.Status = 1
if cron_job_id == "" { // 新增
cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64)
cronJob.CreateTime = time.Now().Unix()
cronJob.UpdateTime = time.Now().Unix()
_, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob)
} else { // 修改
modifier := req.FormValue("modifier")
if modifier != "" {
cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64)
}
cronJob.UpdateTime = time.Now().Unix()
// 修改部分值
update := bson.M{"$set": bson.M{"node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
"cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "job_type": cronJob.JobType, "update_time": cronJob.UpdateTime}}
objectId, err := primitive.ObjectIDFromHex(cron_job_id)
_, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
fmt.Println(err)
}
if err != nil {
return
}
return return
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
"webroot": "./webroot", "webroot": "./webroot",
"mongodb地址": "采用mongodb URI", "mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017", "mongodbUri": "mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1",
"mongodb连接超时时间": "单位毫秒", "mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000 "mongodbConnectTimeout": 5000
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment