Commit 03a22063 by 孙龙

init

parent 0f3776a9
......@@ -40,8 +40,8 @@ func main(){
//新增
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job1","{\"name\":\"定时任务1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job2","{\"name\":\"定时任务2\",\" echo hello world \",\"cronExpr\":\"/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//if err != nil{
// fmt.Println(err)
//}else{
......
......@@ -27,6 +27,7 @@ func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
jobLock *JobLock
)
// 任务结果
result = &common.JobExecuteResult{
ExecuteInfo: info,
......@@ -54,7 +55,8 @@ func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
result.StartTime = time.Now()
// 执行shell命令
cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
//cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
cmd = exec.CommandContext(info.CancelCtx, "C:\\cygwin64\\bin\\bash.exe", "-c", info.Job.Command)
// 执行并捕获输出
output, err = cmd.CombinedOutput()
......
package worker
import (
"github.com/coreos/etcd/clientv3"
"context"
"github.com/coreos/etcd/clientv3"
"go-crontab/common"
)
......@@ -39,8 +39,13 @@ func (jobLock *JobLock) TryLock() (err error) {
txn clientv3.Txn
lockKey string
txnResp *clientv3.TxnResponse
localIp string
)
if localIp, err = getLocalIP(); err != nil {
localIp = ""
}
// 1, 创建租约(5秒)
if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
return
......@@ -77,7 +82,7 @@ func (jobLock *JobLock) TryLock() (err error) {
txn = jobLock.kv.Txn(context.TODO())
// 锁路径
lockKey = common.JOB_LOCK_DIR + jobLock.jobName
lockKey = common.JOB_LOCK_DIR +localIp + "/" + jobLock.jobName
// 5, 事务抢锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
......
......@@ -2,6 +2,7 @@ package worker
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
......@@ -33,16 +34,29 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
watchEvent *clientv3.Event
jobName string
jobEvent *common.JobEvent
localIp string
)
// 1, get一下/cron/jobs/目录下的所有任务,并且获知当前集群的revision
if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix()); err != nil {
if localIp, err = getLocalIP(); err != nil {
return
}
//fmt.Println(localIp)
// 1, get一下/cron/jobs/IP/目录下的所有任务,并且获知当前集群的revision
if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR+localIp, clientv3.WithPrefix()); err != nil {
return
}
//fmt.Println(getResp)
// 当前有哪些任务
//key:"/cron/jobs/192.168.2.246/job1"
//create_revision:61
//mod_revision:61
//version:1
//value:"{"name":"","command":" ","cronExpr":"*/5 * * * * * *"}"
for _, kvpair = range getResp.Kvs {
// 反序列化json得到Job
if job, err = common.UnpackJob(kvpair.Value); err == nil {
......@@ -57,7 +71,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
// 从GET时刻的后续版本开始监听变化
watchStartRevision = getResp.Header.Revision + 1
// 监听/cron/jobs/目录的后续变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR+localIp, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
......@@ -67,6 +81,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue
}
fmt.Printf("修改了任务%s",job)
// 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
case mvccpb.DELETE: // 任务被删除了
......@@ -97,11 +112,19 @@ func (jobMgr *JobMgr) watchKiller() {
jobEvent *common.JobEvent
jobName string
job *common.Job
localIp string
err error
)
if localIp, err = getLocalIP(); err != nil {
return
}
// 监听/cron/killer目录
go func() { // 监听协程
// 监听/cron/killer/目录的变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR+localIp, clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
......@@ -157,7 +180,7 @@ func InitJobMgr() (err error) {
G_jobMgr.watchJobs()
// 启动监听killer
G_jobMgr.watchKiller()
//G_jobMgr.watchKiller()
return
}
......
......@@ -68,7 +68,7 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
scheduler.jobExecutingTable[jobPlan.Job.Name] = jobExecuteInfo
// 执行任务
fmt.Println("执行任务:", jobExecuteInfo.Job.Name, jobExecuteInfo.PlanTime, jobExecuteInfo.RealTime)
fmt.Println("执行任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
G_executor.ExecuteJob(jobExecuteInfo)
}
......@@ -159,6 +159,8 @@ func (scheduler *Scheduler) scheduleLoop() {
scheduleTimer = time.NewTimer(scheduleAfter)
// 定时任务common.Job
// 变化事件
for {
select {
case jobEvent = <- scheduler.jobEventChan: //监听任务变化事件 新增job或者修改job操作会插入到该管道
......
......@@ -53,16 +53,16 @@ func main() {
}
//
// 启动执行器
if err = worker.InitExecutor(); err != nil {
goto ERR
}
//if err = worker.InitExecutor(); err != nil {
// goto ERR
//}
//
//// 启动调度器
if err = worker.InitScheduler(); err != nil {
goto ERR
}
//
//// 初始化任务管理器
//// 初始化任务管理器 监听job
if err = worker.InitJobMgr(); err != nil {
goto ERR
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment