Commit f6ed61e0 by 孙龙

Merge branch 'master' of http://119.23.72.7/sunlong_v5/go-crontab

parents ac40368e 348cc425
package common
const (
// 定时任务保存目录
JOB_DIR = "/cron/"
// 定时任务任务保存目录
JOB_SAVE_DIR = "/cron/jobs/"
......
......@@ -6,6 +6,7 @@ import (
"github.com/gorhill/cronexpr"
"strings"
"time"
_"go.mongodb.org/mongo-driver/bson/primitive"
)
// 定时任务
......@@ -80,6 +81,24 @@ type SortLogByStartTime struct {
SortOrder int `bson:"startTime"` // {startTime: -1}
}
// 定时任务
type CronJobs struct {
//Id primitive.ObjectID `bson:"_id"`
JobName string `bson:"job_name"` // 任务名字
EtcdKey string `bson:"etcd_key"` // etcd key
Node string `bson:"node"` // 所属节点
Group int64 `bson:"group"` // 所属分组
Command string `bson:"command"` // 脚本命令
CronExpr string `bson:"cron_expr"` // cron表达式
ConcurrencyNum int64 `bson:"concurrency_num"` // 并发数
JobType int64 `bson:"job_type"` // 任务类型,1-普通任务,2-一次性任务
Status int64 `bson:"status"` // 状态,1-启用,-1-禁用
Creator int64 `bson:"creator"` // 创建人
Modifier int64 `bson:"modifier"` // 修改人
CreateTime int64 `bson:"create_time"` // 创建时间
UpdateTime int64 `bson:"update_time"` // 修改时间
}
// 应答方法
func BuildResponse(errno int, msg string, data interface{}) (resp []byte, err error) {
// 1, 定义一个response
......
......@@ -11,7 +11,7 @@ require (
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/mongodb/mongo-go-driver v1.2.0
github.com/mongodb/mongo-go-driver v1.2.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
go.mongodb.org/mongo-driver v1.2.0
......
......@@ -37,6 +37,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mongodb/mongo-go-driver v1.2.0 h1:0/Qg6sZ+mVjB1C1IPFhAyRNvuT8WOIpZaQJOu/AnS6A=
github.com/mongodb/mongo-go-driver v1.2.0/go.mod h1:NK/HWDIIZkaYsnYa0hmtP443T5ELr0KDecmIioVuuyU=
github.com/mongodb/mongo-go-driver v1.2.1 h1:QtCZFfMl2khU0kUNih0H1i0gxq9OO29JTTCLh3pI/Z8=
github.com/mongodb/mongo-go-driver v1.2.1/go.mod h1:NK/HWDIIZkaYsnYa0hmtP443T5ELr0KDecmIioVuuyU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
......
package master
import (
"net/http"
_ "encoding/json"
_ "fmt"
"go-crontab/common"
"net"
"time"
"net/http"
"strconv"
"go-crontab/common"
"encoding/json"
"time"
)
// 任务的HTTP接口
......@@ -24,8 +25,8 @@ var (
func handleJobSave(resp http.ResponseWriter, req *http.Request) {
var (
err error
postJob string
job common.Job
//postJob string
//job common.Job
oldJob *common.Job
bytes []byte
)
......@@ -34,16 +35,20 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) {
if err = req.ParseForm(); err != nil {
goto ERR
}
// 2, 取表单中的job字段
postJob = req.PostForm.Get("job")
// 3, 反序列化job
if err = json.Unmarshal([]byte(postJob), &job); err != nil {
goto ERR
}
//// 2, 取表单中的job字段
//postJob = req.PostForm.Get("job_name")
//
//// 3, 反序列化job
//if err = json.Unmarshal([]byte(postJob), &job); err != nil {
// goto ERR
//}
// 4, 保存到etcd
if oldJob, err = G_jobMgr.SaveJob(&job); err != nil {
if oldJob, err = G_jobMgr.SaveJob(req); err != nil {
goto ERR
}
// 5, 返回正常应答 ({"errno": 0, "msg": "", "data": {....}})
if bytes, err = common.BuildResponse(0, "success", oldJob); err == nil {
resp.Write(bytes)
......@@ -51,7 +56,7 @@ func handleJobSave(resp http.ResponseWriter, req *http.Request) {
return
ERR:
// 6, 返回异常应答
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
......@@ -61,7 +66,8 @@ ERR:
func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
var (
err error // interface{}
name string
job_name string
node string
oldJob *common.Job
bytes []byte
)
......@@ -71,11 +77,12 @@ func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
goto ERR
}
// 删除的任务名
name = req.PostForm.Get("name")
// 删除的任务名、节点
job_name = req.PostForm.Get("job_name")
node = req.PostForm.Get("node")
// 去删除任务
if oldJob, err = G_jobMgr.DeleteJob(name); err != nil {
if oldJob, err = G_jobMgr.DeleteJob(job_name, node); err != nil {
goto ERR
}
......
package master
import (
"github.com/coreos/etcd/clientv3"
"time"
"go-crontab/common"
"encoding/json"
"context"
"encoding/json"
"fmt"
_ "fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"net/http"
"strconv"
"time"
)
// 任务管理器
......@@ -14,6 +22,8 @@ type JobMgr struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
}
var (
......@@ -28,6 +38,8 @@ func InitJobMgr() (err error) {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
mongoClient *mongo.Client
jobCollection *mongo.Collection
)
// 初始化配置
......@@ -45,27 +57,53 @@ func InitJobMgr() (err error) {
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
// 建立mongodb连接
mongoOptions := options.Client().ApplyURI(G_config.MongodbUri)
if mongoClient, err = mongo.Connect(context.TODO(),mongoOptions); err != nil {
return
}
// 获取数据库和集合
jobCollection = mongoClient.Database("ichunt").Collection("cron_jobs")
// 赋值单例
G_jobMgr = &JobMgr{
client: client,
kv: kv,
lease: lease,
mongoClient: mongoClient,
jobCollection: jobCollection,
}
return
}
// 保存任务
func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) {
func (jobMgr *JobMgr) SaveJob(req *http.Request) (oldJob *common.Job, err error) {
// 把任务保存到/cron/jobs/任务名 -> json
var (
job common.Job
jobKey string
jobValue []byte
putResp *clientv3.PutResponse
oldJobObj common.Job
cronJob common.CronJobs
)
// etcd的保存key
jobKey = common.JOB_SAVE_DIR + job.Name
// 赋值给job
job.Name = req.FormValue("job_name");
job.Command = req.FormValue("command");
job.CronExpr = req.FormValue("cron_expr");
nodeIp := req.FormValue("node");
// 任务类型:1-普通任务,2-一次性任务
job_type, err := strconv.ParseInt(req.FormValue("job_type"), 10, 64)
if job_type == 1 {
jobKey = common.JOB_SAVE_DIR + nodeIp + "/" + job.Name // etcd的保存key: 目录 + IP + 任务名称
} else {
jobKey = common.JOB_ONCE_SAVE_DIR + nodeIp + "/" + job.Name // etcd的保存key: 目录 + IP + 任务名称
}
// 任务信息json
if jobValue, err = json.Marshal(job); err != nil {
return
......@@ -83,11 +121,52 @@ func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) {
}
oldJob = &oldJobObj
}
// 同步保存到mongo
cron_job_id := req.FormValue("id")
cronJob.JobName = req.FormValue("job_name")
cronJob.EtcdKey = jobKey
cronJob.Node = req.FormValue("node")
cronJob.Group, err = strconv.ParseInt(req.FormValue("group"), 10, 64)
cronJob.Command = req.FormValue("command")
cronJob.CronExpr = req.FormValue("cron_expr")
cronJob.ConcurrencyNum, err = strconv.ParseInt(req.FormValue("concurrency_num"), 10, 64)
cronJob.JobType = job_type
cronJob.Status = 1
if cron_job_id == "" { // 新增
cronJob.Creator, err = strconv.ParseInt(req.FormValue("creator"), 10, 64)
cronJob.CreateTime = time.Now().Unix()
cronJob.UpdateTime = time.Now().Unix()
_, err = jobMgr.jobCollection.InsertOne(context.TODO(), &cronJob)
} else { // 修改
modifier := req.FormValue("modifier")
if modifier != "" {
cronJob.Modifier, err = strconv.ParseInt(modifier, 10, 64)
}
cronJob.UpdateTime = time.Now().Unix()
// 修改部分值
update := bson.M{"$set": bson.M{"node": cronJob.Node, "group": cronJob.Group, "command": cronJob.Command,
"cron_expr": cronJob.CronExpr, "concurrency_num": cronJob.ConcurrencyNum, "update_time": cronJob.UpdateTime}}
objectId, err := primitive.ObjectIDFromHex(cron_job_id)
_, err = jobMgr.jobCollection.UpdateOne(context.TODO(), bson.M{"_id": objectId}, update)
fmt.Println(err)
}
if err != nil {
return
}
return
}
// 删除任务
func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
func (jobMgr *JobMgr) DeleteJob(job_name string, node_ip string) (oldJob *common.Job, err error) {
var (
jobKey string
delResp *clientv3.DeleteResponse
......@@ -95,7 +174,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
)
// etcd中保存任务的key
jobKey = common.JOB_SAVE_DIR + name
jobKey = common.JOB_SAVE_DIR + node_ip + "/" + job_name
// 从etcd中删除它
if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil {
......@@ -111,6 +190,7 @@ func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
}
oldJob = &oldJobObj
}
return
}
......@@ -124,7 +204,7 @@ func (jobMgr *JobMgr) ListJobs() (jobList []*common.Job, err error) {
)
// 任务保存的目录
dirKey = common.JOB_SAVE_DIR
dirKey = common.JOB_DIR
// 获取目录下所有任务信息
if getResp, err = jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix()); err != nil {
......
......@@ -39,38 +39,38 @@ func InitLogMgr() (err error) {
// 查看任务日志
func (logMgr *LogMgr) ListLog(name string, skip int, limit int) (logArr []*common.JobLog, err error){
var (
filter *common.JobLogFilter
logSort *common.SortLogByStartTime
cursor mongo.Cursor
jobLog *common.JobLog
)
// len(logArr)
logArr = make([]*common.JobLog, 0)
// 过滤条件
filter = &common.JobLogFilter{JobName: name}
// 按照任务开始时间倒排
logSort = &common.SortLogByStartTime{SortOrder: -1}
//var (
// filter *common.JobLogFilter
// logSort *common.SortLogByStartTime
// cursor mongo.Cursor
// jobLog *common.JobLog
//)
//
//// len(logArr)
//logArr = make([]*common.JobLog, 0)
//
//// 过滤条件
//filter = &common.JobLogFilter{JobName: name}
//
//// 按照任务开始时间倒排
//logSort = &common.SortLogByStartTime{SortOrder: -1}
// 查询
if cursor, err = logMgr.logCollection.Find(context.TODO(), filter, findopt.Sort(logSort), findopt.Skip(int64(skip)), findopt.Limit(int64(limit))); err != nil {
return
}
//if cursor, err = logMgr.logCollection.Find(context.TODO(), filter, findopt.Sort(logSort), findopt.Skip(int64(skip)), findopt.Limit(int64(limit))); err != nil {
// return
//}
// 延迟释放游标
defer cursor.Close(context.TODO())
for cursor.Next(context.TODO()) {
jobLog = &common.JobLog{}
// 反序列化BSON
if err = cursor.Decode(jobLog); err != nil {
continue // 有日志不合法
}
logArr = append(logArr, jobLog)
}
//defer cursor.Close(context.TODO())
//
//for cursor.Next(context.TODO()) {
// jobLog = &common.JobLog{}
//
// // 反序列化BSON
// if err = cursor.Decode(jobLog); err != nil {
// continue // 有日志不合法
// }
//
// logArr = append(logArr, jobLog)
//}
return
}
\ No newline at end of file
......@@ -18,7 +18,7 @@
"webroot": "./webroot",
"mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017",
"mongodbUri": "mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1",
"mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment