我是靠谱客的博主 甜甜学姐,这篇文章主要介绍Go1.18新特性使用Generics泛型进行流式处理,现在分享给大家,希望可以做个参考。

前言

Stream 是一个基于 Go 1.18+ 泛型的流式处理库, 它支持并行处理流中的数据. 并行流会将元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)

安装

需要安装 Go 1.18+ 版本

复制代码
1
$ go get github.com/xyctruth/stream

在代码中导入它

复制代码
1
import "github.com/xyctruth/stream"

基础

复制代码
1
2
3
4
5
6
7
8
9
10
11
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}). Filter(func(s string) bool { return s != "b" }). Map(func(s string) string { return "class_" + s }). Sort(). Distinct(). ToSlice() // 需要转换切片元素的类型 s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}). Filter(func(v int) bool { return v >3 }). Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }). Reduce(func(r string, v string) string { return r + v })

类型约束

any 接受任何类型的元素, 所以不能使用 == != > < 比较元素, 导致你不能使用 Sort(), Find()...等函数 ,但是你可以使用 SortFunc(fn), FindFunc(fn)... 代替

复制代码
1
2
3
4
type SliceStream[E any] struct { slice []E } stream.NewSlice([]int{1, 2, 3, 7, 1})

comparable 接收的类型可以使用 == != 比较元素, 但仍然不能使用 > < 比较元素, 因此你不能使用 Sort(), Min()...等函数 ,但是你可以使用 SortFunc(fn), MinFunc()... 代替

复制代码
1
2
3
4
type SliceComparableStream[E comparable] struct { SliceStream[E] } stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})

constraints.Ordered 接收的类型可以使用 == != > <, 所以可以使用所有的函数

复制代码
1
2
3
4
type SliceOrderedStream[E constraints.Ordered] struct { SliceComparableStream[E] } stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})

类型转换

有些时候我们需要使用 Map ,Reduce 转换切片元素的类型,但是很遗憾目前 Golang 并不支持结构体的方法有额外的类型参数,所有类型参数必须在结构体中声明。在 Golang 支持之前我们暂时使用临时方案解决这个问题。

复制代码
1
2
3
4
5
6
7
8
9
10
11
// SliceMappingStream Need to convert the type of slice elements. // - E elements type // - MapE map elements type // - ReduceE reduce elements type type SliceMappingStream[E any, MapE any, ReduceE any] struct { SliceStream[E] } s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}). Filter(func(v int) bool { return v >3 }). Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }). Reduce(func(r string, v string) string { return r + v })

并行

Parallel 函数接收一个 goroutines int 参数. 如果 goroutines>1 则开启并行, 否则关闭并行, 默认流是关闭并行的。

并行会将流中的元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}). Parallel(10). Filter(func(s string) bool { // 一些耗时操作 return s != "b" }). Map(func(s string) string { // 一些耗时操作 return "class_" + s }). ForEach( func(index int, s string) { // 一些耗时操作 }, ).ToSlice()

并行类型

  • First: 一旦获得第一个返回值,并行处理就结束. For: AllMatch, AnyMatch, FindFunc
  • ALL: 所有元素都需要并行处理,得到所有返回值,然后并行结束. For: Map, Filter
  • Action: 所有元素需要并行处理,不需要返回值. For: ForEach, Action

并行 goroutines

开启并行 goroutine 数量在面对 CPU 操作与 IO 操作有着不同的选择。

一般面对 CPU 操作时 goroutine 数量不需要设置大于 CPU 核心数,而 IO 操作时 goroutine 数量可以设置远远大于 CPU 核心数.

CPU 操作

复制代码
1
2
3
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) { sort.Ints(newArray(1000)) // 模拟 CPU 耗时操作 })

使用6个cpu核心进行基准测试

复制代码
1
2
3
4
5
6
7
8
9
10
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByCPU goarch: amd64 pkg: github.com/xyctruth/stream cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz BenchmarkParallelByCPU/no_parallel(0)-6 717 9183119 ns/op BenchmarkParallelByCPU/goroutines(2)-6 1396 4303113 ns/op BenchmarkParallelByCPU/goroutines(4)-6 2539 2388197 ns/op BenchmarkParallelByCPU/goroutines(6)-6 2932 2159407 ns/op BenchmarkParallelByCPU/goroutines(8)-6 2334 2577405 ns/op BenchmarkParallelByCPU/goroutines(10)-6 2649 2352926 ns/op

IO 操作

复制代码
1
2
3
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) { time.Sleep(time.Millisecond) // 模拟 IO 耗时操作 })

使用6个cpu核心进行基准测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByIO goos: darwin goarch: amd64 pkg: github.com/xyctruth/stream cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz BenchmarkParallelByIO/no_parallel(0)-6 52 102023558 ns/op BenchmarkParallelByIO/goroutines(2)-6 100 55807303 ns/op BenchmarkParallelByIO/goroutines(4)-6 214 27868725 ns/op BenchmarkParallelByIO/goroutines(6)-6 315 18925789 ns/op BenchmarkParallelByIO/goroutines(8)-6 411 14439700 ns/op BenchmarkParallelByIO/goroutines(10)-6 537 11164758 ns/op BenchmarkParallelByIO/goroutines(50)-6 2629 2310602 ns/op BenchmarkParallelByIO/goroutines(100)-6 5094 1221887 ns/op

项目地址 https://github.com/xyctruth/stream

以上就是Go1.18新特性使用Generics泛型进行流式处理的详细内容,更多关于Go1.18 Generics泛型流式处理的资料请关注靠谱客其它相关文章!

最后

以上就是甜甜学姐最近收集整理的关于Go1.18新特性使用Generics泛型进行流式处理的全部内容,更多相关Go1内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(120)

评论列表共有 0 条评论

立即
投稿
返回
顶部