forked from transferia/transferia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsource.go
54 lines (47 loc) · 1.17 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package abstract
import (
"sort"
"go.ytsaurus.tech/yt/go/schema"
)
type ColumnSchema struct {
Name string `yson:"name" json:"name"`
YTType schema.Type `yson:"type" json:"type"`
Primary bool `json:"primary"`
}
func ToYtSchema(original []ColSchema, fixAnyTypeInPrimaryKey bool) []schema.Column {
result := make([]schema.Column, len(original))
for idx, el := range original {
result[idx] = schema.Column{
Name: el.ColumnName,
Expression: el.Expression,
Type: schema.Type(el.DataType),
}
if el.PrimaryKey {
result[idx].SortOrder = schema.SortAscending
if result[idx].Type == schema.TypeAny && fixAnyTypeInPrimaryKey {
result[idx].Type = schema.TypeString // should not use any as keys
}
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].SortOrder != schema.SortNone
})
return result
}
type SourceReader struct {
TotalCount int
Reader chan map[string]interface{}
Name string
RawSchema []ColumnSchema
Schema string
Table string
Lsn uint64
CommitTime uint64
}
type Source interface {
Run(sink AsyncSink) error
Stop()
}
type Fetchable interface {
Fetch() ([]ChangeItem, error)
}