Commit 1efdb899 by 朱继来

Merge branch 'master' of http://119.23.72.7/sunlong_v5/go-crontab

parents 15fbcc7b c97b9a7c
......@@ -182,7 +182,7 @@ func BuildOnceJobExecuteInfo(job *Job) (jobExecuteInfo *JobExecuteInfo){
PlanTime: time.Now(), // 计算调度时间
RealTime: time.Now(), // 真实调度时间
}
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.TODO())
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.Background())
return
}
......
package main
import (
"context"
"os/exec"
"fmt"
"github.com/coreos/etcd/clientv3"
"context"
"time"
)
func main(){
func main() {
var (
config clientv3.Config
cmd *exec.Cmd
output []byte
err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
)
//配置
config = clientv3.Config{
Endpoints:[]string{"192.168.2.232:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除key
//kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job3",clientv3.WithPrefix())
//
//return
ctx, cancelFunc := context.WithCancel(context.TODO())
//新增定时任务
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV())
//新增一次性任务
putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV())
if err != nil{
fmt.Println(err)
}else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
go func() {
// 生成Cmd
cmd = exec.CommandContext(ctx, "C:\\cygwin64\\bin\\bash.exe", "-c", "D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php")
fmt.Println("111111111111")
// 执行了命令, 捕获了子进程的输出( pipe )
if output, err = cmd.CombinedOutput(); err != nil {
fmt.Println(err)
return
}
}
//查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
fmt.Println(string(output))
}()
time.Sleep(time.Second*10)
cancelFunc()
fmt.Println("22222222222222222")
// 打印子进程的输出
time.Sleep(time.Second*1000)
}
\ No newline at end of file
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main(){
var (
config clientv3.Config
err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
)
//配置
config = clientv3.Config{
Endpoints:[]string{"192.168.2.232:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除key
//kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job1",clientv3.WithPrefix())
//kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
//return
//新增定时任务
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV())
//新增一次性任务
//putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV())
//强杀任务
putResp, err := kv.Put(context.TODO(),"/cron/killer/192.168.2.246/job10","")
if err != nil{
fmt.Println(err)
}else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
}
}
//查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
......@@ -2,9 +2,9 @@ package worker
import (
"go-crontab/common"
"math/rand"
"os/exec"
"time"
"math/rand"
)
// 任务执行器
......
......@@ -2,6 +2,7 @@ package worker
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
......@@ -142,6 +143,8 @@ func(jobMgr *JobMgr) watchOnceJobs(){
// 事件推给scheduler
G_scheduler.PushOnceJobEvent(jobEvent)
case mvccpb.DELETE: // killer标记过期, 被自动删除
//一次性任务执行完成后 就不会继续执行 所以此处不需要监听删除任务
//想要删除任务可以 监听一个强杀任务
}
}
}
......@@ -176,6 +179,9 @@ func (jobMgr *JobMgr) watchKiller() {
switch watchEvent.Type {
case mvccpb.PUT: // 杀死任务事件
jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
fmt.Println("监听到了强杀任务")
fmt.Println(jobName)
jobName = common.TrimIp(localIp,jobName)
job = &common.Job{Name: jobName}
jobEvent = common.BuildJobEvent(common.JOB_EVENT_KILL, job)
// 事件推给scheduler
......@@ -228,8 +234,8 @@ func InitJobMgr() (err error) {
G_jobMgr.watchOnceJobs()
// 启动监听killer
//G_jobMgr.watchKiller()
// 启动监听killer 强杀任务
G_jobMgr.watchKiller()
return
}
......
......@@ -40,35 +40,16 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
}
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
fmt.Println("在任务执行表中查看是否有改任务进行中,有就取消正在执行的任务")
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
fmt.Println("找到正在执行的任务了,开始取消任务",jobEvent.Job.Name)
fmt.Printf("%+v",jobExecuteInfo)
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 处理任务事件 一次性任务
func (scheduler *Scheduler) handleOnceJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
err error
)
switch jobEvent.EventType {
case common.JOB_EVENT_SAVE: // 保存任务事件
if jobSchedulePlan, err = common.BuildJobSchedulePlan(jobEvent.Job); err != nil {
return
}
scheduler.jobPlanTable[jobEvent.Job.Name] = jobSchedulePlan
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 尝试执行任务
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment