-
Notifications
You must be signed in to change notification settings - Fork 2
/
task.go
155 lines (124 loc) · 3.01 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package pico
import (
"sync"
"github.com/pkg/errors"
)
// Task holds the state shared by SingleTask and BatchTask, which both embed
// it: the convertors, the progress entry channel and the completion signal.
type Task struct {
// Progress is embedded, so its fields and methods are promoted onto Task.
// NOTE(review): the original comment called this "FileProgress" and said it
// is measured by file counts — confirm against the Progress definition.
Progress
// wg is the wait group that wait() blocks on; the Start methods add one
// count per convertor they create.
wg *sync.WaitGroup
// Convertors are used to convert PDF to images. The Start methods append
// one entry per convertor they build.
Convertors []*Convertor
// params holds the final computed arguments used to invoke the conversion
// call. Its cancel method is invoked by wait() and on a failed start.
params *Parameters
// Entries is the channel of conversion progress entries.
// Each entry has the format ["currentPage" "lastPage" "filename" "workerId"].
// It is closed by wait(), just before done.
Entries chan []string
// done is closed by wait() once the whole task has completed; nothing is
// ever sent on it within this file.
done chan interface{}
}
// SingleTask deals with the conversion of a single document. The given PDF is
// usually a large file, so it is split (evenly) into parts by page range and
// the parts are dispatched to the convertors.
// It embeds Task, so all of Task's fields and methods are promoted onto it.
type SingleTask struct {
Task
}
// BatchTask deals with the conversion of multiple documents, where each
// convertor converts a single document at a time.
// It embeds Task, so all of Task's fields and methods are promoted onto it.
type BatchTask struct {
Task
}
// wait blocks until the wait group counter drops to zero, then invokes
// params.cancel and closes Entries followed by done.
// The Start methods run it in its own goroutine. Entries is closed before
// done, which lets Wait and WaitAndCollect drain Entries first and then
// receive from done.
func (t *Task) wait() {
t.wg.Wait()
// NOTE(review): presumably releases whatever context/resources back the
// parameters — confirm against the Parameters definition.
t.params.cancel()
close(t.Entries)
close(t.done)
}
// Completed reports whether the task has finished, i.e. whether a receive
// from the done channel can proceed immediately (done is closed once the task
// is over). It never blocks.
func (t *Task) Completed() bool {
	finished := false
	select {
	case <-t.done:
		finished = true
	default:
		// done is still open, so the task has not completed yet.
	}
	return finished
}
// Aborted reports whether the task was aborted.
// NOTE(review): this currently always returns false; abort detection does not
// appear to be implemented here.
func (t *Task) Aborted() bool {
return false
}
// Wait takes over the Entries channel, discarding every progress entry until
// the channel is closed, and then blocks until done is closed, i.e. until the
// task has finished.
func (t *Task) Wait() {
	for {
		if _, open := <-t.Entries; !open {
			break
		}
	}
	<-t.done
}
// WaitAndCollect behaves like Wait() but gathers every entry received from
// Entries into a slice, in arrival order, instead of discarding it.
// An empty (non-nil) slice is returned when no entry was received.
func (t *Task) WaitAndCollect() (entries [][]string) {
	entries = [][]string{}
	for {
		entry, open := <-t.Entries
		if !open {
			break
		}
		entries = append(entries, entry)
	}
	// Entries is closed; now wait for the completion signal itself.
	<-t.done
	return entries
}
// Errors blocks until done is closed and then returns the errors reported by
// every convertor, concatenated in the order the convertors appear in
// t.Convertors. The result is nil when no convertor reported an error.
func (t *Task) Errors() (errs []*ConversionError) {
	<-t.done
	for i := range t.Convertors {
		errs = append(errs, t.Convertors[i].Errors()...)
	}
	return errs
}
// Error returns the first error reported by Errors(), or nil when there is
// none. Because it calls Errors(), it blocks until the task has completed.
func (t *Task) Error() error {
	errs := t.Errors()
	if len(errs) == 0 {
		return nil
	}
	return errs[0]
}
// buildConvertor creates a new Convertor bound to this task, using index as
// its id and giving it a fresh unbuffered done channel. It does not register
// the convertor on the task; callers do that themselves.
func (t *Task) buildConvertor(index int32) *Convertor {
	conv := new(Convertor)
	conv.t = t
	conv.id = index
	conv.done = make(chan interface{})
	return conv
}
// newSingleTask creates a SingleTask that uses p as its parameters. The task
// gets a fresh wait group and done channel, and an Entries channel buffered
// with capacity p.pageCount.
func newSingleTask(p *Parameters) *SingleTask {
	st := new(SingleTask)
	st.wg = new(sync.WaitGroup)
	st.done = make(chan interface{})
	st.params = p
	// NOTE(review): presumably sized so that one entry per page can be queued
	// without blocking the convertors — confirm against the producers.
	st.Entries = make(chan []string, p.pageCount)
	return st
}
// newBatchTask creates a BatchTask that uses p as its parameters. The task
// gets a fresh wait group and done channel, and an Entries channel with a
// fixed buffer of 200 entries.
func newBatchTask(p *Parameters) *BatchTask {
	bt := new(BatchTask)
	bt.wg = new(sync.WaitGroup)
	bt.done = make(chan interface{})
	bt.params = p
	bt.Entries = make(chan []string, 200)
	return bt
}
// Start initiates the conversion of pdf. It builds t.params.job convertors,
// registers each one on the wait group and in t.Convertors, and starts it on
// pdf. Once every convertor has started, the wait goroutine is launched.
//
// If a convertor fails to start, params.cancel is invoked and the wrapped
// error is returned immediately. In that case the wait goroutine is NOT
// launched, so Entries and done are never closed.
func (t *SingleTask) Start(pdf string) error {
	for id := int32(0); id < t.params.job; id++ {
		conv := t.buildConvertor(id)
		// Register before starting so the wait group already accounts for it.
		t.wg.Add(1)
		t.Convertors = append(t.Convertors, conv)

		err := conv.start(pdf)
		if err == nil {
			continue
		}
		t.params.cancel()
		return errors.Wrap(err, "failed to start convertor")
	}

	go t.wait()
	return nil
}
// Start initiates the batch conversion. It builds t.params.job convertors,
// registers each one on the wait group and in t.Convertors, and runs its
// startAsWorker(provider) in a separate goroutine. The wait goroutine is then
// launched. The returned error is always nil.
// NOTE(review): presumably each worker pulls its documents from provider —
// confirm against Convertor.startAsWorker.
func (t *BatchTask) Start(provider PdfProvider) error {
	var id int32
	for ; id < t.params.job; id++ {
		worker := t.buildConvertor(id)
		t.wg.Add(1)
		t.Convertors = append(t.Convertors, worker)
		go worker.startAsWorker(provider)
	}

	go t.wait()
	return nil
}