Commit e8f01daf by 朱继来

Merge branch 'master' of http://119.23.72.7/sunlong_v5/golang-queue-server into…

Merge branch 'master' of http://119.23.72.7/sunlong_v5/golang-queue-server into zjl_account_20200413
parents 023ab72f 2bf56014
...@@ -14,3 +14,8 @@ host="192.168.1.235" ...@@ -14,3 +14,8 @@ host="192.168.1.235"
password="icDb29mLy2s" password="icDb29mLy2s"
port="6379" port="6379"
[rabbitmq_order_push_stock]
queue_name="order_push_stock"
dns="amqp://guest:guest@192.168.2.232:5672/"
package Order package Order
import ( import (
_"database/sql" _ "database/sql"
_"fmt" "fmt"
_ "fmt"
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "github.com/streadway/amqp"
"go-queue-server/util" "go-queue-server/util"
_"time" "go-queue-server/dal/db"
_ "time"
) )
func initDb(dns string) (err error) {
err = db.Init(dns)
if err != nil {
return
}
return
}
func UpdateStatus(order_id int, status_extend int) (err error) { func UpdateStatus(order_id int, status_extend int) (err error) {
initDb(util.Configs.Liexin_databases.Dns) //初始化db
_, err1 := db.DB.Exec("update lie_order set status_extend = ?, wms_syn = ? where order_id = ?", status_extend, 1, order_id) _, err1 := db.DB.Exec("update lie_order set status_extend = ?, wms_syn = ? where order_id = ?", status_extend, 1, order_id)
...@@ -28,5 +22,53 @@ func UpdateStatus(order_id int, status_extend int) (err error) { ...@@ -28,5 +22,53 @@ func UpdateStatus(order_id int, status_extend int) (err error) {
return err1 return err1
} }
err = makeOrder(order_id)
return err return err
}
func makeOrder(order_id int) (err error) {
conn, err := amqp.Dial(util.Configs.Rabbitmq_order_push_stock.Dns)
if (err != nil) {
logger.Fatal(fmt.Sprintf(" err %s 推入仓库失败 订单号 %d", err, order_id))
}
defer conn.Close()
//通道
ch, err := conn.Channel()
if (err != nil) {
logger.Fatal(fmt.Sprintf(" err %s 推入仓库失败 订单号 %d", err, order_id))
}
defer ch.Close()
//设置数据
q, err := ch.QueueDeclare(
util.Configs.Rabbitmq_order_push_stock.QueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if (err != nil) {
logger.Fatal(fmt.Sprintf(" err %s 推入仓库失败 订单号 %d", err, order_id))
}
body := fmt.Sprintf("{\"order_id\":%d}", order_id)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if (err != nil) {
logger.Fatal(fmt.Sprintf(" err %s 推入仓库失败 订单号 %d", err, order_id))
}
return
} }
\ No newline at end of file
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "go-queue-server/dal/db"
"go-queue-server/util"
"log" "log"
"time" "time"
) )
...@@ -20,17 +19,8 @@ type ActionLog struct { ...@@ -20,17 +19,8 @@ type ActionLog struct {
CreateTime int `db:"create_time"` CreateTime int `db:"create_time"`
} }
func initDb(dns string) (err error) {
err = db.Init(dns)
if err != nil {
return
}
return
}
func AddLog() (err error) { func AddLog() (err error) {
initDb(util.Configs.Liexin_databases.Dns) //初始化db
// QueryRow() // QueryRow()
// Query() // Query()
......
...@@ -2,11 +2,10 @@ package UserGroup ...@@ -2,11 +2,10 @@ package UserGroup
import ( import (
"database/sql" "database/sql"
_"fmt" "encoding/json"
_ "fmt"
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "go-queue-server/dal/db"
"go-queue-server/util"
"encoding/json"
) )
type UserGroup struct { type UserGroup struct {
...@@ -21,18 +20,11 @@ type UserGroup struct { ...@@ -21,18 +20,11 @@ type UserGroup struct {
CreateTime int `db:"create_time"` CreateTime int `db:"create_time"`
} }
func initDb(dns string) (err error) {
err = db.Init(dns)
if err != nil {
return
}
return
}
// 单行查询 // 单行查询
func QueryRow(order_id int) []byte { func QueryRow(order_id int) []byte {
initDb(util.Configs.Liexin_databases.Dns) //初始化db //initDb(util.Configs.Liexin_databases.Dns) //初始化db
var userGroup UserGroup var userGroup UserGroup
...@@ -50,7 +42,7 @@ func QueryRow(order_id int) []byte { ...@@ -50,7 +42,7 @@ func QueryRow(order_id int) []byte {
// 更新参团状态 // 更新参团状态
func UpdateStatus(order_id int) (err error) { func UpdateStatus(order_id int) (err error) {
initDb(util.Configs.Liexin_databases.Dns) //初始化db //initDb(util.Configs.Liexin_databases.Dns) //初始化db
_, err1 := db.DB.Exec("update lie_user_group set status = ?, is_assign = ? where order_id = ?", 1, 1, order_id) _, err1 := db.DB.Exec("update lie_user_group set status = ?, is_assign = ? where order_id = ?", 1, 1, order_id)
......
...@@ -3,7 +3,6 @@ package UserGroupJoin ...@@ -3,7 +3,6 @@ package UserGroupJoin
import ( import (
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "go-queue-server/dal/db"
"go-queue-server/util"
"time" "time"
) )
...@@ -18,7 +17,6 @@ func initDb(dns string) (err error) { ...@@ -18,7 +17,6 @@ func initDb(dns string) (err error) {
// 新增虚拟参团人员 // 新增虚拟参团人员
func InsertGroupJoin(group_id int, order_id int, account string) (err error) { func InsertGroupJoin(group_id int, order_id int, account string) (err error) {
initDb(util.Configs.Liexin_databases.Dns) //初始化db
current := time.Now().Unix() current := time.Now().Unix()
_, err1 := db.DB.Exec("insert into lie_user_group_join (group_id, account, order_id, join_time) values (?, ?, ?, ?)", group_id, account, order_id, current) _, err1 := db.DB.Exec("insert into lie_user_group_join (group_id, account, order_id, join_time) values (?, ?, ?, ?)", group_id, account, order_id, current)
......
...@@ -9,8 +9,10 @@ require ( ...@@ -9,8 +9,10 @@ require (
github.com/go-sql-driver/mysql v1.4.1 github.com/go-sql-driver/mysql v1.4.1
github.com/gomodule/redigo v2.0.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible
github.com/ichunt2019/go-msgserver v1.0.4 github.com/ichunt2019/go-msgserver v1.0.4
github.com/ichunt2019/go-rabbitmq v1.0.1 // indirect
github.com/ichunt2019/logger v1.0.5 github.com/ichunt2019/logger v1.0.5
github.com/jmoiron/sqlx v1.2.0 github.com/jmoiron/sqlx v1.2.0
github.com/prometheus/common v0.7.0 // indirect github.com/prometheus/common v0.7.0 // indirect
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
github.com/tealeg/xlsx v1.0.5 github.com/tealeg/xlsx v1.0.5
) )
...@@ -35,6 +35,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ ...@@ -35,6 +35,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/ichunt2019/go-msgserver v1.0.4 h1:4BsRE1Ca4J0h9xyDqUF0pRMpMjDC5G7023hESrzszsY= github.com/ichunt2019/go-msgserver v1.0.4 h1:4BsRE1Ca4J0h9xyDqUF0pRMpMjDC5G7023hESrzszsY=
github.com/ichunt2019/go-msgserver v1.0.4/go.mod h1:fWAvbry0W9nhmkqgT2agwRqYCWlguUJXgy2rgoWOUmA= github.com/ichunt2019/go-msgserver v1.0.4/go.mod h1:fWAvbry0W9nhmkqgT2agwRqYCWlguUJXgy2rgoWOUmA=
github.com/ichunt2019/go-msgserver v1.0.5 h1:yanQ95Ld0etJzVhyZql0jhXXE18qGFX48qFgTCim5hg= github.com/ichunt2019/go-msgserver v1.0.5 h1:yanQ95Ld0etJzVhyZql0jhXXE18qGFX48qFgTCim5hg=
github.com/ichunt2019/go-rabbitmq v1.0.1 h1:qHhpGm9v7jnhSBo3f3viX+BSky9yugp9lCSV03eYsF4=
github.com/ichunt2019/go-rabbitmq v1.0.1/go.mod h1:TQsZ1XWULyvm4UwpYHwNPtOXYbuVvLLI0GM7g/BRy68=
github.com/ichunt2019/logger v1.0.3 h1:sH4HfpzYIP9jGGx2AGqN1vRMaph299jB5/L7mwfUMwY= github.com/ichunt2019/logger v1.0.3 h1:sH4HfpzYIP9jGGx2AGqN1vRMaph299jB5/L7mwfUMwY=
github.com/ichunt2019/logger v1.0.3/go.mod h1:5IWMrrqJIWwOIGav9ACWOI+KOuYeteUvOei4zubclwg= github.com/ichunt2019/logger v1.0.3/go.mod h1:5IWMrrqJIWwOIGav9ACWOI+KOuYeteUvOei4zubclwg=
github.com/ichunt2019/logger v1.0.4 h1:y8xfaOLk/5Q++YBoq3x+NCf5Z4WpsQe4juCD2n/ul14= github.com/ichunt2019/logger v1.0.4 h1:y8xfaOLk/5Q++YBoq3x+NCf5Z4WpsQe4juCD2n/ul14=
...@@ -79,6 +81,8 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4 ...@@ -79,6 +81,8 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 h1:2MR0pKUzlP3SGgj5NYJe/zRYDwOu9ku6YHy+Iw7l5DM=
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
......
...@@ -3,7 +3,7 @@ package main ...@@ -3,7 +3,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"flag" "flag"
_"fmt" "fmt"
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "go-queue-server/dal/db"
"go-queue-server/util" "go-queue-server/util"
...@@ -73,7 +73,9 @@ func main() { ...@@ -73,7 +73,9 @@ func main() {
RedisConn = redis_connect(util.Configs) RedisConn = redis_connect(util.Configs)
t1 := time.NewTimer(time.Minute * 5) t1 := time.NewTimer(time.Second * 5)
for { for {
select { select {
...@@ -110,7 +112,7 @@ func main() { ...@@ -110,7 +112,7 @@ func main() {
} }
//t1.Reset(time.Second * 1) //t1.Reset(time.Second * 1)
t1.Reset(time.Minute * 5) t1.Reset(time.Second * 5)
} }
} }
} }
...@@ -141,7 +143,7 @@ func assign() { ...@@ -141,7 +143,7 @@ func assign() {
for _, v := range tuangou_order { for _, v := range tuangou_order {
value, _ := v.([]byte) value, _ := v.([]byte)
order_id, _ := strconv.Atoi(string(value)) order_id, _ := strconv.Atoi(string(value))
//fmt.Println(order_id) fmt.Println(order_id)
// 写入到chan // 写入到chan
select { select {
......
...@@ -2,14 +2,15 @@ package main ...@@ -2,14 +2,15 @@ package main
import ( import (
"flag" "flag"
"fmt" _ "fmt"
"github.com/ichunt2019/go-msgserver/utils/rabbitmq" _ "github.com/ichunt2019/go-msgserver/utils/rabbitmq"
"github.com/ichunt2019/go-rabbitmq/utils/rabbitmq"
"github.com/ichunt2019/logger" "github.com/ichunt2019/logger"
"go-queue-server/dal/db" "go-queue-server/dal/db"
"go-queue-server/order/sendPurchaseEmail/sendEmail" "go-queue-server/order/sendPurchaseEmail/sendEmail"
"go-queue-server/util" "go-queue-server/util"
"sync" _ "sync"
"time" _ "time"
) )
...@@ -77,13 +78,13 @@ func main() { ...@@ -77,13 +78,13 @@ func main() {
t := &RecvPro{} t := &RecvPro{}
queueExchange := &rabbitmq.QueueExchange{ //queueExchange := &rabbitmq.QueueExchange{
util.Configs.Rabbitmq_ichunt.QueueName, // util.Configs.Rabbitmq_ichunt.QueueName,
util.Configs.Rabbitmq_ichunt.RoutingKey, // util.Configs.Rabbitmq_ichunt.RoutingKey,
util.Configs.Rabbitmq_ichunt.Exchange, // util.Configs.Rabbitmq_ichunt.Exchange,
util.Configs.Rabbitmq_ichunt.Type, // util.Configs.Rabbitmq_ichunt.Type,
util.Configs.Rabbitmq_ichunt.Dns, // util.Configs.Rabbitmq_ichunt.Dns,
} //}
//for i:=0;i<= 100;i++{ //for i:=0;i<= 100;i++{
// logger.Debug("Debug记录日志555555555555555555555555555555") // logger.Debug("Debug记录日志555555555555555555555555555555")
...@@ -94,24 +95,32 @@ func main() { ...@@ -94,24 +95,32 @@ func main() {
// logger.Fatal("Fatal记录日志555555555555555555555555555555") // logger.Fatal("Fatal记录日志555555555555555555555555555555")
//} //}
for{ //for{
var wg sync.WaitGroup // var wg sync.WaitGroup
fmt.Println("开始执行任务....") // fmt.Println("开始执行任务....")
for i := 0;i<3;i++{ // for i := 0;i<3;i++{
wg.Add(1) // wg.Add(1)
go func(wg *sync.WaitGroup){ // go func(wg *sync.WaitGroup){
mq := rabbitmq.New(queueExchange) // mq := rabbitmq.New(queueExchange)
mq.RegisterReceiver(t) // mq.RegisterReceiver(t)
err :=mq.Start() // err :=mq.Start()
if err != nil{ // if err != nil{
//
fmt.Println(err) // fmt.Println(err)
} // }
wg.Done() // wg.Done()
}(&wg) // }(&wg)
} // }
wg.Wait() // wg.Wait()
fmt.Println("执行任务完成....") // fmt.Println("执行任务完成....")
time.Sleep(time.Microsecond*10) // time.Sleep(time.Microsecond*10)
} //}
rabbitmq.Recv(rabbitmq.QueueExchange{
util.Configs.Rabbitmq_ichunt.QueueName,
util.Configs.Rabbitmq_ichunt.RoutingKey,
util.Configs.Rabbitmq_ichunt.Exchange,
util.Configs.Rabbitmq_ichunt.Type,
util.Configs.Rabbitmq_ichunt.Dns,
},t,3)
} }
\ No newline at end of file
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
type Config struct{ type Config struct{
Liexin_databases *LiexinMysqlConfig Liexin_databases *LiexinMysqlConfig
Rabbitmq_ichunt *RabbitmqIchunt Rabbitmq_ichunt *RabbitmqIchunt
Rabbitmq_order_push_stock *RabbitmqOrderPushStock
Crm_domain *SendMail Crm_domain *SendMail
Ding_msg *Ding Ding_msg *Ding
Api_domain *ApiDomain Api_domain *ApiDomain
...@@ -28,6 +29,11 @@ type RabbitmqIchunt struct { ...@@ -28,6 +29,11 @@ type RabbitmqIchunt struct {
Dns string `toml:"dns"` Dns string `toml:"dns"`
} }
type RabbitmqOrderPushStock struct {
QueueName string `toml:"queue_name"`
Dns string `toml:"dns"`
}
type SendMail struct{ type SendMail struct{
SendMailUrl string `toml:"send_mail"` SendMailUrl string `toml:"send_mail"`
} }
...@@ -66,4 +72,5 @@ func Init(ConfigDir string){ ...@@ -66,4 +72,5 @@ func Init(ConfigDir string){
//fmt.Printf("%+v",Configs.Crm_domain) //fmt.Printf("%+v",Configs.Crm_domain)
//fmt.Printf("%+v",Configs.Rabbitmq_ichunt) //fmt.Printf("%+v",Configs.Rabbitmq_ichunt)
//fmt.Printf("%+v",Configs.Redis_config) //fmt.Printf("%+v",Configs.Redis_config)
//fmt.Printf("%+v",Configs.Rabbitmq_order_push_stock)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment