Commit 5d1a96f5 by 朱继来

Merge branch 'master' of http://119.23.72.7/sunlong_v5/go-crontab

parents 9f3d744b 70a1b3d5
package common
const (
// 任务保存目录
// 定时任务任务保存目录
JOB_SAVE_DIR = "/cron/jobs/"
// 定时任务任务保存目录
JOB_ONCE_SAVE_DIR = "/cron/oncejobs/"
// 任务强杀目录
JOB_KILLER_DIR = "/cron/killer/"
......
package common
import (
"context"
"encoding/json"
"strings"
"github.com/gorhill/cronexpr"
"strings"
"time"
"context"
)
// 定时任务
......@@ -116,6 +116,20 @@ func ExtractJobName(jobKey string) (string) {
return strings.TrimPrefix(jobKey, JOB_SAVE_DIR)
}
// 从etcd的key中提取任务名
func ExtractOnceJobName(jobKey string) (string) {
return strings.TrimPrefix(jobKey, JOB_ONCE_SAVE_DIR)
}
// /cron/jobs/job10抹掉/cron/jobs/
func TrimIp(localIp string ,jobKey string) (string) {
return strings.TrimPrefix(jobKey, localIp+"/")
}
// 从 /cron/killer/job10提取job10
func ExtractKillerName(killerKey string) (string) {
return strings.TrimPrefix(killerKey, JOB_KILLER_DIR)
......@@ -160,6 +174,17 @@ func BuildJobExecuteInfo(jobSchedulePlan *JobSchedulePlan) (jobExecuteInfo *JobE
return
}
//构造一次性任务执行状态
func BuildOnceJobExecuteInfo(job *Job) (jobExecuteInfo *JobExecuteInfo){
jobExecuteInfo = &JobExecuteInfo{
Job: job,
PlanTime: time.Now(), // 计算调度时间
RealTime: time.Now(), // 真实调度时间
}
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.TODO())
return
}
// 提取worker的IP
func ExtractWorkerIP(regKey string) (string) {
return strings.TrimPrefix(regKey, JOB_WORKER_DIR)
......
......@@ -34,24 +34,31 @@ func main(){
//删除key
//kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
//kv.Delete(context.TODO(),"/cron/jobs/192.168.2.246/job3",clientv3.WithPrefix())
//
//return
//新增
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job1","{\"name\":\"定时任务1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job2","{\"name\":\"定时任务2\",\" echo hello world \",\"cronExpr\":\"/7 * * * * * *\"}",clientv3.WithPrevKV())
//if err != nil{
// fmt.Println(err)
//}else{
// fmt.Println("Revision:",putResp.Header.Revision)
// if putResp.PrevKv != nil{
// fmt.Println("key:",string(putResp.PrevKv.Key))
// fmt.Println("Value:",string(putResp.PrevKv.Value))
// fmt.Println("Version:",string(putResp.PrevKv.Version))
// }
//}
//新增定时任务
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job1","{\"name\":\"job1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"*/7 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job2","{\"name\":\"job2\",\"command\":\" echo hello world\",\"cronExpr\":\"*/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/192.168.2.246/job3","{\"name\":\"job3\",\"command\":\" echo hello boy\",\"cronExpr\":\"*/10 * * * * * *\"}",clientv3.WithPrevKV())
//新增一次性任务
putResp, err := kv.Put(context.TODO(),"/cron/oncejobs/192.168.2.246/job10","{\"name\":\"job10\",\"command\":\" D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/b.php \"}",clientv3.WithPrevKV())
if err != nil{
fmt.Println(err)
}else{
fmt.Println("Revision:",putResp.Header.Revision)
if putResp.PrevKv != nil{
fmt.Println("key:",string(putResp.PrevKv.Key))
fmt.Println("Value:",string(putResp.PrevKv.Value))
fmt.Println("Version:",string(putResp.PrevKv.Version))
}
}
//查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
......@@ -62,5 +69,14 @@ func main(){
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
getResp,err = kv.Get(context.TODO(),"/cron/oncejobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
......@@ -27,6 +27,7 @@ func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
jobLock *JobLock
)
// 任务结果
result = &common.JobExecuteResult{
ExecuteInfo: info,
......@@ -54,7 +55,8 @@ func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
result.StartTime = time.Now()
// 执行shell命令
cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
//cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
cmd = exec.CommandContext(info.CancelCtx, "C:\\cygwin64\\bin\\bash.exe", "-c", info.Job.Command)
// 执行并捕获输出
output, err = cmd.CombinedOutput()
......
package worker
import (
"github.com/coreos/etcd/clientv3"
"context"
"github.com/coreos/etcd/clientv3"
"go-crontab/common"
)
......@@ -39,8 +39,13 @@ func (jobLock *JobLock) TryLock() (err error) {
txn clientv3.Txn
lockKey string
txnResp *clientv3.TxnResponse
localIp string
)
if localIp, err = getLocalIP(); err != nil {
localIp = ""
}
// 1, 创建租约(5秒)
if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
return
......@@ -77,7 +82,7 @@ func (jobLock *JobLock) TryLock() (err error) {
txn = jobLock.kv.Txn(context.TODO())
// 锁路径
lockKey = common.JOB_LOCK_DIR + jobLock.jobName
lockKey = common.JOB_LOCK_DIR +localIp + "/" + jobLock.jobName
// 5, 事务抢锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
......
......@@ -33,16 +33,29 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
watchEvent *clientv3.Event
jobName string
jobEvent *common.JobEvent
localIp string
)
// 1, get一下/cron/jobs/目录下的所有任务,并且获知当前集群的revision
if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix()); err != nil {
if localIp, err = getLocalIP(); err != nil {
return
}
//fmt.Println(localIp)
// 1, get一下/cron/jobs/IP/目录下的所有任务,并且获知当前集群的revision
if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR+localIp, clientv3.WithPrefix()); err != nil {
return
}
//fmt.Println(getResp)
// 当前有哪些任务
//key:"/cron/jobs/192.168.2.246/job1"
//create_revision:61
//mod_revision:61
//version:1
//value:"{"name":"","command":" ","cronExpr":"*/5 * * * * * *"}"
for _, kvpair = range getResp.Kvs {
// 反序列化json得到Job
if job, err = common.UnpackJob(kvpair.Value); err == nil {
......@@ -57,7 +70,7 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
// 从GET时刻的后续版本开始监听变化
watchStartRevision = getResp.Header.Revision + 1
// 监听/cron/jobs/目录的后续变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR+localIp, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
......@@ -67,12 +80,17 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue
}
//fmt.Printf("修改了任务%s",job)
// 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
case mvccpb.DELETE: // 任务被删除了
//推送一个删除事件给scheduler
// Delete /cron/jobs/job10
jobName = common.ExtractJobName(string(watchEvent.Kv.Key))
jobName = common.TrimIp(localIp,jobName)
//fmt.Println("删除任务了")
//fmt.Println(jobName)
job = &common.Job{Name: jobName}
......@@ -88,6 +106,48 @@ func (jobMgr *JobMgr) watchJobs() (err error) {
return
}
//监听一次性任务
func(jobMgr *JobMgr) watchOnceJobs(){
var (
watchChan clientv3.WatchChan
watchResp clientv3.WatchResponse
watchEvent *clientv3.Event
jobEvent *common.JobEvent
job *common.Job
localIp string
err error
)
if localIp, err = getLocalIP(); err != nil {
return
}
// 监听/cron/oncejobs/目录
go func() { // 监听协程
// 监听/cron/oncejobs/目录的变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_ONCE_SAVE_DIR+localIp, clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
switch watchEvent.Type {
case mvccpb.PUT: // 新增或者修改任务
//反序列化job 推送一个更新事件给scheduler
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue
}
//fmt.Println("监听一次性任务")
//fmt.Println(job)
// 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
// 事件推给scheduler
G_scheduler.PushOnceJobEvent(jobEvent)
case mvccpb.DELETE: // killer标记过期, 被自动删除
}
}
}
}()
}
// 监听强杀任务通知
func (jobMgr *JobMgr) watchKiller() {
var (
......@@ -97,11 +157,19 @@ func (jobMgr *JobMgr) watchKiller() {
jobEvent *common.JobEvent
jobName string
job *common.Job
localIp string
err error
)
if localIp, err = getLocalIP(); err != nil {
return
}
// 监听/cron/killer目录
go func() { // 监听协程
// 监听/cron/killer/目录的变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR+localIp, clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
......@@ -153,11 +221,15 @@ func InitJobMgr() (err error) {
watcher: watcher,
}
// 启动任务监听
// 启动任务监听 定时任务
G_jobMgr.watchJobs()
//启动一次性任务监听
G_jobMgr.watchOnceJobs()
// 启动监听killer
G_jobMgr.watchKiller()
//G_jobMgr.watchKiller()
return
}
......
......@@ -82,7 +82,7 @@ func InitLogSink() (err error) {
)
// 建立mongodb连接
clientOptions := options.Client().ApplyURI("mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1")
clientOptions := options.Client().ApplyURI(G_config.MongodbUri)
if client, err = mongo.Connect(
context.TODO(),clientOptions); err != nil {
return
......
......@@ -8,7 +8,8 @@ import (
// 任务调度
type Scheduler struct {
jobEventChan chan *common.JobEvent // etcd任务事件队列 把任务放入channel管道
jobEventChan chan *common.JobEvent // etcd任务事件队列 把定时任务任务放入channel管道
oncejobEventChan chan *common.JobEvent // etcd任务事件队列 把一次性任务放入channel管道
jobPlanTable map[string]*common.JobSchedulePlan // 任务调度计划表
jobExecutingTable map[string]*common.JobExecuteInfo // 任务执行表
jobResultChan chan *common.JobExecuteResult // 任务结果队列
......@@ -18,7 +19,7 @@ var (
G_scheduler *Scheduler
)
// 处理任务事件
// 处理任务事件 定时任务
func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
......@@ -45,6 +46,30 @@ func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
}
}
// 处理任务事件 一次性任务
func (scheduler *Scheduler) handleOnceJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
err error
)
switch jobEvent.EventType {
case common.JOB_EVENT_SAVE: // 保存任务事件
if jobSchedulePlan, err = common.BuildJobSchedulePlan(jobEvent.Job); err != nil {
return
}
scheduler.jobPlanTable[jobEvent.Job.Name] = jobSchedulePlan
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 尝试执行任务
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
// 调度 和 执行 是2件事情
......@@ -68,7 +93,34 @@ func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
scheduler.jobExecutingTable[jobPlan.Job.Name] = jobExecuteInfo
// 执行任务
fmt.Println("执行任务:", jobExecuteInfo.Job.Name, jobExecuteInfo.PlanTime, jobExecuteInfo.RealTime)
fmt.Println("执行任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
G_executor.ExecuteJob(jobExecuteInfo)
}
// 尝试执行任务 一次性任务
func (scheduler *Scheduler) TryStartOnceJob(job *common.Job) {
// 调度 和 执行 是2件事情
var (
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
)
// 执行的任务可能运行很久, 1分钟会调度60次,但是只能执行1次, 防止并发!
// 如果任务正在执行,跳过本次调度
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[job.Name]; jobExecuting {
// fmt.Println("尚未退出,跳过执行:", jobPlan.Job.Name)
return
}
// 构建执行状态信息
jobExecuteInfo = common.BuildOnceJobExecuteInfo(job)
// 保存执行状态
scheduler.jobExecutingTable[job.Name] = jobExecuteInfo
// 执行任务
fmt.Println("执行一次性任务:", jobExecuteInfo.Job.Name," 计划执行时间:", jobExecuteInfo.PlanTime.Format("2006-01-02 15:04:05")," 实际执行时间:", jobExecuteInfo.RealTime.Format("2006-01-02 15:04:05"))
G_executor.ExecuteJob(jobExecuteInfo)
}
......@@ -127,10 +179,10 @@ func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
JobName: result.ExecuteInfo.Job.Name,
Command: result.ExecuteInfo.Job.Command,
Output: string(result.Output),
PlanTime: result.ExecuteInfo.PlanTime.UnixNano() / 1000 / 1000,
ScheduleTime: result.ExecuteInfo.RealTime.UnixNano() / 1000 / 1000,
StartTime: result.StartTime.UnixNano() / 1000 / 1000,
EndTime: result.EndTime.UnixNano() / 1000 / 1000,
PlanTime: result.ExecuteInfo.PlanTime.Unix() ,
ScheduleTime: result.ExecuteInfo.RealTime.Unix() ,
StartTime: result.StartTime.Unix() ,
EndTime: result.EndTime.Unix() ,
}
if result.Err != nil {
jobLog.Err = result.Err.Error()
......@@ -143,7 +195,7 @@ func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
// fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err)
}
// 调度协程
// 调度协程定时任务
func (scheduler *Scheduler) scheduleLoop() {
var (
jobEvent *common.JobEvent
......@@ -159,6 +211,8 @@ func (scheduler *Scheduler) scheduleLoop() {
scheduleTimer = time.NewTimer(scheduleAfter)
// 定时任务common.Job
// 变化事件
for {
select {
case jobEvent = <- scheduler.jobEventChan: //监听任务变化事件 新增job或者修改job操作会插入到该管道
......@@ -176,22 +230,45 @@ func (scheduler *Scheduler) scheduleLoop() {
}
}
//调度一次性执行任务
func (scheduler *Scheduler) onceScheduleLoop(){
var (
jobEvent *common.JobEvent
)
for {
select {
case jobEvent = <- scheduler.oncejobEventChan: //监听任务变化事件 新增job或者修改job操作会插入到该管道 (一次性执行任务)
scheduler.TryStartOnceJob(jobEvent.Job)
}
}
}
// 推送任务变化事件
//投递一个jobEvent 到jobEventChan 管道里面
func (scheduler *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
scheduler.jobEventChan <- jobEvent
}
// 推送任务变化事件 一次性任务
//投递一个jobEvent 到jobEventChan 管道里面
func (scheduler *Scheduler) PushOnceJobEvent(jobEvent *common.JobEvent) {
scheduler.oncejobEventChan <- jobEvent
}
// 初始化调度器
func InitScheduler() (err error) {
G_scheduler = &Scheduler{
jobEventChan: make(chan *common.JobEvent, 1000),//变化事件
jobEventChan: make(chan *common.JobEvent, 1000),//变化事件 定时任务
oncejobEventChan: make(chan *common.JobEvent, 1000),//变化事件 一次性执行任务
jobPlanTable: make(map[string]*common.JobSchedulePlan),//存放着所有要执行的计划任务
jobExecutingTable: make(map[string]*common.JobExecuteInfo),//任务执行状态
jobResultChan: make(chan *common.JobExecuteResult, 1000),// 任务执行结果
}
// 启动调度协程
// 启动调度协程定时任务
go G_scheduler.scheduleLoop()
//启动一次性任务调度协程
go G_scheduler.onceScheduleLoop()
return
}
......
......@@ -52,17 +52,17 @@ func main() {
goto ERR
}
//
// 启动执行器
//启动执行器
if err = worker.InitExecutor(); err != nil {
goto ERR
}
//
//// 启动调度器
//// 启动调度器定时任务
if err = worker.InitScheduler(); err != nil {
goto ERR
}
//
//// 初始化任务管理器
//// 初始化任务管理器 监听job
if err = worker.InitJobMgr(); err != nil {
goto ERR
}
......
......@@ -6,7 +6,7 @@
"etcdDialTimeout": 5000,
"mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017",
"mongodbUri": "mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1",
"mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment