Skip to content

Commit 49c06bd

Browse files
committed
update from dtm to version v1.15.1
1 parent 02ff444 commit 49c06bd

File tree

14 files changed

+90
-68
lines changed

14 files changed

+90
-68
lines changed

README.md

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Go Client for DTM
2+
3+
There are there packages:
4+
5+
## workflow
6+
Workflow is a new client for DTM. It support the mixed usage of patterns saga, tcc, xa. And it also support the mixed usage of http, grpc and local transactions.
7+
8+
This pattern offers maximum flexibility and can handle a wide range of scenarios. This pattern is highly recommended for transactions that need to be rolled back
9+
10+
Quick start for workflow using http can be found here: [https://github.com/dtm-labs/quick-start-sample/tree/main/workflow-http](https://github.com/dtm-labs/quick-start-sample/tree/main/workflow-http)
11+
12+
Quick start for workflow using grpc can be found here: [https://github.com/dtm-labs/quick-start-sample/tree/main/workflow-grpc](https://github.com/dtm-labs/quick-start-sample/tree/main/workflow-grpc)
13+
14+
Detailed examples can be found here: [https://github.com/dtm-labs/dtm-examples](https://github.com/dtm-labs/dtm-examples)
15+
16+
17+
## dtmcli
18+
dtmcli is the http client for patterns: saga, tcc, msg, xa
19+
20+
Quick start for dtmcli can be found here: [https://github.com/dtm-labs/quick-start-sample/tree/main/dtmcli-qs](https://github.com/dtm-labs/quick-start-sample/tree/main/dtmcli-qs)
21+
22+
Detailed examples can be found here: [https://github.com/dtm-labs/dtm-examples](https://github.com/dtm-labs/dtm-examples)
23+
24+
## dtmgrpc
25+
dtmcli is the grpc client for patterns: saga, tcc, msg, xa
26+
27+
Quick start for dtmgrpc can be found here: [https://github.com/dtm-labs/quick-start-sample/tree/main/dtmgrpc-qs](https://github.com/dtm-labs/quick-start-sample/tree/main/dtmgrpc-qs)
28+
29+
Detailed examples can be found here: [https://github.com/dtm-labs/dtm-examples](https://github.com/dtm-labs/dtm-examples)
30+
31+

dtmcli/dtmimp/trans_base.go

-3
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,6 @@ func TransRequestBranch(t *TransBase, method string, body interface{}, branchID
152152
SetQueryParams(query).
153153
SetHeaders(t.BranchHeaders).
154154
Execute(method, url)
155-
if err == nil {
156-
err = RespAsErrorCompatible(resp)
157-
}
158155
return resp, err
159156
}
160157

dtmcli/dtmimp/trans_xa_base.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func XaHandleLocalTrans(xa *TransBase, dbConf DBConf, cb func(*sql.DB) error) (r
3737
if rerr != nil {
3838
return
3939
}
40-
defer func() { _ = db.Close() }()
40+
defer XaClose(db)
4141
defer DeferDo(&rerr, func() error {
4242
_, err := DBExec(dbConf.Driver, db, GetDBSpecial(dbConf.Driver).GetXaSQL("prepare", xaBranch))
4343
return err

dtmcli/dtmimp/utils.go

+7-17
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"encoding/json"
1212
"errors"
1313
"fmt"
14-
"net/http"
1514
"net/url"
1615
"os"
1716
"runtime"
@@ -182,10 +181,16 @@ func XaDB(conf DBConf) (*sql.DB, error) {
182181
if conf.Driver == DBTypeMysql {
183182
dsn += "&autocommit=0"
184183
}
185-
logger.Infof("opening standalone %s: %s", conf.Driver, strings.Replace(dsn, conf.Password, "****", 1))
184+
logger.Infof("opening xa standalone %s: %s", conf.Driver, strings.Replace(dsn, conf.Password, "****", 1))
186185
return sql.Open(conf.Driver, dsn)
187186
}
188187

188+
// XaClose will log and close the db
189+
func XaClose(db *sql.DB) {
190+
logger.Infof("closing xa db")
191+
_ = db.Close()
192+
}
193+
189194
// DBExec use raw db to exec
190195
func DBExec(dbType string, db DB, sql string, values ...interface{}) (affected int64, rerr error) {
191196
if sql == "" {
@@ -218,21 +223,6 @@ func GetDsn(conf DBConf) string {
218223
return dsn
219224
}
220225

221-
// RespAsErrorCompatible translate a resty response to error
222-
// compatible with version < v1.10
223-
func RespAsErrorCompatible(resp *resty.Response) error {
224-
code := resp.StatusCode()
225-
str := resp.String()
226-
if code == http.StatusTooEarly || strings.Contains(str, ResultOngoing) {
227-
return fmt.Errorf("%s. %w", str, ErrOngoing)
228-
} else if code == http.StatusConflict || strings.Contains(str, ResultFailure) {
229-
return fmt.Errorf("%s. %w", str, ErrFailure)
230-
} else if code != http.StatusOK {
231-
return errors.New(str)
232-
}
233-
return nil
234-
}
235-
236226
// RespAsErrorByJSONRPC translate json rpc resty response to error
237227
func RespAsErrorByJSONRPC(resp *resty.Response) error {
238228
str := resp.String()

dtmcli/dtmimp/vars.go

-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ var ErrOngoing = errors.New("ONGOING")
2424
// if QueryPrepared executed before call. then DoAndSubmit return this error
2525
var ErrDuplicated = errors.New("DUPLICATED")
2626

27-
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
28-
var XaSQLTimeoutMs = 15000
29-
3027
// MapSuccess HTTP result of SUCCESS
3128
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}
3229

dtmcli/trans_msg.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier)
6969
errb := busiCall(bb)
7070
if errb != nil && !errors.Is(errb, ErrFailure) {
7171
// if busicall return an error other than failure, we will query the result
72-
_, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
72+
_, err = requestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
7373
}
7474
if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) {
7575
_ = dtmimp.TransCallDtm(&s.TransBase, "abort")

dtmcli/trans_tcc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
7171
if err != nil {
7272
return nil, err
7373
}
74-
return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, dtmimp.OpTry, tryURL)
74+
return requestBranch(&t.TransBase, "POST", body, branchID, dtmimp.OpTry, tryURL)
7575
}

dtmcli/types.go

-10
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,6 @@ func GetCurrentDBType() string {
3030
return dtmimp.GetCurrentDBType()
3131
}
3232

33-
// SetXaSQLTimeoutMs set XaSQLTimeoutMs
34-
func SetXaSQLTimeoutMs(ms int) {
35-
dtmimp.XaSQLTimeoutMs = ms
36-
}
37-
38-
// GetXaSQLTimeoutMs get XaSQLTimeoutMs
39-
func GetXaSQLTimeoutMs() int {
40-
return dtmimp.XaSQLTimeoutMs
41-
}
42-
4333
// SetBarrierTableName sets barrier table name
4434
func SetBarrierTableName(tablename string) {
4535
dtmimp.BarrierTableName = tablename

dtmcli/utils.go

+28-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"net/http"
7+
"strings"
78

89
"github.com/dtm-labs/client/dtmcli/dtmimp"
910
"github.com/go-resty/resty/v2"
@@ -19,14 +20,27 @@ func MustGenGid(server string) string {
1920
return res["gid"]
2021
}
2122

22-
// String2DtmError translate string to dtm error
23-
func String2DtmError(str string) error {
24-
return map[string]error{
25-
ResultFailure: ErrFailure,
26-
ResultOngoing: ErrOngoing,
27-
ResultSuccess: nil,
28-
"": nil,
29-
}[str]
23+
// ErrorMessage2Error return an error fmt.Errorf("%s %w", errMsg, err) but trim out duplicate wrap
24+
// eg. ErrorMessage2Error("an error. FAILURE", ErrFailure) return an error with message: "an error. FAILURE",
25+
// no additional " FAILURE" added
26+
func ErrorMessage2Error(errMsg string, err error) error {
27+
errMsg = strings.TrimSuffix(errMsg, " "+err.Error())
28+
return fmt.Errorf("%s %w", errMsg, err)
29+
}
30+
31+
// HTTPResp2DtmError translate a resty response to error
32+
// compatible with version < v1.10
33+
func HTTPResp2DtmError(resp *resty.Response) error {
34+
code := resp.StatusCode()
35+
str := resp.String()
36+
if code == http.StatusTooEarly || strings.Contains(str, ResultOngoing) {
37+
return ErrorMessage2Error(str, ErrOngoing)
38+
} else if code == http.StatusConflict || strings.Contains(str, ResultFailure) {
39+
return ErrorMessage2Error(str, ErrFailure)
40+
} else if code != http.StatusOK {
41+
return errors.New(str)
42+
}
43+
return nil
3044
}
3145

3246
// Result2HttpJSON return the http code and json result
@@ -51,12 +65,10 @@ func Result2HttpJSON(result interface{}) (code int, res interface{}) {
5165
return
5266
}
5367

54-
// IsRollback returns whether the result is indicating rollback
55-
func IsRollback(resp *resty.Response, err error) bool {
56-
return err == ErrFailure || dtmimp.RespAsErrorCompatible(resp) == ErrFailure
57-
}
58-
59-
// IsOngoing returns whether the result is indicating ongoing
60-
func IsOngoing(resp *resty.Response, err error) bool {
61-
return err == ErrOngoing || dtmimp.RespAsErrorCompatible(resp) == ErrOngoing
68+
func requestBranch(t *dtmimp.TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
69+
resp, err := dtmimp.TransRequestBranch(t, method, body, branchID, op, url)
70+
if err == nil {
71+
err = HTTPResp2DtmError(resp)
72+
}
73+
return resp, err
6274
}

dtmcli/xa.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,5 @@ func XaGlobalTransaction2(server string, gid string, custom func(*Xa), xaFunc Xa
7979
// CallBranch call a xa branch
8080
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
8181
branchID := x.NewSubBranchID()
82-
return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, dtmimp.OpAction, url)
82+
return requestBranch(&x.TransBase, "POST", body, branchID, dtmimp.OpAction, url)
8383
}

dtmgrpc/type.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package dtmgrpc
99
import (
1010
context "context"
1111
"errors"
12-
"fmt"
1312

1413
"github.com/dtm-labs/client/dtmcli"
1514
"github.com/dtm-labs/client/dtmcli/dtmimp"
@@ -40,9 +39,9 @@ func GrpcError2DtmError(err error) error {
4039
if st.Message() == dtmcli.ResultOngoing {
4140
return dtmcli.ErrOngoing
4241
}
43-
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure)
42+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
4443
} else if st != nil && st.Code() == codes.FailedPrecondition {
45-
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing)
44+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
4645
}
4746
return err
4847
}

workflow/imp.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (wf *Workflow) loadProgresses() error {
3939
Data: p.BinData,
4040
}
4141
if sr.Status == dtmcli.StatusFailed {
42-
sr.Error = fmt.Errorf("%s. %w", string(p.BinData), dtmcli.ErrFailure)
42+
sr.Error = dtmcli.ErrorMessage2Error(string(p.BinData), dtmcli.ErrFailure)
4343
}
4444
wf.progresses[p.BranchID+"-"+p.Op] = sr
4545
}
@@ -109,7 +109,7 @@ func (wf *Workflow) process(handler WfFunc, data []byte) (err error) {
109109
err = wf.processPhase2(err)
110110
}
111111
if err == nil || errors.Is(err, dtmcli.ErrFailure) {
112-
err1 := wf.submit(wfErrorToStatus(err))
112+
err1 := wf.submit(err)
113113
if err1 != nil {
114114
return err1
115115
}
@@ -178,7 +178,7 @@ func (wf *Workflow) recordedDoInner(fn func(bb *dtmcli.BranchBarrier) *stepResul
178178
}
179179
r := wf.getStepResult()
180180
if r != nil {
181-
logger.Debugf("progress restored: %s %s %v %s %s", branchID, wf.currentOp, r.Error, r.Status, r.Data)
181+
logger.Debugf("progress restored: '%s' '%s' '%v' '%s' '%s'", branchID, wf.currentOp, r.Error, r.Status, r.Data)
182182
return r
183183
}
184184
bb := &dtmcli.BranchBarrier{

workflow/rpc.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,28 @@ func (wf *Workflow) getProgress() ([]*dtmgpb.DtmProgress, error) {
2727
return reply.Progresses, err
2828
}
2929

30-
func (wf *Workflow) submit(status string) error {
30+
func (wf *Workflow) submit(err error) error {
31+
status := wfErrorToStatus(err)
32+
reason := ""
33+
if err != nil {
34+
reason = err.Error()
35+
}
3136
if wf.Protocol == dtmimp.ProtocolHTTP {
3237
m := map[string]interface{}{
3338
"gid": wf.Gid,
3439
"trans_type": wf.TransType,
3540
"req_extra": map[string]string{
36-
"status": status,
41+
"status": status,
42+
"rollback_reason": reason,
3743
},
3844
}
3945
_, err := dtmimp.TransCallDtmExt(wf.TransBase, m, "submit")
4046
return err
4147
}
4248
req := dtmgimp.GetDtmRequest(wf.TransBase)
4349
req.ReqExtra = map[string]string{
44-
"status": status,
50+
"status": status,
51+
"rollback_reason": reason,
4552
}
4653
reply := emptypb.Empty{}
4754
return dtmgimp.MustGetGrpcConn(wf.Dtm, false).Invoke(wf.Context, "/dtmgimp.Dtm/"+"Submit", req, &reply)

workflow/utils.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package workflow
33
import (
44
"bytes"
55
"errors"
6-
"fmt"
76
"io/ioutil"
87
"net/http"
98
"strconv"
@@ -75,9 +74,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) {
7574
data, err := ioutil.ReadAll(resp.Body)
7675
resp.Body = ioutil.NopCloser(bytes.NewBuffer(data))
7776
if code == http.StatusTooEarly {
78-
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrOngoing)
77+
return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrOngoing)
7978
} else if code == http.StatusConflict {
80-
return data, fmt.Errorf("%s. %w", string(data), dtmcli.ErrFailure)
79+
return data, dtmcli.ErrorMessage2Error(string(data), dtmcli.ErrFailure)
8180
} else if err == nil && code != http.StatusOK {
8281
return data, errors.New(string(data))
8382
}
@@ -88,9 +87,9 @@ func HTTPResp2DtmError(resp *http.Response) ([]byte, error) {
8887
func GrpcError2DtmError(err error) error {
8988
st, _ := status.FromError(err)
9089
if st != nil && st.Code() == codes.Aborted {
91-
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrFailure)
90+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrFailure)
9291
} else if st != nil && st.Code() == codes.FailedPrecondition {
93-
return fmt.Errorf("%s. %w", st.Message(), dtmcli.ErrOngoing)
92+
return dtmcli.ErrorMessage2Error(st.Message(), dtmcli.ErrOngoing)
9493
}
9594
return err
9695
}
@@ -113,7 +112,7 @@ func (wf *Workflow) stepResultFromGrpc(reply interface{}, err error) *stepResult
113112
if sr.Error == nil {
114113
sr.Data = dtmgimp.MustProtoMarshal(reply.(protoreflect.ProtoMessage))
115114
} else if sr.Status == dtmcli.StatusFailed {
116-
sr.Data = []byte(sr.Error.Error())
115+
sr.Data = []byte(err.Error())
117116
}
118117
return sr
119118
}

0 commit comments

Comments
 (0)