package parquet

import (
	"fmt"

	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/file"

	"github.com/grafana/grafana/pkg/storage/unified/resource"
)

var (
	_ resource.BulkRequestIterator = (*parquetReader)(nil)
)
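
// NewParquetReader opens the parquet file at inputPath and returns an
// iterator that yields one BulkRequest per row, reading up to batchSize
// values from each column at a time.
//
// A minimal usage sketch (the path and batch size here are hypothetical):
//
//	iter, err := NewParquetReader("/tmp/resources.parquet", 1000)
//	if err != nil {
//		return err
//	}
//	for iter.Next() {
//		req := iter.Request()
//		// ... handle req
//	}
//	if iter.RollbackRequested() {
//		// iteration stopped early on a read error
//	}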
func NewParquetReader(inputPath string, batchSize int64) (resource.BulkRequestIterator, error) {
	return newResourceReader(inputPath, batchSize)
}
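
// parquetReader streams rows from a parquet file one row group at a time,
// buffering up to batchSize values per column between reads.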
type parquetReader struct {
	reader *file.Reader

	namespace *stringColumn
	group     *stringColumn
	resource  *stringColumn
	name      *stringColumn
	value     *stringColumn
	folder    *stringColumn
	action    *int32Column

	columns   []columnBuffer
	batchSize int64

	defLevels []int16
	repLevels []int16

	// how many rows are buffered, and how many we already read
	bufferSize  int
	bufferIndex int
	rowGroupIDX int

	req *resource.BulkRequest
	err error
}

// Next implements resource.BulkRequestIterator.
func (r *parquetReader) Next() bool {
	r.req = nil
	for r.err == nil && r.reader != nil {
		// refill the buffer once it has been fully consumed
		if r.bufferIndex >= r.bufferSize && r.value.reader.HasNext() {
			r.bufferIndex = 0
			r.err = r.readBulk()
			if r.err != nil {
				return false
			}
			r.bufferSize = r.value.count
		}

		// emit the next buffered row
		if r.bufferSize > r.bufferIndex {
			i := r.bufferIndex
			r.bufferIndex++
			r.req = &resource.BulkRequest{
				Key: &resource.ResourceKey{
					Group:     r.group.buffer[i].String(),
					Resource:  r.resource.buffer[i].String(),
					Namespace: r.namespace.buffer[i].String(),
					Name:      r.name.buffer[i].String(),
				},
				Action: resource.BulkRequest_Action(r.action.buffer[i]),
				Value:  r.value.buffer[i].Bytes(),
				Folder: r.folder.buffer[i].String(),
			}
			return true
		}

		// this row group is exhausted; advance to the next one
		r.rowGroupIDX++
		if r.rowGroupIDX >= r.reader.NumRowGroups() {
			_ = r.reader.Close()
			r.reader = nil
			return false
		}
		r.err = r.open(r.reader.RowGroup(r.rowGroupIDX))
	}
	return false
}

// Request implements resource.BulkRequestIterator.
func (r *parquetReader) Request() *resource.BulkRequest {
	return r.req
}

// RollbackRequested implements resource.BulkRequestIterator.
func (r *parquetReader) RollbackRequested() bool {
	return r.err != nil
}
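
// newResourceReader opens the parquet file, locates the expected columns in
// its schema, and primes the buffers with the first batch of rows. If any
// expected column is missing, the file is closed and an error is returned.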
func newResourceReader(inputPath string, batchSize int64) (*parquetReader, error) {
	rdr, err := file.OpenParquetFile(inputPath, true)
	if err != nil {
		return nil, err
	}

	schema := rdr.MetaData().Schema
	makeColumn := func(name string) *stringColumn {
		index := schema.ColumnIndexByName(name)
		if index < 0 {
			err = fmt.Errorf("missing column: %s", name)
		}
		return &stringColumn{
			index:  index,
			buffer: make([]parquet.ByteArray, batchSize),
		}
	}

	reader := &parquetReader{
		reader:    rdr,
		namespace: makeColumn("namespace"),
		group:     makeColumn("group"),
		resource:  makeColumn("resource"),
		name:      makeColumn("name"),
		value:     makeColumn("value"),
		folder:    makeColumn("folder"),
		action: &int32Column{
			index:  schema.ColumnIndexByName("action"),
			buffer: make([]int32, batchSize),
		},
		batchSize: batchSize,
		defLevels: make([]int16, batchSize),
		repLevels: make([]int16, batchSize),
	}

	// err is set by makeColumn when a column is missing from the schema
	if err != nil {
		_ = rdr.Close()
		return nil, err
	}

	reader.columns = []columnBuffer{
		reader.namespace,
		reader.group,
		reader.resource,
		reader.name,
		reader.folder,
		reader.action,
		reader.value,
	}

	// Empty file, close and return
	if rdr.NumRowGroups() < 1 {
		err = rdr.Close()
		reader.reader = nil
		return reader, err
	}

	err = reader.open(rdr.RowGroup(0))
	if err != nil {
		_ = rdr.Close()
		return nil, err
	}

	// get the first batch
	err = reader.readBulk()
	if err != nil {
		_ = rdr.Close()
		return nil, err
	}
	return reader, nil
}
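
// open points every column buffer at its column chunk in the given row group.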
func (r *parquetReader) open(rgr *file.RowGroupReader) error {
	for _, c := range r.columns {
		err := c.open(rgr)
		if err != nil {
			return err
		}
	}
	return nil
}
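
// readBulk reads the next batch of up to batchSize values from every column,
// requiring that all columns return the same number of values so the buffers
// stay row-aligned.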
func (r *parquetReader) readBulk() error {
	r.bufferIndex = 0
	r.bufferSize = 0
	for i, c := range r.columns {
		count, err := c.batch(r.batchSize, r.defLevels, r.repLevels)
		if err != nil {
			return err
		}
		if i > 0 && r.bufferSize != count {
			return fmt.Errorf("expecting the same size for all columns")
		}
		r.bufferSize = count
	}
	return nil
}

//-------------------------------
// Column support
//-------------------------------
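
// columnBuffer is the shared behavior of the typed column readers below:
// open binds the buffer to a column chunk within a row group, and batch
// fills the local buffer with up to batchSize values.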
type columnBuffer interface {
	open(rgr *file.RowGroupReader) error
	batch(batchSize int64, defLevels []int16, repLevels []int16) (int, error)
}
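
// stringColumn reads a byte-array (string) column into a reusable buffer.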
type stringColumn struct {
	index  int // within the schema
	reader *file.ByteArrayColumnChunkReader
	buffer []parquet.ByteArray
	count  int // the active count
}

func (c *stringColumn) open(rgr *file.RowGroupReader) error {
	tmp, err := rgr.Column(c.index)
	if err != nil {
		return err
	}
	var ok bool
	c.reader, ok = tmp.(*file.ByteArrayColumnChunkReader)
	if !ok {
		return fmt.Errorf("expected a byte array column")
	}
	return nil
}

func (c *stringColumn) batch(batchSize int64, defLevels []int16, repLevels []int16) (int, error) {
	_, count, err := c.reader.ReadBatch(batchSize, c.buffer, defLevels, repLevels)
	c.count = count
	return count, err
}
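
// int32Column reads an int32 column (used for the bulk action enum) into a
// reusable buffer.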
type int32Column struct {
	index  int // within the schema
	reader *file.Int32ColumnChunkReader
	buffer []int32
	count  int // the active count
}

func (c *int32Column) open(rgr *file.RowGroupReader) error {
	tmp, err := rgr.Column(c.index)
	if err != nil {
		return err
	}
	var ok bool
	c.reader, ok = tmp.(*file.Int32ColumnChunkReader)
	if !ok {
		return fmt.Errorf("expected an int32 column")
	}
	return nil
}

func (c *int32Column) batch(batchSize int64, defLevels []int16, repLevels []int16) (int, error) {
	_, count, err := c.reader.ReadBatch(batchSize, c.buffer, defLevels, repLevels)
	c.count = count
	return count, err
}