Go语言中用多线程读取大文件

admin 2020-03-31 go 多线程 52

有一天,我在一家公司面试,我被问到一个问题,你怎么能在一个50gb的文件里用4gb的内存计算一个单词的出现次数。诀窍是不要将整个文件加载到内存中,并在移动文件指针时继续处理每个单词。有了它,我们可以用最少的内存资源轻松地处理整个文件。

接下来的问题是我们如何使用多线程来加速这个过程?解决方案是在文件的不同部分保留多个指针,每个线程同时读取文件的块。最后,结果可以合并。

这只是显示了如何分割整个文件。以及文件的各种指针。假设文件是1GB大。5个线程中的每个将处理200MB。连续指针将从前一个指针的最后一个读字节开始读取。

当涉及到多线程时,最容易想到的选择是go-routine。我将带您浏览一个程序,它读取一个大型文本文件并创建一个单词字典。

这个程序演示了使用5个go例程读取一个1GB的文件,每个线程读取200MB。

const mb = 1024 * 1024
const gb = 1024 * mb

func main() {
	wg := sync.WaitGroup{}

	//这个通道用于发送各种goroutine中的每个已读单词。
	channel := make(chan (string))

	// 存储唯一单词计数的字典。
	dict := make(map[string]int64)

	//done是一个通道,所有的单词都已输入字典后的信号。
	done := make(chan (bool), 1)

	// 读取通道中所有输入的单词并将它们添加到字典中。
	go func() {
		for s := range channel {
			dict[s]++
		}

		//向主线程发出信号,表明所有的单词都已进入词典。
		done <- true
	}()
	// current表示文件的字节数。
	var current int64
	// Limit表示每个线程要处理的文件块大小。
	var limit int64 = 500 * mb
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			read(current, limit, "gameofthrones.txt", channel)
			fmt.Printf("%d thread has been completed \n", i)
			wg.Done()
		}()
		// 将current值增加1+(前一个线程读取的最后一个字节)。
		current += limit + 1
	}
	// Wait for all go routines to complete.
	wg.Wait()
	close(channel)
	// Wait for dictionary to process all the words.
	<-done
	close(done)
}

func read(offset int64, limit int64, fileName string, channel chan (string)) {
	file, err := os.Open(fileName)
	defer file.Close()
	if err != nil {
		panic(err)
	}
	// 将文件指针移动到指定块的开始位置。
	file.Seek(offset, 0)
	reader := bufio.NewReader(file)
	// 这段代码确保chunk的开头是一个新单词。
        //如果在给定的位置遇到一个字符,它将移动几个字节直到单词的末尾。
	if offset != 0 {
		_, err = reader.ReadBytes(' ')
		if err == io.EOF {
			fmt.Println("EOF")
			return
		}
		if err != nil {
			panic(err)
		}
	}
	var cummulativeSize int64
	for {
		// 如果读大小超过了块大小,则断开。
		if cummulativeSize > limit {
			break
		}
		b, err := reader.ReadBytes(' ')
		// 如果遇到文件结束,则中断。
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		cummulativeSize += int64(len(b))
		s := strings.TrimSpace(string(b))
		if s != "" {
			// 将通道中的已读单词发送到字典中。
			channel <- s
		}
	}
}

在这里,我们还需要处理边界情况。如果这个词块的开头不是一个新单词的开头呢?类似地,如果一个块的结束不是块的结束,该怎么办?

我们通过将块的末尾延长到单词的末尾并将连续块的开头移动到下一个单词的开头来处理这个问题。

我们使用通道将不同线程读取的所有单词统一到一个字典中。一个同步。Waitgroup可用于线程的同步,并确保所有线程都已完成文件的读取。

可以看到,与串行方式相比,性能提高了一倍,同时处理1GB文件所需的时间减少了一半。

我们没有得到5x性能的原因。尽管goroutine是轻量级线程,但读取文件的过程需要整个os级CPU内核的资源。它没有睡眠时间。因此,在双核系统中(CPU只有两个核),它只能使文件处理的性能提高一倍。

原文链接:https://hackernoon.com/leveraging-multithreading-to-read-large-files-faster-in-go-lmn32t7