Commit 70a1b3d5 by 孙龙

init

parent 8a801ddb
package common package common
const ( const (
// 任务保存目录 // 定时任务任务保存目录
JOB_SAVE_DIR = "/cron/jobs/" JOB_SAVE_DIR = "/cron/jobs/"
// 定时任务任务保存目录
JOB_ONCE_SAVE_DIR = "/cron/oncejobs/"
// 任务强杀目录 // 任务强杀目录
JOB_KILLER_DIR = "/cron/killer/" JOB_KILLER_DIR = "/cron/killer/"
......
package common package common
import ( import (
"context"
"encoding/json" "encoding/json"
"strings"
"github.com/gorhill/cronexpr" "github.com/gorhill/cronexpr"
"strings"
"time" "time"
"context"
) )
// 定时任务 // 定时任务
...@@ -116,6 +116,20 @@ func ExtractJobName(jobKey string) (string) { ...@@ -116,6 +116,20 @@ func ExtractJobName(jobKey string) (string) {
return strings.TrimPrefix(jobKey, JOB_SAVE_DIR) return strings.TrimPrefix(jobKey, JOB_SAVE_DIR)
} }
// 从etcd的key中提取任务名
func ExtractOnceJobName(jobKey string) (string) {
return strings.TrimPrefix(jobKey, JOB_ONCE_SAVE_DIR)
}
// /cron/jobs/job10抹掉/cron/jobs/
func TrimIp(localIp string ,jobKey string) (string) {
return strings.TrimPrefix(jobKey, localIp+"/")
}
// 从 /cron/killer/job10提取job10 // 从 /cron/killer/job10提取job10
func ExtractKillerName(killerKey string) (string) { func ExtractKillerName(killerKey string) (string) {
return strings.TrimPrefix(killerKey, JOB_KILLER_DIR) return strings.TrimPrefix(killerKey, JOB_KILLER_DIR)
...@@ -160,6 +174,17 @@ func BuildJobExecuteInfo(jobSchedulePlan *JobSchedulePlan) (jobExecuteInfo *JobE ...@@ -160,6 +174,17 @@ func BuildJobExecuteInfo(jobSchedulePlan *JobSchedulePlan) (jobExecuteInfo *JobE
return return
} }
//构造一次性任务执行状态
func BuildOnceJobExecuteInfo(job *Job) (jobExecuteInfo *JobExecuteInfo){
jobExecuteInfo = &JobExecuteInfo{
Job: job,
PlanTime: time.Now(), // 计算调度时间
RealTime: time.Now(), // 真实调度时间
}
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.TODO())
return
}
// 提取worker的IP // 提取worker的IP
func ExtractWorkerIP(regKey string) (string) { func ExtractWorkerIP(regKey string) (string) {
return strings.TrimPrefix(regKey, JOB_WORKER_DIR) return strings.TrimPrefix(regKey, JOB_WORKER_DIR)
......
...@@ -34,24 +34,31 @@ func main(){ ...@@ -34,24 +34,31 @@ func main(){
//删除key //删除key
//kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix()) //kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job3",clientv3.WithPrefix())
// //
//return //return
//新增 //新增定时任务
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV()) //putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV()) //putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//if err != nil{ //putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV())
// fmt.Println(err)
//}else{
// fmt.Println("Revision:",putResp.Header.Revision) //新增一次性任务
// if putResp.PrevKv != nil{ putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV())
// fmt.Println("key:",string(putResp.PrevKv.Key))
// fmt.Println("Value:",string(putResp.PrevKv.Value))
// fmt.Println("Version:",string(putResp.PrevKv.Version)) if err != nil{
// } fmt.Println(err)
//} }else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
}
}
//查询 //查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix()) getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
...@@ -62,5 +69,14 @@ func main(){ ...@@ -62,5 +69,14 @@ func main(){
for _, kvpair := range getResp.Kvs { for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair) fmt.Println(kvpair)
} }
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
} }
...@@ -2,7 +2,6 @@ package worker ...@@ -2,7 +2,6 @@ package worker
import ( import (
"context" "context"
"fmt"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common" "go-crontab/common"
...@@ -81,13 +80,17 @@ func (jobMgr *JobMgr) watchJobs() (err error) { ...@@ -81,13 +80,17 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil { if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue continue
} }
fmt.Printf("修改了任务%s",job) //fmt.Printf("修改了任务%s",job)
// 构建一个更新Event // 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job) jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
case mvccpb.DELETE: // 任务被删除了 case mvccpb.DELETE: // 任务被删除了
//推送一个删除事件给scheduler //推送一个删除事件给scheduler
// Delete /cron/jobs/job10 // Delete /cron/jobs/job10
jobName = common.ExtractJobName(string(watchEvent.Kv.Key)) jobName = common.ExtractJobName(string(watchEvent.Kv.Key))
jobName = common.TrimIp(localIp,jobName)
//fmt.Println("删除任务了")
//fmt.Println(jobName)
job = &common.Job{Name: jobName} job = &common.Job{Name: jobName}
...@@ -103,6 +106,48 @@ func (jobMgr *JobMgr) watchJobs() (err error) { ...@@ -103,6 +106,48 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
return return
} }
//监听一次性任务
func(jobMgr *JobMgr) watchOnceJobs(){
var (
watchChan clientv3.WatchChan
watchResp clientv3.WatchResponse
watchEvent *clientv3.Event
jobEvent *common.JobEvent
job *common.Job
localIp string
err error
)
if localIp, err = getLocalIP(); err != nil {
return
}
// 监听/cron/oncejobs/目录
go func() { // 监听协程
// 监听/cron/oncejobs/目录的变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_ONCE_SAVE_DIR+localIp, clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
switch watchEvent.Type {
case mvccpb.PUT: // 新增或者修改任务
//反序列化job 推送一个更新事件给scheduler
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue
}
//fmt.Println("监听一次性任务")
//fmt.Println(job)
// 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
// 事件推给scheduler
G_scheduler.PushOnceJobEvent(jobEvent)
case mvccpb.DELETE: // killer标记过期, 被自动删除
}
}
}
}()
}
// 监听强杀任务通知 // 监听强杀任务通知
func (jobMgr *JobMgr) watchKiller() { func (jobMgr *JobMgr) watchKiller() {
var ( var (
...@@ -176,9 +221,13 @@ func InitJobMgr() (err error) { ...@@ -176,9 +221,13 @@ func InitJobMgr() (err error) {
watcher: watcher, watcher: watcher,
} }
// 启动任务监听 // 启动任务监听 定时任务
G_jobMgr.watchJobs() G_jobMgr.watchJobs()
//启动一次性任务监听
G_jobMgr.watchOnceJobs()
// 启动监听killer // 启动监听killer
//G_jobMgr.watchKiller() //G_jobMgr.watchKiller()
......
...@@ -82,7 +82,7 @@ func InitLogSink() (err error) { ...@@ -82,7 +82,7 @@ func InitLogSink() (err error) {
) )
// 建立mongodb连接 // 建立mongodb连接
clientOptions := options.Client().ApplyURI("mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1") clientOptions := options.Client().ApplyURI(G_config.MongodbUri)
if client, err = mongo.Connect( if client, err = mongo.Connect(
context.TODO(),clientOptions); err != nil { context.TODO(),clientOptions); err != nil {
return return
......
...@@ -8,7 +8,8 @@ import ( ...@@ -8,7 +8,8 @@ import (
// 任务调度 // 任务调度
type Scheduler struct { type Scheduler struct {
jobEventChan chan *common.JobEvent // etcd任务事件队列 把任务放入channel管道 jobEventChan chan *common.JobEvent // etcd任务事件队列 把定时任务任务放入channel管道
oncejobEventChan chan *common.JobEvent // etcd任务事件队列 把一次性任务放入channel管道
jobPlanTable map[string]*common.JobSchedulePlan // 任务调度计划表 jobPlanTable map[string]*common.JobSchedulePlan // 任务调度计划表
jobExecutingTable map[string]*common.JobExecuteInfo // 任务执行表 jobExecutingTable map[string]*common.JobExecuteInfo // 任务执行表
jobResultChan chan *common.JobExecuteResult // 任务结果队列 jobResultChan chan *common.JobExecuteResult // 任务结果队列
...@@ -18,7 +19,7 @@ var ( ...@@ -18,7 +19,7 @@ var (
G_scheduler *Scheduler G_scheduler *Scheduler
) )
// 处理任务事件 // 处理任务事件 定时任务
func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) { func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
var ( var (
jobSchedulePlan *common.JobSchedulePlan jobSchedulePlan *common.JobSchedulePlan
...@@ -45,6 +46,30 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) { ...@@ -45,6 +46,30 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
} }
} }
// 处理任务事件 一次性任务
func (scheduler *Scheduler) handleOnceJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
err error
)
switch jobEvent.EventType {
case common.JOB_EVENT_SAVE: // 保存任务事件
if jobSchedulePlan, err = common.BuildJobSchedulePlan(jobEvent.Job); err != nil {
return
}
scheduler.jobPlanTable[jobEvent.Job.Name] = jobSchedulePlan
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 尝试执行任务 // 尝试执行任务
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) { func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
// 调度 和 执行 是2件事情 // 调度 和 执行 是2件事情
...@@ -72,6 +97,33 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) { ...@@ -72,6 +97,33 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
G_executor.ExecuteJob(jobExecuteInfo) G_executor.ExecuteJob(jobExecuteInfo)
} }
// 尝试执行任务 一次性任务
func (scheduler *Scheduler) TryStartOnceJob(job *common.Job) {
// 调度 和 执行 是2件事情
var (
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
)
// 执行的任务可能运行很久, 1分钟会调度60次,但是只能执行1次, 防止并发!
// 如果任务正在执行,跳过本次调度
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[job.Name]; jobExecuting {
// fmt.Println("尚未退出,跳过执行:", jobPlan.Job.Name)
return
}
// 构建执行状态信息
jobExecuteInfo = common.BuildOnceJobExecuteInfo(job)
// 保存执行状态
scheduler.jobExecutingTable[job.Name] = jobExecuteInfo
// 执行任务
fmt.Println("执行一次性任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
G_executor.ExecuteJob(jobExecuteInfo)
}
// 重新计算任务调度状态 // 重新计算任务调度状态
//计算出最近要过期的任务 下次要执行的任务 还有多久 //计算出最近要过期的任务 下次要执行的任务 还有多久
//5秒后一个任务要执行 //5秒后一个任务要执行
...@@ -143,7 +195,7 @@ func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) { ...@@ -143,7 +195,7 @@ func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
// fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err) // fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err)
} }
// 调度协程 // 调度协程定时任务
func (scheduler *Scheduler) scheduleLoop() { func (scheduler *Scheduler) scheduleLoop() {
var ( var (
jobEvent *common.JobEvent jobEvent *common.JobEvent
...@@ -178,22 +230,45 @@ func (scheduler *Scheduler) scheduleLoop() { ...@@ -178,22 +230,45 @@ func (scheduler *Scheduler) scheduleLoop() {
} }
} }
//调度一次性执行任务
func (scheduler *Scheduler) onceScheduleLoop(){
var (
jobEvent *common.JobEvent
)
for {
select {
case jobEvent = <- scheduler.oncejobEventChan: //监听任务变化事件 新增job或者修改job操作会插入到该管道 (一次性执行任务)
scheduler.TryStartOnceJob(jobEvent.Job)
}
}
}
// 推送任务变化事件 // 推送任务变化事件
//投递一个jobEvent 到jobEventChan 管道里面 //投递一个jobEvent 到jobEventChan 管道里面
func (scheduler *Scheduler) PushJobEvent(jobEvent *common.JobEvent) { func (scheduler *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
scheduler.jobEventChan <- jobEvent scheduler.jobEventChan <- jobEvent
} }
// 推送任务变化事件 一次性任务
//投递一个jobEvent 到jobEventChan 管道里面
func (scheduler *Scheduler) PushOnceJobEvent(jobEvent *common.JobEvent) {
scheduler.oncejobEventChan <- jobEvent
}
// 初始化调度器 // 初始化调度器
func InitScheduler() (err error) { func InitScheduler() (err error) {
G_scheduler = &Scheduler{ G_scheduler = &Scheduler{
jobEventChan: make(chan *common.JobEvent, 1000),//变化事件 jobEventChan: make(chan *common.JobEvent, 1000),//变化事件 定时任务
oncejobEventChan: make(chan *common.JobEvent, 1000),//变化事件 一次性执行任务
jobPlanTable: make(map[string]*common.JobSchedulePlan),//存放着所有要执行的计划任务 jobPlanTable: make(map[string]*common.JobSchedulePlan),//存放着所有要执行的计划任务
jobExecutingTable: make(map[string]*common.JobExecuteInfo),//任务执行状态 jobExecutingTable: make(map[string]*common.JobExecuteInfo),//任务执行状态
jobResultChan: make(chan *common.JobExecuteResult, 1000),// 任务执行结果 jobResultChan: make(chan *common.JobExecuteResult, 1000),// 任务执行结果
} }
// 启动调度协程 // 启动调度协程定时任务
go G_scheduler.scheduleLoop() go G_scheduler.scheduleLoop()
//启动一次性任务调度协程
go G_scheduler.onceScheduleLoop()
return return
} }
......
...@@ -52,12 +52,12 @@ func main() { ...@@ -52,12 +52,12 @@ func main() {
goto ERR goto ERR
} }
// //
// 启动执行器 //启动执行器
//if err = worker.InitExecutor(); err != nil { if err = worker.InitExecutor(); err != nil {
// goto ERR goto ERR
//} }
// //
//// 启动调度器 //// 启动调度器定时任务
if err = worker.InitScheduler(); err != nil { if err = worker.InitScheduler(); err != nil {
goto ERR goto ERR
} }
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
"etcdDialTimeout": 5000, "etcdDialTimeout": 5000,
"mongodb地址": "采用mongodb URI", "mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017", "mongodbUri": "mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1",
"mongodb连接超时时间": "单位毫秒", "mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000, "mongodbConnectTimeout": 5000,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment