Commit c97b9a7c by 孙龙

init

parent 70a1b3d5
...@@ -181,7 +181,7 @@ func BuildOnceJobExecuteInfo(job *Job) (jobExecuteInfo *JobExecuteInfo){ ...@@ -181,7 +181,7 @@ func BuildOnceJobExecuteInfo(job *Job) (jobExecuteInfo *JobExecuteInfo){
PlanTime: time.Now(), // 计算调度时间 PlanTime: time.Now(), // 计算调度时间
RealTime: time.Now(), // 真实调度时间 RealTime: time.Now(), // 真实调度时间
} }
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.TODO()) jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.Background())
return return
} }
......
package main package main
import ( import (
"context" "os/exec"
"fmt" "fmt"
"github.com/coreos/etcd/clientv3" "context"
"time" "time"
) )
func main(){ func main() {
var ( var (
config clientv3.Config cmd *exec.Cmd
output []byte
err error err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
) )
//配置 ctx, cancelFunc := context.WithCancel(context.TODO())
config = clientv3.Config{
Endpoints:[]string{"192.168.2.232:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除key
//kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job3",clientv3.WithPrefix())
//
//return
//新增定时任务 go func() {
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV()) // 生成Cmd
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV()) cmd = exec.CommandContext(ctx, "C:\\cygwin64\\bin\\bash.exe", "-c", "D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php")
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV()) fmt.Println("111111111111")
// 执行了命令, 捕获了子进程的输出( pipe )
if output, err = cmd.CombinedOutput(); err != nil {
//新增一次性任务 fmt.Println(err)
putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV()) return
if err != nil{
fmt.Println(err)
}else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
} }
} fmt.Println(string(output))
}()
//查询 time.Sleep(time.Second*10)
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix()) cancelFunc()
if err != nil { fmt.Println("22222222222222222")
fmt.Println(err) // 打印子进程的输出
return time.Sleep(time.Second*1000)
} }
for _, kvpair := range getResp.Kvs { \ No newline at end of file
fmt.Println(kvpair)
}
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main(){
var (
config clientv3.Config
err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
)
//配置
config = clientv3.Config{
Endpoints:[]string{"192.168.2.232:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除key
//kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job1",clientv3.WithPrefix())
//kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
//return
//新增定时任务
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV())
//新增一次性任务
//putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV())
//强杀任务
putResp, err := kv.Put(context.TODO(),"/cron/killer/192.168.2.246/job10","")
if err != nil{
fmt.Println(err)
}else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
}
}
//查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
...@@ -2,9 +2,9 @@ package worker ...@@ -2,9 +2,9 @@ package worker
import ( import (
"go-crontab/common" "go-crontab/common"
"math/rand"
"os/exec" "os/exec"
"time" "time"
"math/rand"
) )
// 任务执行器 // 任务执行器
......
...@@ -2,6 +2,7 @@ package worker ...@@ -2,6 +2,7 @@ package worker
import ( import (
"context" "context"
"fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common" "go-crontab/common"
...@@ -142,6 +143,8 @@ func(jobMgr *JobMgr) watchOnceJobs(){ ...@@ -142,6 +143,8 @@ func(jobMgr *JobMgr) watchOnceJobs(){
// 事件推给scheduler // 事件推给scheduler
G_scheduler.PushOnceJobEvent(jobEvent) G_scheduler.PushOnceJobEvent(jobEvent)
case mvccpb.DELETE: // killer标记过期, 被自动删除 case mvccpb.DELETE: // killer标记过期, 被自动删除
//一次性任务执行完成后 就不会继续执行 所以此处不需要监听删除任务
//想要删除任务可以 监听一个强杀任务
} }
} }
} }
...@@ -176,6 +179,9 @@ func (jobMgr *JobMgr) watchKiller() { ...@@ -176,6 +179,9 @@ func (jobMgr *JobMgr) watchKiller() {
switch watchEvent.Type { switch watchEvent.Type {
case mvccpb.PUT: // 杀死任务事件 case mvccpb.PUT: // 杀死任务事件
jobName = common.ExtractKillerName(string(watchEvent.Kv.Key)) jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
fmt.Println("监听到了强杀任务")
fmt.Println(jobName)
jobName = common.TrimIp(localIp,jobName)
job = &common.Job{Name: jobName} job = &common.Job{Name: jobName}
jobEvent = common.BuildJobEvent(common.JOB_EVENT_KILL, job) jobEvent = common.BuildJobEvent(common.JOB_EVENT_KILL, job)
// 事件推给scheduler // 事件推给scheduler
...@@ -228,8 +234,8 @@ func InitJobMgr() (err error) { ...@@ -228,8 +234,8 @@ func InitJobMgr() (err error) {
G_jobMgr.watchOnceJobs() G_jobMgr.watchOnceJobs()
// 启动监听killer // 启动监听killer 强杀任务
//G_jobMgr.watchKiller() G_jobMgr.watchKiller()
return return
} }
......
...@@ -40,35 +40,16 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) { ...@@ -40,35 +40,16 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
} }
case common.JOB_EVENT_KILL: // 强杀任务事件 case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中 // 取消掉Command执行, 判断任务是否在执行中
fmt.Println("在任务执行表中查看是否有改任务进行中,有就取消正在执行的任务")
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting { if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
fmt.Println("找到正在执行的任务了,开始取消任务",jobEvent.Job.Name)
fmt.Printf("%+v",jobExecuteInfo)
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出 jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
} }
} }
} }
// 处理任务事件 一次性任务
func (scheduler *Scheduler) handleOnceJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
err error
)
switch jobEvent.EventType {
case common.JOB_EVENT_SAVE: // 保存任务事件
if jobSchedulePlan, err = common.BuildJobSchedulePlan(jobEvent.Job); err != nil {
return
}
scheduler.jobPlanTable[jobEvent.Job.Name] = jobSchedulePlan
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 尝试执行任务 // 尝试执行任务
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) { func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment