Commit ee06b3f0 by 孙龙

init

parents
package common
const (
// 任务保存目录
JOB_SAVE_DIR = "/cron/jobs/"
// 任务强杀目录
JOB_KILLER_DIR = "/cron/killer/"
// 任务锁目录
JOB_LOCK_DIR = "/cron/lock/"
// 服务注册目录
JOB_WORKER_DIR = "/cron/workers/"
// 保存任务事件
JOB_EVENT_SAVE = 1
// 删除任务事件
JOB_EVENT_DELETE = 2
// 强杀任务事件
JOB_EVENT_KILL = 3
)
\ No newline at end of file
package common
import "errors"
var (
ERR_LOCK_ALREADY_REQUIRED = errors.New("锁已被占用")
ERR_NO_LOCAL_IP_FOUND = errors.New("没有找到网卡IP")
)
\ No newline at end of file
package common
import (
"encoding/json"
"strings"
"github.com/gorhill/cronexpr"
"time"
"context"
)
// 定时任务
type Job struct {
Name string `json:"name"` // 任务名
Command string `json:"command"` // shell命令
CronExpr string `json:"cronExpr"` // cron表达式
}
// 任务调度计划
type JobSchedulePlan struct {
Job *Job // 要调度的任务信息
Expr *cronexpr.Expression // 解析好的cronexpr表达式
NextTime time.Time // 下次调度时间
}
// 任务执行状态
type JobExecuteInfo struct {
Job *Job // 任务信息
PlanTime time.Time // 理论上的调度时间
RealTime time.Time // 实际的调度时间
CancelCtx context.Context // 任务command的context
CancelFunc context.CancelFunc// 用于取消command执行的cancel函数
}
// HTTP接口应答
type Response struct {
Errno int `json:"errno"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
}
// 变化事件
type JobEvent struct {
EventType int // SAVE, DELETE
Job *Job
}
// 任务执行结果
type JobExecuteResult struct {
ExecuteInfo *JobExecuteInfo // 执行状态
Output []byte // 脚本输出
Err error // 脚本错误原因
StartTime time.Time // 启动时间
EndTime time.Time // 结束时间
}
// 任务执行日志
type JobLog struct {
JobName string `json:"jobName" bson:"jobName"` // 任务名字
Command string `json:"command" bson:"command"` // 脚本命令
Err string `json:"err" bson:"err"` // 错误原因
Output string `json:"output" bson:"output"` // 脚本输出
PlanTime int64 `json:"planTime" bson:"planTime"` // 计划开始时间
ScheduleTime int64 `json:"scheduleTime" bson:"scheduleTime"` // 实际调度时间
StartTime int64 `json:"startTime" bson:"startTime"` // 任务执行开始时间
EndTime int64 `json:"endTime" bson:"endTime"` // 任务执行结束时间
}
// 日志批次
type LogBatch struct {
Logs []interface{} // 多条日志
}
// 任务日志过滤条件
type JobLogFilter struct {
JobName string `bson:"jobName"`
}
// 任务日志排序规则
type SortLogByStartTime struct {
SortOrder int `bson:"startTime"` // {startTime: -1}
}
// 应答方法
func BuildResponse(errno int, msg string, data interface{}) (resp []byte, err error) {
// 1, 定义一个response
var (
response Response
)
response.Errno = errno
response.Msg = msg
response.Data = data
// 2, 序列化json
resp, err = json.Marshal(response)
return
}
// 反序列化Job
func UnpackJob(value []byte) (ret *Job, err error) {
var (
job *Job
)
job = &Job{}
if err = json.Unmarshal(value, job); err != nil {
return
}
ret = job
return
}
// 从etcd的key中提取任务名
// /cron/jobs/job10抹掉/cron/jobs/
func ExtractJobName(jobKey string) (string) {
return strings.TrimPrefix(jobKey, JOB_SAVE_DIR)
}
// 从 /cron/killer/job10提取job10
func ExtractKillerName(killerKey string) (string) {
return strings.TrimPrefix(killerKey, JOB_KILLER_DIR)
}
// 任务变化事件有2种:1)更新任务 2)删除任务
func BuildJobEvent(eventType int, job *Job) (jobEvent *JobEvent) {
return &JobEvent{
EventType: eventType,
Job: job,
}
}
// 构造任务执行计划
func BuildJobSchedulePlan(job *Job) (jobSchedulePlan *JobSchedulePlan, err error) {
var (
expr *cronexpr.Expression
)
// 解析JOB的cron表达式
if expr, err = cronexpr.Parse(job.CronExpr); err != nil {
return
}
// 生成任务调度计划对象
jobSchedulePlan = &JobSchedulePlan{
Job: job,
Expr: expr,
NextTime: expr.Next(time.Now()),
}
return
}
// 构造执行状态信息
func BuildJobExecuteInfo(jobSchedulePlan *JobSchedulePlan) (jobExecuteInfo *JobExecuteInfo){
jobExecuteInfo = &JobExecuteInfo{
Job: jobSchedulePlan.Job,
PlanTime: jobSchedulePlan.NextTime, // 计算调度时间
RealTime: time.Now(), // 真实调度时间
}
jobExecuteInfo.CancelCtx, jobExecuteInfo.CancelFunc = context.WithCancel(context.TODO())
return
}
// 提取worker的IP
func ExtractWorkerIP(regKey string) (string) {
return strings.TrimPrefix(regKey, JOB_WORKER_DIR)
}
\ No newline at end of file
module go-crontab
go 1.12
require (
github.com/DataDog/zstd v1.4.4 // indirect
github.com/coreos/etcd v3.3.18+incompatible
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/mongodb/mongo-go-driver v1.2.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
go.mongodb.org/mongo-driver v1.2.0
go.uber.org/zap v1.13.0 // indirect
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb // indirect
google.golang.org/grpc v1.26.0 // indirect
)
This diff is collapsed. Click to expand it.
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)
func main(){
var (
config clientv3.Config
err error
client *clientv3.Client
kv clientv3.KV
getResp *clientv3.GetResponse
)
//配置
config = clientv3.Config{
Endpoints:[]string{"192.168.2.232:2379"},
DialTimeout:time.Second*5,
}
//连接 床见一个客户端
if client,err = clientv3.New(config);err != nil{
fmt.Println(err)
return
}
//用于读写etcd的键值对
kv = clientv3.NewKV(client)
//删除key
//kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
//
//return
//新增
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job1","{\"name\":\"定时任务1\",\"command\":\"D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php E:/WWW/a.php\",\"cronExpr\":\"/5 * * * * * *\"}",clientv3.WithPrevKV())
//putResp, err := kv.Put(context.TODO(),"/cron/jobs/job2","{\"name\":\"定时任务2\",\" echo hello world \",\"cronExpr\":\"/7 * * * * * *\"}",clientv3.WithPrevKV())
//if err != nil{
// fmt.Println(err)
//}else{
// fmt.Println("Revision:",putResp.Header.Revision)
// if putResp.PrevKv != nil{
// fmt.Println("key:",string(putResp.PrevKv.Key))
// fmt.Println("Value:",string(putResp.PrevKv.Value))
// fmt.Println("Version:",string(putResp.PrevKv.Version))
// }
//}
//查询
getResp,err = kv.Get(context.TODO(),"/cron/jobs",clientv3.WithPrefix())
if err != nil {
fmt.Println(err)
return
}
for _, kvpair := range getResp.Kvs {
fmt.Println(kvpair)
}
}
package master
import (
"net/http"
"net"
"time"
"strconv"
"github.com/owenliang/crontab/common"
"encoding/json"
)
// 任务的HTTP接口
type ApiServer struct {
httpServer *http.Server
}
var (
// 单例对象
G_apiServer *ApiServer
)
// 保存任务接口
// POST job={"name": "job1", "command": "echo hello", "cronExpr": "* * * * *"}
func handleJobSave(resp http.ResponseWriter, req *http.Request) {
var (
err error
postJob string
job common.Job
oldJob *common.Job
bytes []byte
)
// 1, 解析POST表单
if err = req.ParseForm(); err != nil {
goto ERR
}
// 2, 取表单中的job字段
postJob = req.PostForm.Get("job")
// 3, 反序列化job
if err = json.Unmarshal([]byte(postJob), &job); err != nil {
goto ERR
}
// 4, 保存到etcd
if oldJob, err = G_jobMgr.SaveJob(&job); err != nil {
goto ERR
}
// 5, 返回正常应答 ({"errno": 0, "msg": "", "data": {....}})
if bytes, err = common.BuildResponse(0, "success", oldJob); err == nil {
resp.Write(bytes)
}
return
ERR:
// 6, 返回异常应答
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 删除任务接口
// POST /job/delete name=job1
func handleJobDelete(resp http.ResponseWriter, req *http.Request) {
var (
err error // interface{}
name string
oldJob *common.Job
bytes []byte
)
// POST: a=1&b=2&c=3
if err = req.ParseForm(); err != nil {
goto ERR
}
// 删除的任务名
name = req.PostForm.Get("name")
// 去删除任务
if oldJob, err = G_jobMgr.DeleteJob(name); err != nil {
goto ERR
}
// 正常应答
if bytes, err = common.BuildResponse(0, "success", oldJob); err == nil {
resp.Write(bytes)
}
return
ERR:
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 列举所有crontab任务
func handleJobList(resp http.ResponseWriter, req *http.Request) {
var (
jobList []*common.Job
bytes []byte
err error
)
// 获取任务列表
if jobList, err = G_jobMgr.ListJobs(); err != nil {
goto ERR
}
// 正常应答
if bytes, err = common.BuildResponse(0, "success", jobList); err == nil {
resp.Write(bytes)
}
return
ERR:
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 强制杀死某个任务
// POST /job/kill name=job1
func handleJobKill(resp http.ResponseWriter, req *http.Request) {
var (
err error
name string
bytes []byte
)
// 解析POST表单
if err = req.ParseForm(); err != nil {
goto ERR
}
// 要杀死的任务名
name = req.PostForm.Get("name")
// 杀死任务
if err = G_jobMgr.KillJob(name); err != nil {
goto ERR
}
// 正常应答
if bytes, err = common.BuildResponse(0, "success", nil); err == nil {
resp.Write(bytes)
}
return
ERR:
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 查询任务日志
func handleJobLog(resp http.ResponseWriter, req *http.Request) {
var (
err error
name string // 任务名字
skipParam string// 从第几条开始
limitParam string // 返回多少条
skip int
limit int
logArr []*common.JobLog
bytes []byte
)
// 解析GET参数
if err = req.ParseForm(); err != nil {
goto ERR
}
// 获取请求参数 /job/log?name=job10&skip=0&limit=10
name = req.Form.Get("name")
skipParam = req.Form.Get("skip")
limitParam = req.Form.Get("limit")
if skip, err = strconv.Atoi(skipParam); err != nil {
skip = 0
}
if limit, err = strconv.Atoi(limitParam); err != nil {
limit = 20
}
if logArr, err = G_logMgr.ListLog(name, skip, limit); err != nil {
goto ERR
}
// 正常应答
if bytes, err = common.BuildResponse(0, "success", logArr); err == nil {
resp.Write(bytes)
}
return
ERR:
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 获取健康worker节点列表
func handleWorkerList(resp http.ResponseWriter, req *http.Request) {
var (
workerArr []string
err error
bytes []byte
)
if workerArr, err = G_workerMgr.ListWorkers(); err != nil {
goto ERR
}
// 正常应答
if bytes, err = common.BuildResponse(0, "success", workerArr); err == nil {
resp.Write(bytes)
}
return
ERR:
if bytes, err = common.BuildResponse(-1, err.Error(), nil); err == nil {
resp.Write(bytes)
}
}
// 初始化服务
func InitApiServer() (err error){
var (
mux *http.ServeMux
listener net.Listener
httpServer *http.Server
staticDir http.Dir // 静态文件根目录
staticHandler http.Handler // 静态文件的HTTP回调
)
// 配置路由
mux = http.NewServeMux()
mux.HandleFunc("/job/save", handleJobSave)
mux.HandleFunc("/job/delete", handleJobDelete)
mux.HandleFunc("/job/list", handleJobList)
mux.HandleFunc("/job/kill", handleJobKill)
mux.HandleFunc("/job/log", handleJobLog)
mux.HandleFunc("/worker/list", handleWorkerList)
// /index.html
// 静态文件目录
staticDir = http.Dir(G_config.WebRoot)
staticHandler = http.FileServer(staticDir)
mux.Handle("/", http.StripPrefix("/", staticHandler)) // ./webroot/index.html
// 启动TCP监听
if listener, err = net.Listen("tcp", ":" + strconv.Itoa(G_config.ApiPort)); err != nil {
return
}
// 创建一个HTTP服务
httpServer = &http.Server{
ReadTimeout: time.Duration(G_config.ApiReadTimeout) * time.Millisecond,
WriteTimeout: time.Duration(G_config.ApiWriteTimeout) * time.Millisecond,
Handler: mux,
}
// 赋值单例
G_apiServer = &ApiServer{
httpServer: httpServer,
}
// 启动了服务端
go httpServer.Serve(listener)
return
}
\ No newline at end of file
package master
import (
"io/ioutil"
"encoding/json"
)
// 程序配置
type Config struct {
ApiPort int `json:"apiPort"`
ApiReadTimeout int `json:"apiReadTimeout"`
ApiWriteTimeout int `json:"apiWriteTimeout"`
EtcdEndpoints []string `json:"etcdEndpoints"`
EtcdDialTimeout int `json:"etcdDialTimeout"`
WebRoot string `json:"webroot"`
MongodbUri string `json:"mongodbUri"`
MongodbConnectTimeout int `json:"mongodbConnectTimeout"`
}
var (
// 单例
G_config *Config
)
// 加载配置
func InitConfig(filename string) (err error) {
var (
content []byte
conf Config
)
// 1, 把配置文件读进来
if content, err = ioutil.ReadFile(filename); err != nil {
return
}
// 2, 做JSON反序列化
if err = json.Unmarshal(content, &conf); err != nil {
return
}
// 3, 赋值单例
G_config = &conf
return
}
\ No newline at end of file
package master
import (
"github.com/coreos/etcd/clientv3"
"time"
"github.com/owenliang/crontab/common"
"encoding/json"
"context"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// 任务管理器
type JobMgr struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
}
var (
// 单例
G_jobMgr *JobMgr
)
// 初始化管理器
func InitJobMgr() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
)
// 初始化配置
config = clientv3.Config{
Endpoints: G_config.EtcdEndpoints, // 集群地址
DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
return
}
// 得到KV和Lease的API子集
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
// 赋值单例
G_jobMgr = &JobMgr{
client: client,
kv: kv,
lease: lease,
}
return
}
// 保存任务
func (jobMgr *JobMgr) SaveJob(job *common.Job) (oldJob *common.Job, err error) {
// 把任务保存到/cron/jobs/任务名 -> json
var (
jobKey string
jobValue []byte
putResp *clientv3.PutResponse
oldJobObj common.Job
)
// etcd的保存key
jobKey = common.JOB_SAVE_DIR + job.Name
// 任务信息json
if jobValue, err = json.Marshal(job); err != nil {
return
}
// 保存到etcd
if putResp, err = jobMgr.kv.Put(context.TODO(), jobKey, string(jobValue), clientv3.WithPrevKV()); err != nil {
return
}
// 如果是更新, 那么返回旧值
if putResp.PrevKv != nil {
// 对旧值做一个反序列化
if err = json.Unmarshal(putResp.PrevKv.Value, &oldJobObj); err != nil {
err = nil
return
}
oldJob = &oldJobObj
}
return
}
// 删除任务
func (jobMgr *JobMgr) DeleteJob(name string) (oldJob *common.Job, err error) {
var (
jobKey string
delResp *clientv3.DeleteResponse
oldJobObj common.Job
)
// etcd中保存任务的key
jobKey = common.JOB_SAVE_DIR + name
// 从etcd中删除它
if delResp, err = jobMgr.kv.Delete(context.TODO(), jobKey, clientv3.WithPrevKV()); err != nil {
return
}
// 返回被删除的任务信息
if len(delResp.PrevKvs) != 0 {
// 解析一下旧值, 返回它
if err =json.Unmarshal(delResp.PrevKvs[0].Value, &oldJobObj); err != nil {
err = nil
return
}
oldJob = &oldJobObj
}
return
}
// 列举任务
func (jobMgr *JobMgr) ListJobs() (jobList []*common.Job, err error) {
var (
dirKey string
getResp *clientv3.GetResponse
kvPair *mvccpb.KeyValue
job *common.Job
)
// 任务保存的目录
dirKey = common.JOB_SAVE_DIR
// 获取目录下所有任务信息
if getResp, err = jobMgr.kv.Get(context.TODO(), dirKey, clientv3.WithPrefix()); err != nil {
return
}
// 初始化数组空间
jobList = make([]*common.Job, 0)
// len(jobList) == 0
// 遍历所有任务, 进行反序列化
for _, kvPair = range getResp.Kvs {
job = &common.Job{}
if err =json.Unmarshal(kvPair.Value, job); err != nil {
err = nil
continue
}
jobList = append(jobList, job)
}
return
}
// 杀死任务
func (jobMgr *JobMgr) KillJob(name string) (err error) {
// 更新一下key=/cron/killer/任务名
var (
killerKey string
leaseGrantResp *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
)
// 通知worker杀死对应任务
killerKey = common.JOB_KILLER_DIR + name
// 让worker监听到一次put操作, 创建一个租约让其稍后自动过期即可
if leaseGrantResp, err = jobMgr.lease.Grant(context.TODO(), 1); err != nil {
return
}
// 租约ID
leaseId = leaseGrantResp.ID
// 设置killer标记
if _, err = jobMgr.kv.Put(context.TODO(), killerKey, "", clientv3.WithLease(leaseId)); err != nil {
return
}
return
}
\ No newline at end of file
package master
import (
"github.com/mongodb/mongo-go-driver/mongo"
"context"
"github.com/mongodb/mongo-go-driver/mongo/clientopt"
"time"
"github.com/owenliang/crontab/common"
"github.com/mongodb/mongo-go-driver/mongo/findopt"
)
// mongodb日志管理
type LogMgr struct {
client *mongo.Client
logCollection *mongo.Collection
}
var (
G_logMgr *LogMgr
)
func InitLogMgr() (err error) {
var (
client *mongo.Client
)
// 建立mongodb连接
if client, err = mongo.Connect(
context.TODO(),
G_config.MongodbUri,
clientopt.ConnectTimeout(time.Duration(G_config.MongodbConnectTimeout) * time.Millisecond)); err != nil {
return
}
G_logMgr = &LogMgr{
client: client,
logCollection: client.Database("cron").Collection("log"),
}
return
}
// 查看任务日志
func (logMgr *LogMgr) ListLog(name string, skip int, limit int) (logArr []*common.JobLog, err error){
var (
filter *common.JobLogFilter
logSort *common.SortLogByStartTime
cursor mongo.Cursor
jobLog *common.JobLog
)
// len(logArr)
logArr = make([]*common.JobLog, 0)
// 过滤条件
filter = &common.JobLogFilter{JobName: name}
// 按照任务开始时间倒排
logSort = &common.SortLogByStartTime{SortOrder: -1}
// 查询
if cursor, err = logMgr.logCollection.Find(context.TODO(), filter, findopt.Sort(logSort), findopt.Skip(int64(skip)), findopt.Limit(int64(limit))); err != nil {
return
}
// 延迟释放游标
defer cursor.Close(context.TODO())
for cursor.Next(context.TODO()) {
jobLog = &common.JobLog{}
// 反序列化BSON
if err = cursor.Decode(jobLog); err != nil {
continue // 有日志不合法
}
logArr = append(logArr, jobLog)
}
return
}
\ No newline at end of file
package master
import (
"github.com/coreos/etcd/clientv3"
"time"
"context"
"github.com/owenliang/crontab/common"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// /cron/workers/
type WorkerMgr struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
}
var (
G_workerMgr *WorkerMgr
)
// 获取在线worker列表
func (workerMgr *WorkerMgr) ListWorkers() (workerArr []string, err error) {
var (
getResp *clientv3.GetResponse
kv *mvccpb.KeyValue
workerIP string
)
// 初始化数组
workerArr = make([]string, 0)
// 获取目录下所有Kv
if getResp, err = workerMgr.kv.Get(context.TODO(), common.JOB_WORKER_DIR, clientv3.WithPrefix()); err != nil {
return
}
// 解析每个节点的IP
for _, kv = range getResp.Kvs {
// kv.Key : /cron/workers/192.168.2.1
workerIP = common.ExtractWorkerIP(string(kv.Key))
workerArr = append(workerArr, workerIP)
}
return
}
func InitWorkerMgr() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
)
// 初始化配置
config = clientv3.Config{
Endpoints: G_config.EtcdEndpoints, // 集群地址
DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
return
}
// 得到KV和Lease的API子集
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
G_workerMgr = &WorkerMgr{
client :client,
kv: kv,
lease: lease,
}
return
}
\ No newline at end of file
package main
import (
"runtime"
"github.com/owenliang/crontab/master"
"fmt"
"flag"
"time"
)
var (
confFile string // 配置文件路径
)
// 解析命令行参数
func initArgs() {
// master -config ./master.json -xxx 123 -yyy ddd
// master -h
flag.StringVar(&confFile, "config", "./master.json", "指定master.json")
flag.Parse()
}
// 初始化线程数量
func initEnv() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {
var (
err error
)
// 初始化命令行参数
initArgs()
// 初始化线程
initEnv()
// 加载配置
if err = master.InitConfig(confFile); err != nil {
goto ERR
}
// 初始化服务发现模块
if err = master.InitWorkerMgr(); err != nil {
goto ERR
}
// 日志管理器
if err =master.InitLogMgr(); err != nil {
goto ERR
}
// 任务管理器
if err = master.InitJobMgr(); err != nil {
goto ERR
}
// 启动Api HTTP服务
if err = master.InitApiServer(); err != nil {
goto ERR
}
// 正常退出
for {
time.Sleep(1 * time.Second)
}
return
ERR:
fmt.Println(err)
}
{
"API接口服务端口": "提供任务增删改查服务",
"apiPort": 8070,
"API接口读超时": "单位是毫秒",
"apiReadTimeout": 5000,
"API接口写超时": "单位是毫秒",
"apiWriteTimeout": 5000,
"etcd的集群列表": "配置多个, 避免单点故障",
"etcdEndpoints": ["36.111.184.221:2379"],
"etcd的连接超时": "单位毫秒",
"etcdDialTimeout": 5000,
"web页面根目录": "静态页面,前后端分离开发",
"webroot": "./webroot",
"mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017",
"mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000
}
\ No newline at end of file
package main
import (
"fmt"
"go.mongodb.org/mongo-driver/mongo"
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
// 任务的执行时间点
type TimePoint struct {
StartTime int64 `bson:"startTime"`
EndTime int64 `bson:"endTime"`
}
// 一条日志
type LogRecord struct {
JobName string `bson:"jobName"` // 任务名
Command string `bson:"command"` // shell命令
Err string `bson:"err"` // 脚本错误
Content string `bson:"content"`// 脚本输出
TimePoint TimePoint `bson:"timePoint"`// 执行时间点
}
func main(){
var (
client *mongo.Client
err error
result *mongo.InsertOneResult
)
// 建立mongodb连接
clientOptions := options.Client().ApplyURI("mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1")
if client, err = mongo.Connect(
context.TODO(),clientOptions); err != nil {
return
}
// 2, 选择数据库my_db
database := client.Database("ichunt")
// 3, 选择表my_collection
collection := database.Collection("cron_log")
// 4, 插入记录(bson)
record := &LogRecord{
JobName: "job10",
Command: "echo hello",
Err: "",
Content: "hello",
TimePoint: TimePoint{StartTime: time.Now().Unix(), EndTime: time.Now().Unix() + 10},
}
if result, err = collection.InsertOne(context.TODO(), record); err != nil {
fmt.Println(err)
return
}
fmt.Println(result)
//// _id: 默认生成一个全局唯一ID, ObjectID:12字节的二进制
docId := result.InsertedID.(primitive.ObjectID)
fmt.Println("自增ID:", docId.Hex())
}
\ No newline at end of file
package worker
import (
"encoding/json"
"io/ioutil"
)
// 程序配置
type Config struct {
EtcdEndpoints []string `json:"etcdEndpoints"`
EtcdDialTimeout int `json:"etcdDialTimeout"`
MongodbUri string `json:"mongodbUri"`
MongodbConnectTimeout int `json:"mongodbConnectTimeout"`
JobLogBatchSize int `json:"jobLogBatchSize"`
JobLogCommitTimeout int `json"jobLogCommitTimeout"`
}
var (
// 单例
G_config *Config
)
// 加载配置
func InitConfig(filename string) (err error) {
var (
content []byte
conf Config
)
// 1, 把配置文件读进来
if content, err = ioutil.ReadFile(filename); err != nil {
return
}
// 2, 做JSON反序列化
if err = json.Unmarshal(content, &conf); err != nil {
return
}
// 3, 赋值单例
G_config = &conf
return
}
\ No newline at end of file
package worker
import (
"go-crontab/common"
"os/exec"
"time"
"math/rand"
)
// 任务执行器
type Executor struct {
}
var (
G_executor *Executor
)
// 执行一个任务
func (executor *Executor) ExecuteJob(info *common.JobExecuteInfo) {
go func() {
var (
cmd *exec.Cmd
err error
output []byte
result *common.JobExecuteResult
jobLock *JobLock
)
// 任务结果
result = &common.JobExecuteResult{
ExecuteInfo: info,
Output: make([]byte, 0),
}
// 初始化分布式锁
jobLock = G_jobMgr.CreateJobLock(info.Job.Name)
// 记录任务开始时间
result.StartTime = time.Now()
// 上锁
// 随机睡眠(0~1s)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
err = jobLock.TryLock()
defer jobLock.Unlock()
if err != nil { // 上锁失败
result.Err = err
result.EndTime = time.Now()
} else {
// 上锁成功后,重置任务启动时间
result.StartTime = time.Now()
// 执行shell命令
cmd = exec.CommandContext(info.CancelCtx, "/bin/bash", "-c", info.Job.Command)
// 执行并捕获输出
output, err = cmd.CombinedOutput()
// 记录任务结束时间
result.EndTime = time.Now()
result.Output = output
result.Err = err
}
// 任务执行完成后,把执行的结果返回给Scheduler,Scheduler会从executingTable中删除掉执行记录
G_scheduler.PushJobResult(result)
}()
}
// 初始化执行器
func InitExecutor() (err error) {
G_executor = &Executor{}
return
}
\ No newline at end of file
package worker
import (
"github.com/coreos/etcd/clientv3"
"context"
"go-crontab/common"
)
// 分布式锁(TXN事务)
type JobLock struct {
// etcd客户端
kv clientv3.KV
lease clientv3.Lease
jobName string // 任务名
cancelFunc context.CancelFunc // 用于终止自动续租
leaseId clientv3.LeaseID // 租约ID
isLocked bool // 是否上锁成功
}
// 初始化一把锁
func InitJobLock(jobName string, kv clientv3.KV, lease clientv3.Lease) (jobLock *JobLock) {
jobLock = &JobLock{
kv: kv,
lease: lease,
jobName: jobName,
}
return
}
// 尝试上锁
func (jobLock *JobLock) TryLock() (err error) {
var (
leaseGrantResp *clientv3.LeaseGrantResponse
cancelCtx context.Context
cancelFunc context.CancelFunc
leaseId clientv3.LeaseID
keepRespChan <- chan *clientv3.LeaseKeepAliveResponse
txn clientv3.Txn
lockKey string
txnResp *clientv3.TxnResponse
)
// 1, 创建租约(5秒)
if leaseGrantResp, err = jobLock.lease.Grant(context.TODO(), 5); err != nil {
return
}
// context用于取消自动续租
cancelCtx, cancelFunc = context.WithCancel(context.TODO())
// 租约ID
leaseId = leaseGrantResp.ID
// 2, 自动续租
if keepRespChan, err = jobLock.lease.KeepAlive(cancelCtx, leaseId); err != nil {
goto FAIL
}
// 3, 处理续租应答的协程
go func() {
var (
keepResp *clientv3.LeaseKeepAliveResponse
)
for {
select {
case keepResp = <- keepRespChan: // 自动续租的应答
if keepResp == nil {
goto END
}
}
}
END:
}()
// 4, 创建事务txn
txn = jobLock.kv.Txn(context.TODO())
// 锁路径
lockKey = common.JOB_LOCK_DIR + jobLock.jobName
// 5, 事务抢锁
txn.If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet(lockKey))
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
goto FAIL
}
// 6, 成功返回, 失败释放租约
if !txnResp.Succeeded { // 锁被占用
err = common.ERR_LOCK_ALREADY_REQUIRED
goto FAIL
}
// 抢锁成功
jobLock.leaseId = leaseId
jobLock.cancelFunc = cancelFunc
jobLock.isLocked = true
return
FAIL:
cancelFunc() // 取消自动续租
jobLock.lease.Revoke(context.TODO(), leaseId) // 释放租约
return
}
// 释放锁
func (jobLock *JobLock) Unlock() {
if jobLock.isLocked {
jobLock.cancelFunc() // 取消我们程序自动续租的协程
jobLock.lease.Revoke(context.TODO(), jobLock.leaseId) // 释放租约
}
}
\ No newline at end of file
package worker
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"go-crontab/common"
"time"
)
// 任务管理器
type JobMgr struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
watcher clientv3.Watcher
}
var (
// 单例
G_jobMgr *JobMgr
)
// 监听任务变化
func (jobMgr *JobMgr) watchJobs() (err error) {
var (
getResp *clientv3.GetResponse
kvpair *mvccpb.KeyValue
job *common.Job
watchStartRevision int64
watchChan clientv3.WatchChan
watchResp clientv3.WatchResponse
watchEvent *clientv3.Event
jobName string
jobEvent *common.JobEvent
)
// 1, get一下/cron/jobs/目录下的所有任务,并且获知当前集群的revision
if getResp, err = jobMgr.kv.Get(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithPrefix()); err != nil {
return
}
//fmt.Println(getResp)
// 当前有哪些任务
for _, kvpair = range getResp.Kvs {
// 反序列化json得到Job
if job, err = common.UnpackJob(kvpair.Value); err == nil {
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
// 同步给scheduler(调度协程)
G_scheduler.PushJobEvent(jobEvent)
}
}
// 2, 从该revision向后监听变化事件
go func() { // 监听协程
// 从GET时刻的后续版本开始监听变化
watchStartRevision = getResp.Header.Revision + 1
// 监听/cron/jobs/目录的后续变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_SAVE_DIR, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
switch watchEvent.Type {
case mvccpb.PUT: // 任务保存事件 新增或者修改
//反序列化job 推送一个更新事件给scheduler
if job, err = common.UnpackJob(watchEvent.Kv.Value); err != nil {
continue
}
// 构建一个更新Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_SAVE, job)
case mvccpb.DELETE: // 任务被删除了
//推送一个删除事件给scheduler
// Delete /cron/jobs/job10
jobName = common.ExtractJobName(string(watchEvent.Kv.Key))
job = &common.Job{Name: jobName}
// 构建一个删除Event
jobEvent = common.BuildJobEvent(common.JOB_EVENT_DELETE, job)
}
// 变化推给scheduler
//推送到channel里面 管道 jobEventChan<-
G_scheduler.PushJobEvent(jobEvent)
}
}
}()
return
}
// 监听强杀任务通知
func (jobMgr *JobMgr) watchKiller() {
var (
watchChan clientv3.WatchChan
watchResp clientv3.WatchResponse
watchEvent *clientv3.Event
jobEvent *common.JobEvent
jobName string
job *common.Job
)
// 监听/cron/killer目录
go func() { // 监听协程
// 监听/cron/killer/目录的变化
watchChan = jobMgr.watcher.Watch(context.TODO(), common.JOB_KILLER_DIR, clientv3.WithPrefix())
// 处理监听事件
for watchResp = range watchChan {
for _, watchEvent = range watchResp.Events {
switch watchEvent.Type {
case mvccpb.PUT: // 杀死任务事件
jobName = common.ExtractKillerName(string(watchEvent.Kv.Key))
job = &common.Job{Name: jobName}
jobEvent = common.BuildJobEvent(common.JOB_EVENT_KILL, job)
// 事件推给scheduler
G_scheduler.PushJobEvent(jobEvent)
case mvccpb.DELETE: // killer标记过期, 被自动删除
}
}
}
}()
}
// 初始化管理器
func InitJobMgr() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
watcher clientv3.Watcher
)
// 初始化配置
config = clientv3.Config{
Endpoints: G_config.EtcdEndpoints, // 集群地址
DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
return
}
// 得到KV和Lease的API子集
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
watcher = clientv3.NewWatcher(client)
// 赋值单例
G_jobMgr = &JobMgr{
client: client,
kv: kv,
lease: lease,
watcher: watcher,
}
// 启动任务监听
G_jobMgr.watchJobs()
// 启动监听killer
G_jobMgr.watchKiller()
return
}
// 创建任务执行锁
func (jobMgr *JobMgr) CreateJobLock(jobName string) (jobLock *JobLock){
jobLock = InitJobLock(jobName, jobMgr.kv, jobMgr.lease)
return
}
\ No newline at end of file
package worker
import (
"go.mongodb.org/mongo-driver/mongo"
"go-crontab/common"
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
// mongodb存储日志
type LogSink struct {
client *mongo.Client
logCollection *mongo.Collection
logChan chan *common.JobLog
autoCommitChan chan *common.LogBatch
}
var (
// 单例
G_logSink *LogSink
)
// 批量写入日志
func (logSink *LogSink) saveLogs(batch *common.LogBatch) {
logSink.logCollection.InsertMany(context.TODO(), batch.Logs)
}
// 日志存储协程
func (logSink *LogSink) writeLoop() {
var (
log *common.JobLog
logBatch *common.LogBatch // 当前的批次
commitTimer *time.Timer
timeoutBatch *common.LogBatch // 超时批次
)
for {
select {
case log = <- logSink.logChan:
if logBatch == nil {
logBatch = &common.LogBatch{}
// 让这个批次超时自动提交(给1秒的时间)
commitTimer = time.AfterFunc(
time.Duration(G_config.JobLogCommitTimeout) * time.Millisecond,
func(batch *common.LogBatch) func() {
return func() {
logSink.autoCommitChan <- batch
}
}(logBatch),
)
}
// 把新日志追加到批次中
logBatch.Logs = append(logBatch.Logs, log)
// 如果批次满了, 就立即发送
if len(logBatch.Logs) >= G_config.JobLogBatchSize {
// 发送日志
logSink.saveLogs(logBatch)
// 清空logBatch
logBatch = nil
// 取消定时器
commitTimer.Stop()
}
case timeoutBatch = <- logSink.autoCommitChan: // 过期的批次
// 判断过期批次是否仍旧是当前的批次
if timeoutBatch != logBatch {
continue // 跳过已经被提交的批次
}
// 把批次写入到mongo中
logSink.saveLogs(timeoutBatch)
// 清空logBatch
logBatch = nil
}
}
}
func InitLogSink() (err error) {
var (
client *mongo.Client
)
// 建立mongodb连接
clientOptions := options.Client().ApplyURI("mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1")
if client, err = mongo.Connect(
context.TODO(),clientOptions); err != nil {
return
}
// 选择db和collection
G_logSink = &LogSink{
client: client,
logCollection: client.Database("ichunt").Collection("cron_log"),
logChan: make(chan *common.JobLog, 1000),
autoCommitChan: make(chan *common.LogBatch, 1000),
}
// 启动一个mongodb处理协程
go G_logSink.writeLoop()
return
}
// 发送日志
func (logSink *LogSink) Append(jobLog *common.JobLog) {
select {
case logSink.logChan <- jobLog:
default:
// 队列满了就丢弃
}
}
\ No newline at end of file
package worker
import (
"context"
"github.com/coreos/etcd/clientv3"
"go-crontab/common"
"net"
"time"
)
// 注册节点到etcd: /cron/workers/IP地址
type Register struct {
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
localIP string // 本机IP
}
var (
G_register *Register
)
// 获取本机网卡IP
func getLocalIP() (ipv4 string, err error) {
var (
addrs []net.Addr
addr net.Addr
ipNet *net.IPNet // IP地址
isIpNet bool
)
// 获取所有网卡
if addrs, err = net.InterfaceAddrs(); err != nil {
return
}
// 取第一个非lo的网卡IP
for _, addr = range addrs {
// 这个网络地址是IP地址: ipv4, ipv6
if ipNet, isIpNet = addr.(*net.IPNet); isIpNet && !ipNet.IP.IsLoopback() {
// 跳过IPV6
if ipNet.IP.To4() != nil {
ipv4 = ipNet.IP.String() // 192.168.1.1
return
}
}
}
err = common.ERR_NO_LOCAL_IP_FOUND
return
}
// 注册到/cron/workers/IP, 并自动续租
func (register *Register) keepOnline() {
var (
regKey string
leaseGrantResp *clientv3.LeaseGrantResponse
err error
keepAliveChan <- chan *clientv3.LeaseKeepAliveResponse
keepAliveResp *clientv3.LeaseKeepAliveResponse
cancelCtx context.Context
cancelFunc context.CancelFunc
)
for {
// 注册路径
regKey = common.JOB_WORKER_DIR + register.localIP
cancelFunc = nil
// 创建租约
if leaseGrantResp, err = register.lease.Grant(context.TODO(), 10); err != nil {
goto RETRY
}
// 自动续租
if keepAliveChan, err = register.lease.KeepAlive(context.TODO(), leaseGrantResp.ID); err != nil {
goto RETRY
}
cancelCtx, cancelFunc = context.WithCancel(context.TODO())
// 注册到etcd
if _, err = register.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil {
goto RETRY
}
// 处理续租应答
for {
select {
case keepAliveResp = <- keepAliveChan:
if keepAliveResp == nil { // 续租失败
goto RETRY
}
}
}
RETRY:
time.Sleep(1 * time.Second)
if cancelFunc != nil {
cancelFunc()
}
}
}
func InitRegister() (err error) {
var (
config clientv3.Config
client *clientv3.Client
kv clientv3.KV
lease clientv3.Lease
localIp string
)
// 初始化配置
config = clientv3.Config{
Endpoints: G_config.EtcdEndpoints, // 集群地址
DialTimeout: time.Duration(G_config.EtcdDialTimeout) * time.Millisecond, // 连接超时
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
return
}
// 本机IP
if localIp, err = getLocalIP(); err != nil {
return
}
// 得到KV和Lease的API子集
kv = clientv3.NewKV(client)
lease = clientv3.NewLease(client)
G_register = &Register{
client: client,
kv: kv,
lease: lease,
localIP: localIp,
}
// 服务注册
go G_register.keepOnline()
return
}
\ No newline at end of file
package worker
import (
"go-crontab/common"
"time"
"fmt"
)
// 任务调度
type Scheduler struct {
jobEventChan chan *common.JobEvent // etcd任务事件队列 把任务放入channel管道
jobPlanTable map[string]*common.JobSchedulePlan // 任务调度计划表
jobExecutingTable map[string]*common.JobExecuteInfo // 任务执行表
jobResultChan chan *common.JobExecuteResult // 任务结果队列
}
var (
G_scheduler *Scheduler
)
// 处理任务事件
func (scheduler *Scheduler) handleJobEvent(jobEvent *common.JobEvent) {
var (
jobSchedulePlan *common.JobSchedulePlan
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
jobExisted bool
err error
)
switch jobEvent.EventType {
case common.JOB_EVENT_SAVE: // 保存任务事件
if jobSchedulePlan, err = common.BuildJobSchedulePlan(jobEvent.Job); err != nil {
return
}
scheduler.jobPlanTable[jobEvent.Job.Name] = jobSchedulePlan
case common.JOB_EVENT_DELETE: // 删除任务事件
if jobSchedulePlan, jobExisted = scheduler.jobPlanTable[jobEvent.Job.Name]; jobExisted {
delete(scheduler.jobPlanTable, jobEvent.Job.Name)
}
case common.JOB_EVENT_KILL: // 强杀任务事件
// 取消掉Command执行, 判断任务是否在执行中
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobEvent.Job.Name]; jobExecuting {
jobExecuteInfo.CancelFunc() // 触发command杀死shell子进程, 任务得到退出
}
}
}
// 尝试执行任务
func (scheduler *Scheduler) TryStartJob(jobPlan *common.JobSchedulePlan) {
// 调度 和 执行 是2件事情
var (
jobExecuteInfo *common.JobExecuteInfo
jobExecuting bool
)
// 执行的任务可能运行很久, 1分钟会调度60次,但是只能执行1次, 防止并发!
// 如果任务正在执行,跳过本次调度
if jobExecuteInfo, jobExecuting = scheduler.jobExecutingTable[jobPlan.Job.Name]; jobExecuting {
// fmt.Println("尚未退出,跳过执行:", jobPlan.Job.Name)
return
}
// 构建执行状态信息
jobExecuteInfo = common.BuildJobExecuteInfo(jobPlan)
// 保存执行状态
scheduler.jobExecutingTable[jobPlan.Job.Name] = jobExecuteInfo
// 执行任务
fmt.Println("执行任务:", jobExecuteInfo.Job.Name, jobExecuteInfo.PlanTime, jobExecuteInfo.RealTime)
G_executor.ExecuteJob(jobExecuteInfo)
}
// 重新计算任务调度状态
//计算出最近要过期的任务 下次要执行的任务 还有多久
//5秒后一个任务要执行
func (scheduler *Scheduler) TrySchedule() (scheduleAfter time.Duration) {
var (
jobPlan *common.JobSchedulePlan
now time.Time
nearTime *time.Time
)
// 如果任务表为空话,随便睡眠多久
if len(scheduler.jobPlanTable) == 0 {
scheduleAfter = 1 * time.Second
return
}
// 当前时间
now = time.Now()
// 遍历所有任务
for _, jobPlan = range scheduler.jobPlanTable {
//到期的任务 尝试执行任务
if jobPlan.NextTime.Before(now) || jobPlan.NextTime.Equal(now) {
//尝试执行任务:有可能任务到期要执行了 但是上一次任务还没有结束,那么这次就不会执行
//更新下下次执行时间即可
scheduler.TryStartJob(jobPlan)
jobPlan.NextTime = jobPlan.Expr.Next(now) // 更新下次执行时间
}
//没有过期的任务
// 统计最近一个要过期的任务时间
if nearTime == nil || jobPlan.NextTime.Before(*nearTime) {
nearTime = &jobPlan.NextTime
}
}
// 下次调度间隔(最近要执行的任务调度时间 - 当前时间)
//假如最近一次任务是5秒后执行 那么久睡眠5秒
scheduleAfter = (*nearTime).Sub(now)
return
}
// 处理任务结果
func (scheduler *Scheduler) handleJobResult(result *common.JobExecuteResult) {
var (
jobLog *common.JobLog
)
// 删除执行状态
delete(scheduler.jobExecutingTable, result.ExecuteInfo.Job.Name)
// 生成执行日志
if result.Err != common.ERR_LOCK_ALREADY_REQUIRED {
jobLog = &common.JobLog{
JobName: result.ExecuteInfo.Job.Name,
Command: result.ExecuteInfo.Job.Command,
Output: string(result.Output),
PlanTime: result.ExecuteInfo.PlanTime.UnixNano() / 1000 / 1000,
ScheduleTime: result.ExecuteInfo.RealTime.UnixNano() / 1000 / 1000,
StartTime: result.StartTime.UnixNano() / 1000 / 1000,
EndTime: result.EndTime.UnixNano() / 1000 / 1000,
}
if result.Err != nil {
jobLog.Err = result.Err.Error()
} else {
jobLog.Err = ""
}
G_logSink.Append(jobLog)
}
// fmt.Println("任务执行完成:", result.ExecuteInfo.Job.Name, string(result.Output), result.Err)
}
// 调度协程
func (scheduler *Scheduler) scheduleLoop() {
var (
jobEvent *common.JobEvent
scheduleAfter time.Duration //下次执行任务时间还有多久
scheduleTimer *time.Timer
jobResult *common.JobExecuteResult
)
// 初始化一次(1秒) 得到下次调度的时间
scheduleAfter = scheduler.TrySchedule()
// 调度的延迟定时器
scheduleTimer = time.NewTimer(scheduleAfter)
// 定时任务common.Job
for {
select {
case jobEvent = <- scheduler.jobEventChan: //监听任务变化事件 新增job或者修改job操作会插入到该管道
// 对内存中维护的任务列表做增删改查 当有事件来了后 就把任务放入内存中
//当有删除操作 就把该任务从内存中删掉
scheduler.handleJobEvent(jobEvent)
case <- scheduleTimer.C: // 最近的任务到期了
case jobResult = <- scheduler.jobResultChan: // 监听任务执行结果
scheduler.handleJobResult(jobResult)
}
// 调度一次任务 最近要过期 要执行的任务
scheduleAfter = scheduler.TrySchedule()
// 重置调度间隔
scheduleTimer.Reset(scheduleAfter)
}
}
// 推送任务变化事件
//投递一个jobEvent 到jobEventChan 管道里面
func (scheduler *Scheduler) PushJobEvent(jobEvent *common.JobEvent) {
scheduler.jobEventChan <- jobEvent
}
// 初始化调度器
func InitScheduler() (err error) {
G_scheduler = &Scheduler{
jobEventChan: make(chan *common.JobEvent, 1000),//变化事件
jobPlanTable: make(map[string]*common.JobSchedulePlan),//存放着所有要执行的计划任务
jobExecutingTable: make(map[string]*common.JobExecuteInfo),//任务执行状态
jobResultChan: make(chan *common.JobExecuteResult, 1000),// 任务执行结果
}
// 启动调度协程
go G_scheduler.scheduleLoop()
return
}
// 回传任务执行结果
func (scheduler *Scheduler) PushJobResult(jobResult *common.JobExecuteResult) {
scheduler.jobResultChan <- jobResult
}
\ No newline at end of file
package main
import (
"flag"
"fmt"
"go-crontab/worker"
"runtime"
"time"
)
var (
confFile string // 配置文件路径
)
// 解析命令行参数
func initArgs() {
// worker -config ./worker.json
// worker -h
flag.StringVar(&confFile, "config", "./worker.json", "worker.json")
flag.Parse()
}
// 初始化线程数量
func initEnv() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {
var (
err error
)
// 初始化命令行参数
initArgs()
// 初始化线程
initEnv()
// 加载配置
if err = worker.InitConfig(confFile); err != nil {
goto ERR
}
// 服务注册
// key:/cron/workers/192.168.2.246
if err = worker.InitRegister(); err != nil {
goto ERR
}
//
//// 启动日志协程
if err = worker.InitLogSink(); err != nil {
goto ERR
}
//
// 启动执行器
if err = worker.InitExecutor(); err != nil {
goto ERR
}
//
//// 启动调度器
if err = worker.InitScheduler(); err != nil {
goto ERR
}
//
//// 初始化任务管理器
if err = worker.InitJobMgr(); err != nil {
goto ERR
}
//
//// 正常退出
for {
time.Sleep(1 * time.Second)
}
return
ERR:
fmt.Println(err)
}
{
"etcd的集群列表": "配置多个, 避免单点故障",
"etcdEndpoints": ["192.168.2.232:2379"],
"etcd的连接超时": "单位毫秒",
"etcdDialTimeout": 5000,
"mongodb地址": "采用mongodb URI",
"mongodbUri": "mongodb://36.111.184.221:27017",
"mongodb连接超时时间": "单位毫秒",
"mongodbConnectTimeout": 5000,
"日志批次大小": "为了减少mongodb网络往返, 打包成一批写入",
"jobLogBatchSize": 100,
"日志自动提交超时": "在批次未达到阀值之前, 超时会自动提交batch",
"jobLogCommitTimeout": 1000
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment