Skip to content

Commit a03758d

Browse files
committed
feat(mq): add rabbitmq support
1 parent 9b760b5 commit a03758d

File tree

7 files changed

+483
-0
lines changed

7 files changed

+483
-0
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ require (
1515
github.com/parnurzeal/gorequest v0.2.16
1616
github.com/patrickmn/go-cache v2.1.0+incompatible
1717
github.com/pkg/errors v0.9.1
18+
github.com/rabbitmq/amqp091-go v1.9.0
1819
github.com/samber/lo v1.38.1
1920
github.com/satori/go.uuid v1.2.0
2021
github.com/sethvargo/go-envconfig v0.9.0
22+
github.com/silenceper/wechat/v2 v2.1.6
2123
github.com/soheilhy/cmux v0.1.5
2224
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
2325
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0

go.sum

+36
Large diffs are not rendered by default.

mq/consumer.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package mq
2+
3+
import (
4+
"fmt"
5+
amqp "github.com/rabbitmq/amqp091-go"
6+
)
7+
8+
type Consumer struct {
9+
Conn *amqp.Connection
10+
Channel *amqp.Channel
11+
Tag string
12+
Done chan error
13+
Deliveries <-chan amqp.Delivery
14+
}
15+
16+
func NewConsumer(amqpURI, key string) (*Consumer, error) {
17+
c := &Consumer{
18+
Conn: nil,
19+
Channel: nil,
20+
Tag: key,
21+
Done: make(chan error),
22+
}
23+
24+
var err error
25+
26+
c.Conn, c.Channel, err = initConnection(amqpURI)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
err = declareDirect(c.Channel, exchangeName, key)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
c.Deliveries, err = c.Channel.Consume(
37+
key, // name
38+
key, // consumerTag,
39+
false, // autoAck
40+
false, // exclusive
41+
false, // noLocal
42+
false, // noWait
43+
nil, // arguments
44+
)
45+
if err != nil {
46+
return nil, fmt.Errorf("queue Consume: %s", err)
47+
}
48+
49+
return c, nil
50+
}
51+
52+
func (c *Consumer) Shutdown() error {
53+
// will close() the Deliveries Channel
54+
if err := c.Channel.Cancel(c.Tag, true); err != nil {
55+
return fmt.Errorf("consumer cancel failed: %s", err)
56+
}
57+
58+
if err := c.Conn.Close(); err != nil {
59+
return fmt.Errorf("AMQP connection close error: %s", err)
60+
}
61+
62+
defer fmt.Printf("AMQP shutdown OK")
63+
64+
// wait for exit
65+
return <-c.Done
66+
}

mq/init.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package mq
2+
3+
import (
4+
"fmt"
5+
amqp "github.com/rabbitmq/amqp091-go"
6+
)
7+
8+
const (
9+
exchangeName = "notify"
10+
11+
WechatKey = "notify.wechat"
12+
DingTalkKey = "notify.dingtalk"
13+
14+
UserIdKey = "userId"
15+
)
16+
17+
func initConnection(amqpURI string) (conn *amqp.Connection, channel *amqp.Channel, err error) {
18+
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
19+
config.Properties.SetClientConnectionName("sample-consumer")
20+
fmt.Printf("dialing %q", amqpURI)
21+
conn, err = amqp.DialConfig(amqpURI, config)
22+
if err != nil {
23+
return nil, nil, fmt.Errorf("dial: %s", err)
24+
}
25+
26+
go func() {
27+
fmt.Printf("closing: %s", <-conn.NotifyClose(make(chan *amqp.Error)))
28+
}()
29+
30+
fmt.Printf("got Connection, getting Channel")
31+
channel, err = conn.Channel()
32+
if err != nil {
33+
return nil, nil, fmt.Errorf("channel: %s", err)
34+
}
35+
36+
return conn, channel, nil
37+
}
38+
39+
func declareDirect(channel *amqp.Channel, exchange, key string) error {
40+
err := channel.ExchangeDeclare(
41+
exchange,
42+
"direct",
43+
true,
44+
false,
45+
false,
46+
false,
47+
amqp.Table{},
48+
)
49+
if err != nil {
50+
return err
51+
}
52+
53+
_, err = channel.QueueDeclare(key,
54+
true, false, false, false, nil)
55+
if err != nil {
56+
return err
57+
}
58+
59+
err = channel.QueueBind(key, key, exchange, false, nil)
60+
if err != nil {
61+
return err
62+
}
63+
64+
return nil
65+
}

mq/notice.go

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package mq
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"github.com/silenceper/wechat/v2/officialaccount/message"
7+
)
8+
9+
type NoticeTemplate struct {
10+
ctx context.Context
11+
StaffID []string `json:"StaffID"`
12+
WeChat *WeChatNoticeBody `json:"template,omitempty"`
13+
DingTalk *DingTalkNoticeBody `json:"dingtalk,omitempty"`
14+
}
15+
16+
func New() *NoticeTemplate {
17+
return &NoticeTemplate{}
18+
}
19+
20+
func (t *NoticeTemplate) WithContext(ctx context.Context) *NoticeTemplate {
21+
t.ctx = ctx
22+
return t
23+
}
24+
25+
func (t *NoticeTemplate) Receiver(staffId ...string) *NoticeTemplate {
26+
t.StaffID = append(t.StaffID, staffId...)
27+
return t
28+
}
29+
30+
func (t *NoticeTemplate) NewWeChat() *WeChatNoticeBody {
31+
t.WeChat = &WeChatNoticeBody{}
32+
return t.WeChat
33+
}
34+
35+
func (t *NoticeTemplate) NewDingTalk() *DingTalkNoticeBody {
36+
t.DingTalk = &DingTalkNoticeBody{}
37+
return t.DingTalk
38+
}
39+
40+
type WeChatNoticeBody struct {
41+
Data map[string]*message.TemplateDataItem `json:"data"`
42+
TemplateID string `json:"template_id"`
43+
ToUser string `json:"touser"`
44+
URL string `json:"url"`
45+
}
46+
47+
func (t *WeChatNoticeBody) SetTmpl(templateId string) *WeChatNoticeBody {
48+
t.TemplateID = templateId
49+
return t
50+
}
51+
52+
func (t *WeChatNoticeBody) SetData(key, value, color string) *WeChatNoticeBody {
53+
if t.Data == nil {
54+
t.Data = make(map[string]*message.TemplateDataItem)
55+
}
56+
t.Data[key] = &message.TemplateDataItem{Value: value, Color: color}
57+
return t
58+
}
59+
60+
func (t *WeChatNoticeBody) Url(url string) *WeChatNoticeBody {
61+
t.URL = url
62+
return t
63+
}
64+
65+
type DingTalkNoticeBody struct {
66+
MsgType string `json:"msgtype"`
67+
Text *DingTalkMsgPlainTextInput `json:"text,omitempty"`
68+
Link *DingTalkMsgLinkInput `json:"link,omitempty"`
69+
Markdown *DingTalkMsgMarkdownInput `json:"markdown,omitempty"`
70+
}
71+
72+
func (t *DingTalkNoticeBody) PlainText(content string) {
73+
t.MsgType = "text"
74+
t.Text = &DingTalkMsgPlainTextInput{
75+
Content: content,
76+
}
77+
}
78+
79+
func (t *DingTalkNoticeBody) LinkMsg(messageUrl, picUrl, title, text string) {
80+
t.MsgType = "link"
81+
t.Link = &DingTalkMsgLinkInput{
82+
MessageUrl: messageUrl,
83+
PicUrl: picUrl,
84+
Title: title,
85+
Text: text,
86+
}
87+
}
88+
89+
func (t *DingTalkNoticeBody) MarkdownMsg(title, text string) {
90+
t.MsgType = "markdown"
91+
t.Markdown = &DingTalkMsgMarkdownInput{
92+
Title: title,
93+
Text: text,
94+
}
95+
}
96+
97+
type DingTalkMsgPlainTextInput struct {
98+
Content string `json:"content"`
99+
}
100+
101+
type DingTalkMsgLinkInput struct {
102+
MessageUrl string `json:"messageUrl"`
103+
PicUrl string `json:"picUrl"`
104+
Title string `json:"title"`
105+
Text string `json:"text"`
106+
}
107+
108+
type DingTalkMsgMarkdownInput struct {
109+
Title string `json:"title"` // 这个是点进推送列表前显示的简短的消息(进入后不可见)
110+
Text string `json:"text"` // 这个是进入后显示的详细的消息
111+
}
112+
113+
type UniNotifyInput struct {
114+
Title string
115+
SubTitle string
116+
Content string
117+
Extra string
118+
}
119+
120+
type WechatBody struct {
121+
StaffId string `json:"staffId"`
122+
*WeChatNoticeBody
123+
MessageId string `json:"messageId"`
124+
}
125+
126+
func MarshalWechatBody(staffId string, msg *WeChatNoticeBody) (data []byte, err error) {
127+
body := &WechatBody{
128+
StaffId: staffId,
129+
WeChatNoticeBody: msg,
130+
}
131+
if data, err = json.Marshal(body); err != nil {
132+
return nil, err
133+
}
134+
return data, nil
135+
}
136+
137+
func UnmarshalWechatBody(data []byte) (*WechatBody, error) {
138+
body := &WechatBody{}
139+
if err := json.Unmarshal(data, body); err != nil {
140+
return nil, err
141+
}
142+
return body, nil
143+
}
144+
145+
type DingTalkBody struct {
146+
StaffId string `json:"staffId"`
147+
*DingTalkNoticeBody
148+
MessageId string `json:"messageId"`
149+
}
150+
151+
func MarshalDingTalkBody(staffId string, msg *DingTalkNoticeBody) (data []byte, err error) {
152+
body := &DingTalkBody{
153+
StaffId: staffId,
154+
DingTalkNoticeBody: msg,
155+
}
156+
if data, err = json.Marshal(body); err != nil {
157+
return nil, err
158+
}
159+
return data, nil
160+
}
161+
162+
func UnmarshalDingTalkBody(data []byte) (*DingTalkNoticeBody, error) {
163+
body := &DingTalkNoticeBody{}
164+
if err := json.Unmarshal(data, body); err != nil {
165+
return nil, err
166+
}
167+
return body, nil
168+
}

0 commit comments

Comments
 (0)