-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGoLang之Concurrency多任务独立模式.log
312 lines (243 loc) · 11.1 KB
/
GoLang之Concurrency多任务独立模式.log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
GoLang之Concurrency多任务独立模式
并发编程的一种常见方式是:有多个任务需要同时处理,并且每个任务都可以独立地完成。在Go语言里每个任务都在一个独立的goroutine(协程)里处理,任务之间互不依赖,也不需要彼此通信(只通过通道领取任务、提交结果)。下面用Go语言模拟一个经典的独立多任务并发模式:程序创建了3个带有缓冲区的通道(任务通道jobs、结果通道results、完成信号通道done),所有的工作都会分发给工作协程来处理,工作协程的数量与当前机器的逻辑处理器数量相同,从而将不必要的阻塞尽可能降到最低。
PS: 并发编程是Go语言的最大优势,而协程是实现这一优势的手段。只需要在函数调用前加上关键字go,这个函数就会以一个独立协程的形式执行。Go运行时把这些协程调度到操作系统线程上执行,当一个协程阻塞时,调度器会把其他协程转移到其他的线程中去执行,从而避免等待、实现并行运行。协程的优势是减少上下文(context)切换的开销,从而提高执行效率;缺点是不易于调试,需要借助调度器提供的相关调试工具。
package main
import (
"fmt"
"runtime"
)
// workers is the number of worker goroutines doRequest starts (and the number
// of completion signals awaitCompletion waits for): one per logical CPU as
// reported by runtime.NumCPU at package initialisation.
var workers = runtime.NumCPU()
// Result is the outcome of one Job; Job.Do sends exactly one per job.
// Field order matters: Do builds values with positional composite literals.
type Result struct {
	jobname    string // name of the job that produced this result
	resultcode int    // 0 for the simulated success, -1 for the simulated failure
	resultinfo string // status text: "OK" or "Error"
}
// Job is one unit of work handed to a worker goroutine.
// Field order matters: addJobs builds values with a positional composite literal.
type Job struct {
	jobname string        // name of the job to process
	results chan<- Result // channel on which Do reports this job's Result
}
func main() {
// go语言里大多数并发程序的开始处都有这一行代码, 但这行代码最终将会是多余的,
// 因为go语言的运行时系统会变得足够聪明以自动适配它所运行的机器
runtime.GOMAXPROCS(runtime.NumCPU())
// 返回当前处理器的数量
//fmt.Println(runtime.GOMAXPROCS(0))
// 返回当前机器的逻辑处理器或者核心的数量
//fmt.Println(runtime.NumCPU())
// 模拟8个工作任务
jobnames := []string{"gerry", "wcdj", "golang", "C++", "Lua", "perl", "python", "C"}
doRequest(jobnames)
}
// doRequest runs the classic fan-out/fan-in pipeline over jobnames:
//
//   - one producer goroutine (addJobs) feeds the job queue,
//   - `workers` consumer goroutines (doJobs) compete for jobs,
//   - one closer goroutine (awaitCompletion) closes the result queue once
//     every worker has finished,
//
// while the calling goroutine prints results as they arrive (processResults).
func doRequest(jobnames []string) {
	var (
		// Lets the producer run up to one job per worker ahead of them.
		jobQueue = make(chan Job, workers)
		// One slot per job: Job.Do sends exactly one Result per job, so
		// workers never block while publishing.
		resultQueue = make(chan Result, len(jobnames))
		// Each worker signals here exactly once when it exits.
		finished = make(chan struct{}, workers)
	)

	go addJobs(jobQueue, jobnames, resultQueue)

	for n := workers; n > 0; n-- {
		go doJobs(finished, jobQueue)
	}

	go awaitCompletion(finished, resultQueue)

	// Blocks until awaitCompletion closes resultQueue.
	processResults(resultQueue)
}
// addJobs is the producer. It queues one Job per name, each carrying the
// shared results channel. It then closes jobs so the workers' receive loops
// end once the queue is drained.
func addJobs(jobs chan<- Job, jobnames []string, results chan<- Result) {
	for i := range jobnames {
		jobs <- Job{jobname: jobnames[i], results: results}
	}
	close(jobs)
}
// doJobs is one worker. It runs Do on every Job it receives until jobs is
// closed and drained. Then it sends a single completion signal (an empty
// struct value) on done.
func doJobs(done chan<- struct{}, jobs <-chan Job) {
	for {
		job, ok := <-jobs
		if !ok {
			break
		}
		// The work itself lives in Job.Do, keeping this loop independent of
		// the business logic.
		job.Do()
	}
	done <- struct{}{}
}
// Do performs the simulated work for one job. It prints the job name, then
// sends exactly one Result on job.results. The job named "golang" gets code 0
// with "OK"; every other job gets code -1 with "Error".
func (job Job) Do() {
	fmt.Printf("... doing work in [%s]\n", job.jobname)

	code, info := -1, "Error"
	if job.jobname == "golang" {
		code, info = 0, "OK"
	}
	job.results <- Result{job.jobname, code, info}
}
// awaitCompletion receives exactly one signal per worker on done, meaning
// every doJobs goroutine has exited. It then closes results so the consumer's
// range loop in processResults ends.
func awaitCompletion(done <-chan struct{}, results chan Result) {
	for remaining := workers; remaining > 0; remaining-- {
		<-done
	}
	close(results)
}
// processResults prints each Result as "done: <jobname>,<code>,<info>", one
// per line, until results is closed.
func processResults(results <-chan Result) {
	for {
		r, ok := <-results
		if !ok {
			return
		}
		fmt.Printf("done: %s,%d,%s\n", r.jobname, r.resultcode, r.resultinfo)
	}
}
一个实际的例子:
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"os"
"path/filepath"
"regexp"
"runtime"
)
// workers is the number of worker goroutines grep starts (and the number of
// completion signals awaitCompletion waits for): one per logical CPU as
// reported by runtime.NumCPU at package initialisation.
var workers = runtime.NumCPU()
// Result is one matching line found by Job.Do.
// Field order matters: Do builds values with a positional composite literal.
type Result struct {
	filename string // file in which the match was found
	lino     int    // 1-based line number of the match
	line     string // matching line with trailing '\n'/'\r' characters removed
}
// Job asks a worker to search one file.
// Field order matters: addJobs builds values with a positional composite literal.
type Job struct {
	filename string        // path of the file to search
	results  chan<- Result // channel on which Do sends every matching line
}
func main() {
// go语言里大多数并发程序的开始处都有这一行代码, 但这行代码最终将会是多余的,
// 因为go语言的运行时系统会变得足够聪明以自动适配它所运行的机器
runtime.GOMAXPROCS(runtime.NumCPU())
// 返回当前处理器的数量
fmt.Println(runtime.GOMAXPROCS(0))
// 返回当前机器的逻辑处理器或者核心的数量
fmt.Println(runtime.NumCPU())
// Args hold the command-line arguments, starting with the program name
if len(os.Args) < 3 || os.Args[1] == "-h" || os.Args[1] == "--help" {
// Base returns the last element of path. Trailing path separators are removed before extracting the last element. If the path is empty, Base returns ".". If the path consists entirely of separators, Base returns a single separator
fmt.Printf("usage: %s <regexp> <files>\n",
filepath.Base(os.Args[0]))
// Exit causes the current program to exit with the given status code. Conventionally, code zero indicates success, non-zero an error. The program terminates immediately; deferred functions are not run
os.Exit(1)
}
// Compile parses a regular expression and returns, if successful, a Regexp object that can be used to match against text
if lineRx, err := regexp.Compile(os.Args[1]); err != nil {
log.Fatalf("invalid regexp: %s\n", err)
} else {
grep(lineRx, commandLineFiles(os.Args[2:]))
}
}
// commandLineFiles returns the file operands to search.
//
// On Windows the shell does not expand wildcards, so each argument is
// expanded here with filepath.Glob (see expandGlobs). On every other OS the
// shell has already done the expansion and files is returned unchanged.
func commandLineFiles(files []string) []string {
	if runtime.GOOS == "windows" {
		return expandGlobs(files)
	}
	return files
}

// expandGlobs replaces each pattern by the paths it matches (sorted, as
// returned by filepath.Glob). A pattern that is malformed or matches nothing
// is kept verbatim. Opening it later then reports an error instead of the
// argument being silently dropped, which is also what POSIX shells do by
// default with an unmatched pattern.
func expandGlobs(patterns []string) []string {
	args := make([]string, 0, len(patterns))
	for _, name := range patterns {
		matches, err := filepath.Glob(name)
		if err != nil || len(matches) == 0 {
			// Invalid pattern or no match: pass the argument through as-is.
			args = append(args, name)
			continue
		}
		args = append(args, matches...)
	}
	return args
}
// grep searches every file in filenames for lines matching lineRx. The work is
// split across a fixed pool of `workers` goroutines, and matches are printed
// from the calling goroutine as "filename:lineno:line".
//
// Pipeline: addJobs (producer) -> doJobs x workers -> awaitCompletion closes
// the result queue once every worker has signalled -> processResults prints.
func grep(lineRx *regexp.Regexp, filenames []string) {
	var (
		// Lets the producer run up to one file per worker ahead of them.
		pending = make(chan Job, workers)
		// A file can yield any number of matches, so this buffer is capped
		// at 1000; workers block on it until processResults catches up.
		found = make(chan Result, minimum(1000, len(filenames)))
		// Each worker signals here exactly once when it exits.
		finished = make(chan struct{}, workers)
	)

	go addJobs(pending, filenames, found)

	for n := workers; n > 0; n-- {
		go doJobs(finished, lineRx, pending)
	}

	go awaitCompletion(finished, found)

	// Blocks until awaitCompletion closes found.
	processResults(found)
}
// addJobs is the producer. It queues one Job per filename, each carrying the
// shared results channel. It then closes jobs so the workers' receive loops
// end once the queue is drained.
func addJobs(jobs chan<- Job, filenames []string, results chan<- Result) {
	for i := range filenames {
		jobs <- Job{filename: filenames[i], results: results}
	}
	close(jobs)
}
// doJobs is one worker. It searches every file Job it receives with lineRx
// until jobs is closed and drained. Then it sends a single completion signal
// (an empty struct value) on done.
func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
	for {
		job, ok := <-jobs
		if !ok {
			break
		}
		// The search itself lives in Job.Do, keeping this loop generic.
		job.Do(lineRx)
	}
	done <- struct{}{}
}
// Do searches job.filename line by line. For every line that lineRx matches,
// it sends a Result on job.results. Line numbers start at 1, and trailing
// '\n'/'\r' characters are stripped before matching. A last line that lacks a
// terminating newline is still searched.
//
// Open and read errors are logged and end the search of this file; they are
// not reported to the caller.
func (job Job) Do(lineRx *regexp.Regexp) {
	file, err := os.Open(job.filename)
	if err != nil {
		log.Printf("error: %s\n", err)
		return
	}
	defer file.Close()

	// bufio.Reader.ReadBytes (unlike bufio.Scanner) imposes no maximum line
	// length.
	reader := bufio.NewReader(file)
	for lino := 1; ; lino++ {
		line, err := reader.ReadBytes('\n')
		// ReadBytes returns whatever it read before an error together with
		// the error. An empty chunk at EOF is not a line: the file was empty or
		// ended with '\n'. Matching it would make patterns such as "^$" or
		// ".*" report a non-existent line after the last one.
		if len(line) > 0 {
			line = bytes.TrimRight(line, "\n\r")
			if lineRx.Match(line) {
				job.results <- Result{job.filename, lino, string(line)}
			}
		}
		if err != nil {
			if err != io.EOF {
				log.Printf("error: %s:%d: %s\n", job.filename, lino, err)
			}
			break
		}
	}
}
// awaitCompletion receives exactly one signal per worker on done, meaning
// every doJobs goroutine has exited. It then closes results so the consumer's
// range loop in processResults ends.
func awaitCompletion(done <-chan struct{}, results chan Result) {
	for remaining := workers; remaining > 0; remaining-- {
		<-done
	}
	close(results)
}
// processResults prints each match as "filename:lineno:line", one per line,
// until results is closed.
func processResults(results <-chan Result) {
	for {
		r, ok := <-results
		if !ok {
			return
		}
		fmt.Printf("%s:%d:%s\n", r.filename, r.lino, r.line)
	}
}
// minimum returns the smallest of x and all values in ys; with no ys it
// returns x.
func minimum(x int, ys ...int) int {
	smallest := x
	for i := range ys {
		if ys[i] < smallest {
			smallest = ys[i]
		}
	}
	return smallest
}