Golang如何处理parquet文件

其他教程   发布日期:2023年08月07日   浏览次数:699

这篇文章主要介绍“Golang如何处理parquet文件”,在日常操作中,相信很多人在Golang如何处理parquet文件问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang如何处理parquet文件”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

前言

Parquet是Apache基金会支持的项目,是面向列存储二进制文件格式。支持不同类型的压缩方式,广泛用于数据科学和大数据环境,如Hadoop生态。

创建结构体

首先创建struct,用于表示要处理的数据:

  1. type user struct {
  2. ID string `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  3. FirstName string `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  4. LastName string `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  5. Email string `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  6. Phone string `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  7. Blog string `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  8. Username string `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  9. Score float64 `parquet:"name=score, type=DOUBLE"`
  10. CreatedAt time.Time //wont be saved in the parquet file
  11. }

这里要提醒的是tag,用于说明struct中每个字段在生成parquet过程中如何被处理。

parquet-go包可以处理parquet数据,更多的tag可以参考其官网。

生成parquet文件

下面现给出生成parquet文件的代码,然后分别进行说明:

  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "github.com/bxcodec/faker/v3"
  7. "github.com/xitongsys/parquet-go-source/local"
  8. "github.com/xitongsys/parquet-go/parquet"
  9. "github.com/xitongsys/parquet-go/reader"
  10. "github.com/xitongsys/parquet-go/writer"
  11. )
  12. type user struct {
  13. ID string `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  14. FirstName string `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  15. LastName string `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  16. Email string `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  17. Phone string `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  18. Blog string `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  19. Username string `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  20. Score float64 `parquet:"name=score, type=DOUBLE"`
  21. CreatedAt time.Time //wont be saved in the parquet file
  22. }
  23. const recordNumber = 10000
  24. func main() {
  25. var data []*user
  26. //create fake data
  27. for i := 0; i < recordNumber; i++ {
  28. u := &user{
  29. ID: faker.UUIDDigit(),
  30. FirstName: faker.FirstName(),
  31. LastName: faker.LastName(),
  32. Email: faker.Email(),
  33. Phone: faker.Phonenumber(),
  34. Blog: faker.URL(),
  35. Username: faker.Username(),
  36. Score: float64(i),
  37. CreatedAt: time.Now(),
  38. }
  39. data = append(data, u)
  40. }
  41. err := generateParquet(data)
  42. if err != nil {
  43. log.Fatal(err)
  44. }
  45. }
  46. func generateParquet(data []*user) error {
  47. log.Println("generating parquet file")
  48. fw, err := local.NewLocalFileWriter("output.parquet")
  49. if err != nil {
  50. return err
  51. }
  52. //parameters: writer, type of struct, size
  53. pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  54. if err != nil {
  55. return err
  56. }
  57. //compression type
  58. pw.CompressionType = parquet.CompressionCodec_GZIP
  59. defer fw.Close()
  60. for _, d := range data {
  61. if err = pw.Write(d); err != nil {
  62. return err
  63. }
  64. }
  65. if err = pw.WriteStop(); err != nil {
  66. return err
  67. }
  68. return nil
  69. }

定义结构体上面已经说明,但需要提醒的是类型与文档保持一致:

Primitive Type Go Type
BOOLEAN bool
INT32 int32
INT64 int64
INT96(deprecated) string
FLOAT float32
DOUBLE float64
BYTE_ARRAY string
FIXED_LEN_BYTE_ARRAY string

接着就是使用faker包生成模拟数据。然后调用

  1. err := generateParquet(data)
方法。该方法大概逻辑为:
  • 首先准备输出文件,然后基于本地输出文件构造pw,用于写parquet数据:

  1. fw, err := local.NewLocalFileWriter("output.parquet")
  2. if err != nil {
  3. return err
  4. }
  5. //parameters: writer, type of struct, size
  6. pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  7. if err != nil {
  8. return err
  9. }
  10. //compression type
  11. pw.CompressionType = parquet.CompressionCodec_GZIP
  12. defer fw.Close()

然后设置压缩类型,并通过defer操作确保关闭文件。下面开始写数据:

  1. for _, d := range data {
  2. if err = pw.Write(d); err != nil {
  3. return err
  4. }
  5. }
  6. if err = pw.WriteStop(); err != nil {
  7. return err
  8. }
  9. return nil

循环写数据,最后调用

  1. pw.WriteStop()
停止写。 成功写文件后,下面介绍如何读取parquet文件。

读取parquet文件

首先介绍如何一次性读取文件,主要用于读取较小的文件:

  1. func readParquet() ([]*user, error) {
  2. fr, err := local.NewLocalFileReader("output.parquet")
  3. if err != nil {
  4. return nil, err
  5. }
  6. pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  7. if err != nil {
  8. return nil, err
  9. }
  10. u := make([]*user, recordNumber)
  11. if err = pr.Read(&u); err != nil {
  12. return nil, err
  13. }
  14. pr.ReadStop()
  15. fr.Close()
  16. return u, nil
  17. }

大概流程如下:首先定义本地文件,然后构造pr用于读取parquet文件:

  1. fr, err := local.NewLocalFileReader("output.parquet")
  2. if err != nil {
  3. return nil, err
  4. }
  5. pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  6. if err != nil {
  7. return nil, err
  8. }

然后定义目标内容容器u,一次性读取数据:

  1. u := make([]*user, recordNumber)
  2. if err = pr.Read(&u); err != nil {
  3. return nil, err
  4. }
  5. pr.ReadStop()
  6. fr.Close()

但一次性大量记录加载至内存可能有问题。这是官方文档提示:

If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don&rsquo;t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.

大意是不要一次读取文件至内存,可能造成OOM。实际应用中应该分页读取,下面通过代码进行说明:

  1. func readPartialParquet(pageSize, page int) ([]*user, error) {
  2. fr, err := local.NewLocalFileReader("output.parquet")
  3. if err != nil {
  4. return nil, err
  5. }
  6. defer func() {
  7. _ = fr.Close()
  8. }()
  9. pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
  10. if err != nil {
  11. return nil, err
  12. }
  13. defer pr.ReadStop()
  14. //num := pr.GetNumRows()
  15. pr.SkipRows(int64(pageSize * page))
  16. u := make([]*user, pageSize)
  17. if err = pr.Read(&u); err != nil {
  18. return nil, err
  19. }
  20. return u, nil
  21. }

与上面函数差异不大,首先函数包括两个参数,用于指定页大小和页数,关键代码是跳过一定记录:

  1. pr.SkipRows(int64(pageSize * page))

根据这个方法可以获得总行数,

  1. pr.GetNumRows()
,然后结合页大小计算总页数,最后循环可以实现分页查询。

计算列平均值

既然使用了Parquet列存储格式,下面演示下如何计算Score列的平均值。

  1. func calcScoreAVG() (float64, error) {
  2. fr, err := local.NewLocalFileReader("output.parquet")
  3. if err != nil {
  4. return 0.0, err
  5. }
  6. pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  7. if err != nil {
  8. return 0.0, err
  9. }
  10. num := int(pr.GetNumRows())
  11. data, _, _, err := pr.ReadColumnByPath("parquet_go_rootu0001score", num)
  12. if err != nil {
  13. return 0.0, err
  14. }
  15. var result float64
  16. for _, i := range data {
  17. result += i.(float64)
  18. }
  19. return (result / float64(num)), nil
  20. }

首先打开文件,然后调用pr.GetNumRows()方法获取总行数。然后基于路径指定列,其中

  1. parquet_go_root
为根路径,因为前面使用字节数组,这里分割符变为u0001,完整路径为:
  1. parquet_go_rootu0001score

以上就是Golang如何处理parquet文件的详细内容,更多关于Golang如何处理parquet文件的资料请关注九品源码其它相关文章!