forked from transferia/transferia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage.go
373 lines (329 loc) · 10.1 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
package abstract
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/transferia/transferia/library/go/core/xerrors"
"github.com/transferia/transferia/pkg/util"
"github.com/transferia/transferia/pkg/util/set"
)
type LoadProgress func(current, progress, total uint64)
type TableDescription struct {
Name string
Schema string // for example - for mysql here are database name
Filter WhereStatement
EtaRow uint64 // estimated number of rows in the table
Offset uint64 // offset (in rows) along the ordering key (not necessary primary key)
}
var NonExistentTableID TableID = *NewTableID("", "")
func BuildIncludeMap(objects []string) (map[TableID]bool, error) {
includeObjects := map[TableID]bool{}
var errs util.Errors
for _, obj := range objects {
tid, err := ParseTableID(obj)
if err != nil {
errs = append(errs, xerrors.Errorf("object: %s: %w", obj, err))
continue
}
includeObjects[*tid] = true
}
if len(errs) > 0 {
return nil, xerrors.Errorf("unable to parse objects: %w", errs)
}
return includeObjects, nil
}
func SchemaFilterByObjects(schema DBSchema, objects []string) (DBSchema, error) {
if objects == nil {
return schema, nil
}
res := make(DBSchema)
for _, obj := range objects {
id, err := ParseTableID(obj)
if err != nil {
return nil, xerrors.Errorf("unable to parse: %s: %w", obj, err)
}
info, ok := schema[*id]
if !ok {
return nil, xerrors.Errorf("object %s not found in schema", obj)
}
res[*id] = info
}
return res, nil
}
func ParseTableID(object string) (*TableID, error) {
return NewTableIDFromStringPg(object, false)
}
func ParseTableIDs(objects ...string) ([]TableID, error) {
var res []TableID
for _, obj := range objects {
tid, err := ParseTableID(obj)
if err != nil {
return nil, xerrors.Errorf("unable to parse table ID: %s: %w", obj, err)
}
res = append(res, *tid)
}
return res, nil
}
// NewTableIDFromStringPg parses the given FQTN in PostgreSQL syntax to construct a TableID.
func NewTableIDFromStringPg(fqtn string, replaceOmittedSchemaWithPublic bool) (*TableID, error) {
parts, err := identifierToParts(fqtn)
if err != nil {
return nil, xerrors.Errorf("failed to parse identifier '%s' into parts: %w", fqtn, err)
}
switch len(parts) {
case 0:
return nil, xerrors.Errorf("zero-length identifier")
case 1:
if replaceOmittedSchemaWithPublic {
return &TableID{Namespace: "public", Name: parts[0]}, nil
}
return &TableID{Namespace: "", Name: parts[0]}, nil
case 2:
return &TableID{Namespace: parts[0], Name: parts[1]}, nil
default:
return nil, xerrors.Errorf("identifier '%s' contains %d parts instead of maximum two", fqtn, len(parts))
}
}
// identifierToParts separates the given identifier into parts separated by dot.
// Each part may be double-quoted and may contain double-escaped double quotes. These are properly parsed; external double quotes are removed.
func identifierToParts(identifier string) ([]string, error) {
result := make([]string, 0, 2)
idPartBuilder := strings.Builder{}
identifierInProgress := false
insideDoubleQuotedIdentifier := false
previousWasDoubleQuote := false
overFQTNRunes:
for i, r := range identifier {
switch r {
case '"':
if !insideDoubleQuotedIdentifier {
if !identifierInProgress {
insideDoubleQuotedIdentifier = true
continue overFQTNRunes
} else {
return nil, xerrors.Errorf("\" outside of a double-quoted name part in position [%d]", i-1)
}
}
if !previousWasDoubleQuote {
previousWasDoubleQuote = true
continue overFQTNRunes
}
previousWasDoubleQuote = false
case '.':
if insideDoubleQuotedIdentifier {
if !previousWasDoubleQuote {
// just a dot inside an escaped identifier
break
}
// previous character (double quote) was an end of an escaped identifier
previousWasDoubleQuote = false
insideDoubleQuotedIdentifier = false
}
if !identifierInProgress {
return nil, xerrors.Errorf("zero-length name part ending in position [%d]", i)
}
result = append(result, idPartBuilder.String())
idPartBuilder.Reset()
identifierInProgress = false
continue overFQTNRunes
default:
if previousWasDoubleQuote {
return nil, xerrors.Errorf("unescaped \" in position [%d]", i-1)
}
}
identifierInProgress = true
_, _ = idPartBuilder.WriteRune(r)
}
if !identifierInProgress {
return nil, xerrors.Errorf("zero-length name part ending in position [%d]", len(identifier)-1)
}
if insideDoubleQuotedIdentifier {
if !previousWasDoubleQuote {
return nil, xerrors.Errorf("missing ending \" in position [%d]", len(identifier)-1)
}
previousWasDoubleQuote = false
insideDoubleQuotedIdentifier = false
}
result = append(result, idPartBuilder.String())
idPartBuilder.Reset()
identifierInProgress = false
return result, nil
}
type TableInfo struct {
EtaRow uint64
IsView bool
Schema *TableSchema
}
type TableMap map[TableID]TableInfo
func (m *TableMap) Copy() TableMap {
cp := TableMap{}
for id, info := range *m {
cp[id] = info
}
return cp
}
func (m *TableMap) String(withSchema bool) string {
mapCopy := make(map[string]TableInfo)
for k, v := range *m {
var newTableInfo TableInfo
newTableInfo.EtaRow = v.EtaRow
newTableInfo.IsView = v.IsView
if withSchema {
newTableInfo.Schema = v.Schema
}
mapCopy[k.Fqtn()] = newTableInfo
}
result, _ := json.Marshal(mapCopy)
return string(result)
}
func (m *TableMap) NoKeysTables() []TableID {
var noKeyTables []TableID
for tID, tInfo := range *m {
if !tInfo.Schema.Columns().HasPrimaryKey() && !tInfo.Schema.Columns().HasFakeKeys() && !tInfo.IsView {
noKeyTables = append(noKeyTables, tID)
}
}
return noKeyTables
}
func (m *TableMap) FakePkeyTables() []TableID {
var fakePkeyTables []TableID
for tID, tInfo := range *m {
if tInfo.Schema.Columns().HasFakeKeys() {
fakePkeyTables = append(fakePkeyTables, tID)
}
}
return fakePkeyTables
}
func (m *TableMap) ConvertToTableDescriptions() []TableDescription {
tableDescriptions := make([]TableDescription, 0, len(*m))
for tID, tInfo := range *m {
tableDescriptions = append(tableDescriptions, TableDescription{
Name: tID.Name,
Schema: tID.Namespace,
EtaRow: tInfo.EtaRow,
Filter: "",
Offset: 0,
})
}
return tableDescriptions
}
func (m *TableMap) ToDBSchema() DBSchema {
resultSchema := make(DBSchema)
for tableID, tableInfo := range *m {
resultSchema[tableID] = tableInfo.Schema
}
return resultSchema
}
func (t *TableDescription) ID() TableID {
return TableID{Namespace: t.Schema, Name: t.Name}
}
func (t *TableDescription) PartID() string {
if t.Offset == 0 && t.Filter == "" {
// This needed for s3, see: https://github.com/transferia/transferia/review/3538625
return ""
}
asJSONString := fmt.Sprintf(`{"schema": "%s","name": "%s","filter": "%s","offset": %d}`,
t.Schema, t.Name, t.Filter, t.Offset)
return util.Hash(asJSONString)
}
func (t *TableDescription) Same(table string) bool {
if t.Name == table {
return true
}
if fmt.Sprintf("%v.%v", t.Schema, t.Name) == table {
return true
}
if fmt.Sprintf("%v.\"%v\"", t.Schema, t.Name) == table {
return true
}
if t.Fqtn() == table {
return true
}
return false
}
func (t *TableDescription) Fqtn() string {
return t.ID().Fqtn()
}
func (t *TableDescription) String() string {
return fmt.Sprintf("%s [filter %q offset %d]", t.Fqtn(), t.Filter, t.Offset)
}
// TableIDsIntersection returns an intersection of two lists of TableIDs
func TableIDsIntersection(a []TableID, b []TableID) []TableID {
if len(b) == 0 {
return a
}
if len(a) == 0 {
return b
}
// both sets are not empty, find an intersection
resultSet := set.New[TableID]()
for iA := range a {
for iB := range b {
if a[iA].Includes(b[iB]) {
resultSet.Add(b[iB])
} else if b[iB].Includes(a[iA]) {
resultSet.Add(a[iA])
}
}
}
return resultSet.SortedSliceFunc(func(a, b TableID) bool {
return a.Less(b) < 0
})
}
// Storage is for simple storage implementations
// For extra functionalities implement below storages.
type Storage interface {
Closeable
Ping() error
LoadTable(ctx context.Context, table TableDescription, pusher Pusher) error
TableSchema(ctx context.Context, table TableID) (*TableSchema, error)
TableList(filter IncludeTableList) (TableMap, error)
ExactTableRowsCount(table TableID) (uint64, error)
EstimateTableRowsCount(table TableID) (uint64, error)
TableExists(table TableID) (bool, error)
}
// PositionalStorage some storages may provide specific position for snapshot consistency
type PositionalStorage interface {
// Position provide info about snapshot read position
Position(ctx context.Context) (*LogPosition, error)
}
type LogPosition struct {
ID uint32
LSN uint64
TxID string
}
// SchemaStorage allow to resolve DB Schema from storage
type SchemaStorage interface {
LoadSchema() (DBSchema, error)
}
// SampleableStorage is for dataplane tests
type SampleableStorage interface {
Storage
TableSizeInBytes(table TableID) (uint64, error)
LoadTopBottomSample(table TableDescription, pusher Pusher) error
LoadRandomSample(table TableDescription, pusher Pusher) error
LoadSampleBySet(table TableDescription, keySet []map[string]interface{}, pusher Pusher) error
TableAccessible(table TableDescription) bool
}
// ShardingStorage is for in table sharding
type ShardingStorage interface {
ShardTable(ctx context.Context, table TableDescription) ([]TableDescription, error)
}
// Storage has data, that need to be shared with all workers
type ShardingContextStorage interface {
// ShardingContext Return shared data, used on *MAIN* worker;
// Take care, method return OperationState_ShardedUploadState, but only fill field Context;
// Because type of Context is private, this is protoc thing;
ShardingContext() ([]byte, error)
// SetShardingContext for storage, used on *SECONDARY* worker
SetShardingContext(shardedState []byte) error
}
type IncrementalStorage interface {
GetIncrementalState(ctx context.Context, incremental []IncrementalTable) ([]TableDescription, error)
SetInitialState(tables []TableDescription, incremental []IncrementalTable)
}
type SnapshotableStorage interface {
BeginSnapshot(ctx context.Context) error
EndSnapshot(ctx context.Context) error
}