Commit e1782c3c by 孙龙

up

parent e9a3f50d
/worker/main/worker.json /worker/main/worker.json
/cmd/* /cmd/*
/go.sum /go.sum
/logs/*
/log/*
...@@ -11,6 +11,7 @@ require ( ...@@ -11,6 +11,7 @@ require (
github.com/golang/snappy v0.0.1 // indirect github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.1.1 // indirect github.com/google/uuid v1.1.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/ichunt2019/logger v1.0.5
github.com/mongodb/mongo-go-driver v1.2.1 github.com/mongodb/mongo-go-driver v1.2.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect github.com/xdg/stringprep v1.0.0 // indirect
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"github.com/ichunt2019/logger"
"go-crontab/common" "go-crontab/common"
"time" "time"
) )
...@@ -77,6 +78,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) { ...@@ -77,6 +78,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
for _, watchEvent = range watchResp.Events { for _, watchEvent = range watchResp.Events {
switch watchEvent.Type { switch watchEvent.Type {
case mvccpb.PUT: // 任务保存事件 新增或者修改 case mvccpb.PUT: // 任务保存事件 新增或者修改
logger.Info(fmt.Sprintf("添加或者修改任务 %+v",watchEvent.Kv))
//反序列化job 推送一个更新事件给scheduler //反序列化job 推送一个更新事件给scheduler
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil { if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue continue
...@@ -85,6 +87,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) { ...@@ -85,6 +87,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
// 构建一个更新Event // 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job) jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
case mvccpb.DELETE: // 任务被删除了 case mvccpb.DELETE: // 任务被删除了
logger.Info(fmt.Sprintf("删除任务 %+v",watchEvent.Kv))
//推送一个删除事件给scheduler //推送一个删除事件给scheduler
// Delete /cron/jobs/job10 // Delete /cron/jobs/job10
jobName = common.ExtractJobName(string(watchEvent.Kv.Key)) jobName = common.ExtractJobName(string(watchEvent.Kv.Key))
...@@ -132,6 +135,7 @@ func(jobMgr *JobMgr) watchOnceJobs(){ ...@@ -132,6 +135,7 @@ func(jobMgr *JobMgr) watchOnceJobs(){
for _, watchEvent = range watchResp.Events { for _, watchEvent = range watchResp.Events {
switch watchEvent.Type { switch watchEvent.Type {
case mvccpb.PUT: // 新增或者修改任务 case mvccpb.PUT: // 新增或者修改任务
logger.Info(fmt.Sprintf("添加或者修改任务 %+v",watchEvent.Kv))
//反序列化job 推送一个更新事件给scheduler //反序列化job 推送一个更新事件给scheduler
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil { if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue continue
...@@ -178,6 +182,7 @@ func (jobMgr *JobMgr) watchKiller() { ...@@ -178,6 +182,7 @@ func (jobMgr *JobMgr) watchKiller() {
for _, watchEvent = range watchResp.Events { for _, watchEvent = range watchResp.Events {
switch watchEvent.Type { switch watchEvent.Type {
case mvccpb.PUT: // 杀死任务事件 case mvccpb.PUT: // 杀死任务事件
logger.Info(fmt.Sprintf("强杀任务 %+v",watchEvent.Kv))
jobName = common.ExtractKillerName(string(watchEvent.Kv.Key)) jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
fmt.Println("监听到了强杀任务") fmt.Println("监听到了强杀任务")
fmt.Println(jobName) fmt.Println(jobName)
......
...@@ -2,7 +2,9 @@ package worker ...@@ -2,7 +2,9 @@ package worker
import ( import (
"context" "context"
"fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/ichunt2019/logger"
"go-crontab/common" "go-crontab/common"
"net" "net"
"time" "time"
...@@ -82,6 +84,7 @@ func (register *Register) keepOnline() { ...@@ -82,6 +84,7 @@ func (register *Register) keepOnline() {
// 注册到etcd // 注册到etcd
if _, err = register.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil { if _, err = register.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil {
logger.Error(fmt.Sprintf(" %s 服务注册失败:%s",regKey,err))
goto RETRY goto RETRY
} }
......
package worker package worker
import ( import (
"fmt"
"github.com/ichunt2019/logger"
"go-crontab/common" "go-crontab/common"
"time" "time"
"fmt"
) )
// 任务调度 // 任务调度
...@@ -75,6 +76,7 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) { ...@@ -75,6 +76,7 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
// 执行任务 // 执行任务
fmt.Println("执行任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05")) fmt.Println("执行任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
logger.Info(fmt.Sprintf("执行普通任务:[%s],任务内容: [%s] , 计划执行时间:%s 实际执行时间:%s",jobExecuteInfo.Job.Name,jobExecuteInfo.Job.Command,jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05"),jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05")))
G_executor.ExecuteJob(jobExecuteInfo) G_executor.ExecuteJob(jobExecuteInfo)
} }
...@@ -102,6 +104,7 @@ func (scheduler *Scheduler) TryStartOnceJob(job *common.Job) { ...@@ -102,6 +104,7 @@ func (scheduler *Scheduler) TryStartOnceJob(job *common.Job) {
// 执行任务 // 执行任务
fmt.Println("执行一次性任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05")) fmt.Println("执行一次性任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
logger.Info(fmt.Sprintf("执行一次性任务:[%s],任务内容: [%s] , 计划执行时间:%s 实际执行时间:%s",jobExecuteInfo.Job.Name,jobExecuteInfo.Job.Command,jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05"),jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05")))
G_executor.ExecuteJob(jobExecuteInfo) G_executor.ExecuteJob(jobExecuteInfo)
} }
......
...@@ -6,10 +6,12 @@ import ( ...@@ -6,10 +6,12 @@ import (
"go-crontab/worker" "go-crontab/worker"
"runtime" "runtime"
"time" "time"
"github.com/ichunt2019/logger"
) )
var ( var (
confFile string // 配置文件路径 confFile string // 配置文件路径
logDir string // 日志文件路径
) )
// 解析命令行参数 // 解析命令行参数
...@@ -17,6 +19,7 @@ func initArgs() { ...@@ -17,6 +19,7 @@ func initArgs() {
// worker -config ./worker.json // worker -config ./worker.json
// worker -h // worker -h
flag.StringVar(&confFile, "config", "./worker.json", "worker.json") flag.StringVar(&confFile, "config", "./worker.json", "worker.json")
flag.StringVar(&logDir, "logDir", "./logs", "日志文件目录")
flag.Parse() flag.Parse()
} }
...@@ -33,6 +36,15 @@ func main() { ...@@ -33,6 +36,15 @@ func main() {
// 初始化命令行参数 // 初始化命令行参数
initArgs() initArgs()
logConfig := make(map[string]string)
logConfig["log_path"] = logDir
logConfig["log_chan_size"] = "2"
logger.InitLogger("file",logConfig)
logger.Init()
logger.Info("ceshi")
// 初始化线程 // 初始化线程
initEnv() initEnv()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment