Commit 133cfdd8 by 孙龙

init

parent d8222df0
......@@ -3,9 +3,9 @@
dns="liexin:liexin#zsyM@tcp(192.168.2.232:3306)/liexin?parseTime=true"
[rabbitmq_ichunt]
queue_name="send_buyer_mail"
routing_key="send_buyer_mail"
exchange="ichunt_order_msg"
queue_name="ichunt_monitor_user_behavior"
routing_key="ichunt_monitor_user_behavior"
exchange="ichunt_monitor_behavior"
type="direct"
dns="amqp://guest:guest@192.168.2.232:5672/"
......@@ -8,4 +8,5 @@ require (
github.com/ichunt2019/go-msgserver v1.0.5 // indirect
github.com/ichunt2019/logger v1.0.5
github.com/jmoiron/sqlx v1.2.0
go.mongodb.org/mongo-driver v1.3.0
)
package apiMsgService
import (
"fmt"
//"fmt"
"go.mongodb.org/mongo-driver/mongo"
"context"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
type LogBatch struct {
Logs []interface{} // 多条日志
}
// mongodb存储日志
type LogSink struct {
client *mongo.Client
logCollection *mongo.Collection
logChan chan *DataParams
autoCommitChan chan *LogBatch
}
var (
// 单例
G_logSink *LogSink
)
// 批量写入日志
func (logSink *LogSink) saveLogs(batch *LogBatch) {
logSink.logCollection.InsertMany(context.TODO(), batch.Logs)
}
// 日志存储协程
func (logSink *LogSink) writeLoop() {
//select{
//case log := <- logSink.logChan:
// fmt.Println("从管道中读取日志")
// fmt.Println(log)
//}
var (
log *DataParams
logBatch *LogBatch // 当前的批次
commitTimer *time.Timer
//timeoutBatch *LogBatch // 超时批次
)
for {
select {
case log = <- logSink.logChan:
fmt.Println("新任务")
if logBatch == nil {
logBatch = &LogBatch{}
}
// 把新日志追加到批次中
logBatch.Logs = append(logBatch.Logs, log)
// 如果批次满了, 就立即发送
if len(logBatch.Logs) >= 10 {
// 发送日志
logSink.saveLogs(logBatch)
// 清空logBatch
logBatch = nil
// 取消定时器
//commitTimer.Stop()
}
case <- time.NewTimer(1*time.Second).C:
if logBatch == nil {
logBatch = &LogBatch{}
}
fmt.Println("超时到期了")
fmt.Println(len(logBatch.Logs))
logSink.saveLogs(logBatch)
logBatch = nil
}
}
}
func InitLogSink() (err error) {
var (
client *mongo.Client
)
// 建立mongodb连接
clientOptions := options.Client().ApplyURI("mongodb://ichunt:huntmon6699@192.168.1.237:27017/ichunt?authMechanism=SCRAM-SHA-1")
if client, err = mongo.Connect(
context.TODO(),clientOptions); err != nil {
return
}
// 选择db和collection
G_logSink = &LogSink{
client: client,
logCollection: client.Database("ichunt").Collection("monitor_log"),
logChan: make(chan *DataParams, 1000),
//autoCommitChan: make(chan *common.LogBatch, 1000),
}
// 启动一个mongodb处理协程
go G_logSink.writeLoop()
return
}
// 发送日志
func (logSink *LogSink) Append(jobLog *DataParams) {
select {
case logSink.logChan <- jobLog:
default:
// 队列满了就丢弃
}
}
\ No newline at end of file
......@@ -2,61 +2,72 @@ package apiMsgService
import (
"encoding/json"
"fmt"
"go-api-behavior/util"
"time"
)
func ToJson(msg string) (err error){
err = json.Unmarshal([]byte(msg),&util.MsgParams)
return err
}
//写日志到管道
func SaveElkLogChan(MsgParams *util.DataParams){
util.MsgChan <- MsgParams
// 队列参数
type DataParams struct {
InterfaceType string `json:"interface_type"`
AccessUrl string `json:"access_url"`
RequestParams string `json:"request_params"`
ErrMsg string `json:"err_msg"`
ErrCode string `json:"err_code"`
Uid string `json:"uid"`
UserName string `json:"user_name"`
UserIp string `json:"user_ip"`
Remakr string `json:"remark"`
CreateTime int64 `json:"create_time"`
CreateTimeStr string `json:"create_time_str"`
}
func SaveElkLog(LogReportDir string){
var(
logBatch *util.LogBatch
)
fileLog,_ := util.NewFileLogger(LogReportDir)
fileLog.Init()
var MsgParams *DataParams
for{
select{
case util.MsgParams = <- util.MsgChan:
if logBatch == nil{
logBatch = &util.LogBatch{}
}
// 把新日志追加到批次中
logBatch.Logs = append(logBatch.Logs, util.MsgParams)
// 如果批次满了, 就立即写入
if len(logBatch.Logs) > 500{
//写入日志
fmt.Println("长度到了开始写入日志")
defer fileLog.Close()
fileLog.WriteLog(logBatch)
// 清空logBatch
logBatch = nil
}
case <- time.NewTimer(10*time.Second).C:
if logBatch == nil{
logBatch = &util.LogBatch{}
}
fmt.Println("超时到期了")
fmt.Println(len(logBatch.Logs))
//写入日志
fmt.Println("超时了写入日志")
defer fileLog.Close()
fileLog.WriteLog(logBatch)
// 清空logBatch
logBatch = nil
}
time.Sleep(time.Microsecond*10)
}
}
\ No newline at end of file
//写日志到管道
func SaveMsgToLogChan(msg string) (err error) {
err = json.Unmarshal([]byte(msg),&MsgParams)
G_logSink.Append(MsgParams)
return err
}
//
//func SaveElkLog(LogReportDir string){
// var(
// logBatch *util.LogBatch
// )
// fileLog,_ := util.NewFileLogger(LogReportDir)
// fileLog.Init()
//
// for{
// select{
// case util.MsgParams = <- util.MsgChan:
// if logBatch == nil{
// logBatch = &util.LogBatch{}
// }
// // 把新日志追加到批次中
// logBatch.Logs = append(logBatch.Logs, util.MsgParams)
// // 如果批次满了, 就立即写入
// if len(logBatch.Logs) > 500{
// //写入日志
// fmt.Println("长度到了开始写入日志")
// defer fileLog.Close()
// fileLog.WriteLog(logBatch)
// // 清空logBatch
// logBatch = nil
// }
// case <- time.NewTimer(10*time.Second).C:
//
// if logBatch == nil{
// logBatch = &util.LogBatch{}
// }
// fmt.Println("超时到期了")
// fmt.Println(len(logBatch.Logs))
// //写入日志
// fmt.Println("超时了写入日志")
// defer fileLog.Close()
// fileLog.WriteLog(logBatch)
// // 清空logBatch
// logBatch = nil
// }
//
// time.Sleep(time.Microsecond*10)
// }
//}
\ No newline at end of file
......@@ -17,13 +17,12 @@ type RecvPro struct {
//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
func (t *RecvPro) Consumer(dataByte []byte) error {
//logger.Info(string(dataByte))
err := apiMsgService.ToJson(string(dataByte))
//fmt.Println(string(dataByte))
err := apiMsgService.SaveMsgToLogChan(string(dataByte))
if err != nil{
logger.Error("%s",err)
return err
}
apiMsgService.SaveElkLogChan(util.MsgParams)
////return errors.New("顶顶顶顶")
return nil
}
......@@ -37,17 +36,6 @@ func (t *RecvPro) FailAction(dataByte []byte) error {
}
//func initDb(dns string) (err error) {
// err = db.Init(dns)
// if err != nil {
// return
// }
//
// return
//}
var ConfigDir string
var LogDir string
var LogReportDir string
......@@ -70,7 +58,7 @@ func main() {
//初始化配置文件
util.Init(ConfigDir)
util.MsgChan = make(chan *util.DataParams,10)
//util.MsgChan = make(chan *util.DataParams,10)
//
logConfig := make(map[string]string)
......@@ -91,11 +79,11 @@ func main() {
util.Configs.Rabbitmq_ichunt.Dns,
}
_ = apiMsgService.InitLogSink()
go func(){
apiMsgService.SaveElkLog(LogReportDir)
}()
//go func(){
// apiMsgService.SaveElkLog(LogReportDir)
//}()
for{
var wg sync.WaitGroup
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment