-
Notifications
You must be signed in to change notification settings - Fork 5
/
subscription.go
119 lines (97 loc) · 3.24 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package messages
import (
"context"
"database/sql"
"fmt"
"math"
"vitess.io/vitess/go/vt/vitessdriver"
)
// subscription bundles the state behind a single streaming connection to the
// queue table: the dedicated database handle, the channels used to hand scan
// targets to the streaming goroutine, and the SQL statements built for the queue.
type subscription struct {
	// cancelFunc cancels the sub-context created in Open, which stops the
	// streaming query.
	cancelFunc context.CancelFunc

	// destChan carries the scan destinations that the streaming goroutine
	// passes to rows.Scan.
	destChan chan []interface{}

	// errChan carries the result of that rows.Scan call back to the side that
	// supplied the destinations.
	errChan chan error

	// db is the connection handle owned by this subscription. Streaming needs
	// its own connection properties, so it is opened and managed here rather
	// than supplied by the user.
	db *sql.DB

	// dbConfig records the connection details db was opened with.
	dbConfig vitessdriver.Configuration

	// Statements rendered once for the queue's table name.
	streamSQL, ackSQL, failSQL string
}
// Open connects to an underlying Vitess cluster through the vitessdriver for
// database/sql and starts a streaming query against the queue table.
//
// A fresh subscription is created and stored on the queue, a cancellable
// sub-context is derived from ctx, and a goroutine is started that owns the
// result rows. For every row the goroutine waits for a set of scan
// destinations on destChan, scans the row into them, and reports the Scan
// result on errChan.
//
// The goroutine ends when the rows are exhausted, when the sub-context is
// cancelled (by the caller's ctx or by Close), or when destChan is closed.
//
// If opening the database or starting the query fails, the sub-context is
// cancelled and any opened database handle is closed before the error is
// returned.
func (q *Queue) Open(ctx context.Context, address, target string) error {
	// generate the raw subscription
	s := q.newSubscription()
	q.s = s

	// create sub-context for the streaming goroutine with a cancel func for cleanup
	newCtx, cancel := context.WithCancel(ctx)
	s.cancelFunc = cancel

	// open a direct database connection
	if err := s.openDB(newCtx, address, target); err != nil {
		// release the sub-context; nothing will ever use it
		cancel()
		return err
	}

	// start a streaming query that will run indefinitely
	rows, err := s.db.QueryContext(newCtx, s.streamSQL)
	if err != nil {
		cancel()
		// best-effort cleanup: the query error is the one worth reporting
		_ = s.db.Close()
		return err
	}

	// This goroutine waits for rows and is the only reader of destChan and the
	// only writer of errChan. It works on the captured subscription s rather
	// than q.s, so a later Open cannot redirect it to different channels.
	go func() {
		defer rows.Close()

		for rows.Next() {
			// Wait for the scan destinations. Stop if the subscription is
			// being torn down instead of scanning into a nil slice.
			var dest []interface{}
			select {
			case d, ok := <-s.destChan:
				if !ok {
					// destChan was closed; errChan is closed right after,
					// so sending on it would panic.
					return
				}
				dest = d
			case <-newCtx.Done():
				return
			}

			// Scan synchronously, then hand the result back. Watching the
			// context here keeps the goroutine from blocking forever when
			// nobody is left to receive.
			scanErr := rows.Scan(dest...)
			select {
			case s.errChan <- scanErr:
			case <-newCtx.Done():
				return
			}
		}
	}()

	return nil
}
// newSubscription builds a subscription for this queue with unbuffered
// destination/error channels and the stream, ack and fail statements rendered
// for the queue's table name (q.Name). The database handle and cancel func are
// left unset; Open fills them in.
func (q *Queue) newSubscription() *subscription {
	// math.MaxInt64 is an untyped constant; passed directly as an interface{}
	// argument it would take the default type int, which does not compile on
	// 32-bit targets. Pinning it to int64 renders the same SQL everywhere.
	const failTimeNext = int64(math.MaxInt64)

	return &subscription{
		destChan:  make(chan []interface{}),
		errChan:   make(chan error),
		streamSQL: fmt.Sprintf("stream * from `%s`", q.Name),
		ackSQL:    fmt.Sprintf("UPDATE `%s` SET time_acked=?, time_next=null WHERE id=? AND time_acked is null", q.Name),
		failSQL:   fmt.Sprintf("UPDATE `%s` SET time_next=%d WHERE id=? AND time_acked is null", q.Name, failTimeNext),
	}
}
// openDB records the connection settings on the subscription and opens a
// database handle with them via vitessdriver, with Streaming enabled.
// On success the handle is limited to one open and one idle connection.
// The ctx parameter is accepted but not used by this function.
// It returns the error from vitessdriver.OpenWithConfiguration, if any.
func (s *subscription) openDB(ctx context.Context, address, target string) error {
	cfg := vitessdriver.Configuration{
		Address:   address,
		Target:    target,
		Streaming: true,
	}
	s.dbConfig = cfg

	db, err := vitessdriver.OpenWithConfiguration(cfg)
	// store whatever was returned, even on failure
	s.db = db
	if err != nil {
		return err
	}

	// a single connection is all the subscription ever uses
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
	return nil
}
// Close cancels the streaming query, closes the connection to the database and
// then closes the subscription's channels. It returns the error from closing
// the database handle.
//
// Calling Close on a queue that was never opened is a no-op that returns nil.
// Close must not be called more than once for the same subscription: the
// channels are closed on every call, and closing a closed channel panics.
//
// TODO: Nack all remaining messages
func (q *Queue) Close() error {
	s := q.s
	if s == nil {
		// Open was never called, so there is nothing to release
		return nil
	}

	defer func() {
		// close the channels before exiting
		close(s.destChan)
		close(s.errChan)
	}()

	// cancel context inside rows.Next()
	if s.cancelFunc != nil {
		s.cancelFunc()
	}

	// the handle can be missing when Open failed before a connection was made
	if s.db == nil {
		return nil
	}
	return s.db.Close()
}