当前位置:   article > 正文

Arrow 之 ORC Read_c:\arrow\cpp\src\arrow\filesystem\s3fs.cc:2598: ar

c:\arrow\cpp\src\arrow\filesystem\s3fs.cc:2598: arrow::fs::finalizes3 was no

OrcStripeReader

Arrow/cpp/src/arrow/adapters/orc/adapter.cc

/// \brief RecordBatchReader that drains a liborc::RowReader in batches of at
/// most `batch_size` rows, converting each liborc batch into a RecordBatch.
class OrcStripeReader : public RecordBatchReader {
 public:
  /// \param[in] row_reader liborc row reader to pull rows from (ownership is taken)
  /// \param[in] schema Arrow schema used to build every returned RecordBatch
  /// \param[in] batch_size capacity requested from createRowBatch on each read
  /// \param[in] pool MemoryPool handed to the RecordBatchBuilder
  OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader,
                  std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool)
      : row_reader_(std::move(row_reader)),
        schema_(std::move(schema)),
        pool_(pool),
        batch_size_{batch_size} {}

  std::shared_ptr<Schema> schema() const override { return schema_; }

  /// \brief Read the next batch of rows.
  ///
  /// \param[out] out the next RecordBatch, or reset to null once the row
  ///             reader reports that no rows are left
  /// \return Status
  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
    std::unique_ptr<liborc::ColumnVectorBatch> batch;
    ORC_CATCH_NOT_OK(batch = row_reader_->createRowBatch(batch_size_));

    const liborc::Type& type = row_reader_->getSelectedType();

    // next() performs the actual read; run it under the same guard as
    // createRowBatch so that a liborc exception is reported as a Status
    // instead of propagating out of this Status-returning method.
    bool has_rows = false;
    ORC_CATCH_NOT_OK(has_rows = row_reader_->next(*batch));
    if (!has_rows) {
      out->reset();
      return Status::OK();
    }

    std::unique_ptr<RecordBatchBuilder> builder;
    RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder));

    // The top-level type must be a struct to read into an arrow table
    const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);

    for (int i = 0; i < builder->num_fields(); i++) {
      RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                batch->numElements, builder->GetField(i)));
    }

    return builder->Flush(out);
  }

 private:
  std::unique_ptr<liborc::RowReader> row_reader_;
  std::shared_ptr<Schema> schema_;
  MemoryPool* pool_;
  int64_t batch_size_;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
OrcScanTask

arrow/cpp/src/arrow/dataset/file_orc.cc

/// \brief A ScanTask backed by an ORC file.
///
/// Execute() opens the file, restricts the read to the materialized fields
/// that exist in the file schema, and returns an iterator over the batches
/// produced by ORCFileReader::NextStripeReader.
class OrcScanTask : public ScanTask {
 public:
  OrcScanTask(std::shared_ptr<FileFragment> fragment,
              std::shared_ptr<ScanOptions> options)
      : ScanTask(std::move(options), fragment), source_(fragment->source()) {}

  Result<RecordBatchIterator> Execute() override {
    struct Impl {
      static Result<RecordBatchIterator> Make(const FileSource& source,
                                              const FileFormat& format,
                                              const ScanOptions& scan_options) {
        ARROW_ASSIGN_OR_RAISE(
            auto reader, OpenReader(source, std::make_shared<ScanOptions>(scan_options)));

        auto materialized_fields = scan_options.MaterializedFields();
        // filter out virtual columns
        std::vector<std::string> included_fields;
        ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
        for (const auto& name : materialized_fields) {
          FieldRef ref(name);
          ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema));
          if (match.indices().empty()) continue;

          included_fields.push_back(name);
        }

        // NOTE(review): NextStripeReader hands back a reader for a single
        // stripe, so this task only iterates that stripe — confirm whether
        // the remaining stripes are meant to be scanned elsewhere.
        std::shared_ptr<RecordBatchReader> stripe_reader;
        // Propagate the failure instead of silently continuing with a null reader.
        RETURN_NOT_OK(reader->NextStripeReader(scan_options.batch_size, included_fields,
                                               &stripe_reader));

        return RecordBatchIterator(Impl{std::move(stripe_reader)});
      }

      Result<std::shared_ptr<RecordBatch>> Next() {
        std::shared_ptr<RecordBatch> batch;
        // NextStripeReader leaves its output null when no rows remain; report
        // that as end-of-iteration (null batch) rather than dereferencing it.
        if (recordBatchReader_ == nullptr) {
          return batch;
        }
        RETURN_NOT_OK(recordBatchReader_->ReadNext(&batch));
        return batch;
      }

      std::shared_ptr<RecordBatchReader> recordBatchReader_;
    };

    return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(),
                      *options_);
  }

 private:
  FileSource source_;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

结构分析

// Simplified outline of OrcScanTask::Execute (illustrative, not compilable as
// written): a local struct Impl is declared and all work is delegated to
// Impl::Make.
Result<RecordBatchIterator> Execute() override
{
	struct Impl{}
	return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(), *options_);
}	

// Simplified outline of the local struct Impl (bodies elided).
struct Impl 
{
	// Static factory: wraps a freshly built Impl in a RecordBatchIterator.
	static Result<RecordBatchIterator> Make(const FileSource& source, const FileFormat& format, const ScanOptions& scan_options)
	{
		return RecordBatchIterator(Impl{std::move(recordBatchReader)});
	}
	// Called by the iterator to fetch the next batch.
	Result<std::shared_ptr<RecordBatch>> Next()
	// The reader that Next() pulls batches from.
	std::shared_ptr<RecordBatchReader> recordBatchReader_;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

struct Impl 相当于一个带静态工厂方法的局部辅助类(并非单例:每次调用 Make 都会构造一个新的 Impl 实例)

  • Make 函数是其构建实例的接口,在其中初始化了成员recordBatchReader_
  • Make 函数同时返回了一个包装后的RecordBatchIterator
using RecordBatchIterator = Iterator<std::shared_ptr<RecordBatch>>;
  • 1
ORCFileReader

arrow/cpp/src/arrow/adapters/orc/adapter.h

Brief: Read an Arrow Table or RecordBatch from an ORC file.

/// \class ORCFileReader
/// \brief Read an Arrow Table or RecordBatch from an ORC file.
///
/// Instances are obtained through the static Open() functions; the default
/// constructor is private and the work is delegated to a private Impl.
class ARROW_EXPORT ORCFileReader {
 public:
  ~ORCFileReader();

  /// \brief Creates a new ORC reader.
  ///
  /// \param[in] file the data source
  /// \param[in] pool a MemoryPool to use for buffer allocations
  /// \param[out] reader the returned reader object
  /// \return Status
  static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
                     std::unique_ptr<ORCFileReader>* reader);

  /// \brief Creates a new ORC reader
  ///
  /// \param[in] file the data source
  /// \param[in] pool a MemoryPool to use for buffer allocations
  /// \return the returned reader object
  static Result<std::unique_ptr<ORCFileReader>> Open(
      const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool);

  /// \brief Return the schema read from the ORC file
  ///
  /// \param[out] out the returned Schema object
  /// \return Status
  Status ReadSchema(std::shared_ptr<Schema>* out);

  /// \brief Return the schema read from the ORC file
  ///
  /// \return the returned Schema object
  Result<std::shared_ptr<Schema>> ReadSchema();

  /// \brief Read the file as a Table
  ///
  /// The table will be composed of one record batch per stripe.
  ///
  /// \param[out] out the returned Table
  /// \return Status
  Status Read(std::shared_ptr<Table>* out);

  /// \brief Read the file as a Table
  ///
  /// The table will be composed of one record batch per stripe.
  ///
  /// \param[in] schema the Table schema
  /// \param[out] out the returned Table
  /// \return Status
  Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out);

  /// \brief Read the file as a Table
  ///
  /// The table will be composed of one record batch per stripe.
  ///
  /// \param[in] include_indices the selected field indices to read
  /// \param[out] out the returned Table
  /// \return Status
  Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out);

  /// \brief Read the file as a Table
  ///
  /// The table will be composed of one record batch per stripe.
  ///
  /// \param[in] include_names the selected field names to read
  /// \return the returned Table
  Result<std::shared_ptr<Table>> Read(const std::vector<std::string>& include_names);

  /// \brief Read the file as a Table
  ///
  /// The table will be composed of one record batch per stripe.
  ///
  /// \param[in] schema the Table schema
  /// \param[in] include_indices the selected field indices to read
  /// \param[out] out the returned Table
  /// \return Status
  Status Read(const std::shared_ptr<Schema>& schema,
              const std::vector<int>& include_indices, std::shared_ptr<Table>* out);

  /// \brief Read a single stripe as a RecordBatch
  ///
  /// \param[in] stripe the stripe index
  /// \param[out] out the returned RecordBatch
  /// \return Status
  Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out);

  /// \brief Read a single stripe as a RecordBatch
  ///
  /// \param[in] stripe the stripe index
  /// \return the returned RecordBatch
  Result<std::shared_ptr<RecordBatch>> ReadStripe(int64_t stripe);

  /// \brief Read a single stripe as a RecordBatch
  ///
  /// \param[in] stripe the stripe index
  /// \param[in] include_indices the selected field indices to read
  /// \param[out] out the returned RecordBatch
  /// \return Status
  Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                    std::shared_ptr<RecordBatch>* out);

  /// \brief Read a single stripe as a RecordBatch
  ///
  /// \param[in] stripe the stripe index
  /// \param[in] include_names the selected field names to read
  /// \return the returned RecordBatch
  Result<std::shared_ptr<RecordBatch>> ReadStripe(
      int64_t stripe, const std::vector<std::string>& include_names);

  /// \brief Seek to the designated row. Invoking NextStripeReader() after a
  ///        seek returns a stripe reader starting from the designated row.
  ///
  /// \param[in] row_number the row number to seek to
  /// \return Status
  Status Seek(int64_t row_number);

  /// \brief Get a stripe level record batch iterator with specified row count
  ///         in each record batch. NextStripeReader serves as a fine grain
  ///         alternative to ReadStripe which may cause OOM issue by loading
  ///         the whole stripes into memory.
  ///
  /// \param[in] batch_size the number of rows each record batch contains in
  ///            record batch iteration.
  /// \param[out] out the returned stripe reader
  /// \return Status
  Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out);

  /// \brief Get a stripe level record batch iterator with specified row count
  ///         in each record batch. NextStripeReader serves as a fine grain
  ///         alternative to ReadStripe which may cause OOM issue by loading
  ///         the whole stripes into memory.
  ///
  /// \param[in] batch_size the number of rows each record batch contains in
  ///            record batch iteration.
  ///
  /// \param[in] include_indices the selected field indices to read
  /// \param[out] out the returned stripe reader
  /// \return Status
  Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
                          std::shared_ptr<RecordBatchReader>* out);

  /// \brief Get a stripe level record batch iterator with specified row count
  ///         in each record batch. NextStripeReader serves as a fine grain
  ///         alternative to ReadStripe which may cause OOM issue by loading
  ///         the whole stripes into memory.
  ///
  /// \param[in] batch_size the number of rows each record batch contains in
  ///            record batch iteration.
  ///
  /// \param[in] include_names the selected field names to read
  /// \param[out] out the returned stripe reader
  /// \return Status
  Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
                          std::shared_ptr<RecordBatchReader>* out);

  /// \brief The number of stripes in the file
  int64_t NumberOfStripes();

  /// \brief The number of rows in the file
  int64_t NumberOfRows();

 private:
  // Implementation is hidden behind an opaque Impl owned by this reader.
  class Impl;
  std::unique_ptr<Impl> impl_;
  // Private: construction goes through the static Open() functions.
  ORCFileReader();
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
ORCFileReader 之 ReadStripe

Arrow/cpp/src/arrow/adapters/orc/adapter.cc

/// Result-returning flavour of ReadStripe: forwards to the Impl overload that
/// fills an out-parameter and converts its outcome into a Result.
Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
    int64_t stripe, const std::vector<std::string>& include_names) {
  std::shared_ptr<RecordBatch> stripe_batch;

  // On failure the Status is returned as-is; stripe_batch is only returned on success.
  RETURN_NOT_OK(impl_->ReadStripe(stripe, include_names, &stripe_batch));

  return stripe_batch;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
// Read one stripe, restricted to the named fields, into a single RecordBatch.
// The options are narrowed first by field names, then by the stripe's byte
// range; the schema is derived from those narrowed options.
Status ReadStripe(int64_t stripe, const std::vector<std::string>& include_names,
                    std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions row_opts;
    RETURN_NOT_OK(SelectNames(&row_opts, include_names));
    RETURN_NOT_OK(SelectStripe(&row_opts, stripe));

    std::shared_ptr<Schema> projected_schema;
    RETURN_NOT_OK(ReadSchema(row_opts, &projected_schema));

    // SelectStripe above has already validated the stripe index.
    const auto& selected = stripes_[stripe];
    return ReadBatch(row_opts, projected_schema, selected.num_rows, out);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
 // Read every row selected by `opts` into a single RecordBatch.
 //
 // opts   - liborc row reader options (column selection and byte range)
 // schema - Arrow schema used to build the resulting batch
 // nrows  - expected row count; used as the builder capacity and, capped at
 //          kReadRowsBatch, as the size of the liborc scratch batch
 // out    - receives the flushed RecordBatch
 Status ReadBatch(const liborc::RowReaderOptions& opts,
                   const std::shared_ptr<Schema>& schema, int64_t nrows,
                   std::shared_ptr<RecordBatch>* out) {
    std::unique_ptr<liborc::RowReader> row_reader;
    std::unique_ptr<liborc::ColumnVectorBatch> batch;

    ORC_BEGIN_CATCH_NOT_OK
    row_reader = reader_->createRowReader(opts);
    batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
    ORC_END_CATCH_NOT_OK

    std::unique_ptr<RecordBatchBuilder> builder;
    RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));

    // The top-level type must be a struct to read into an arrow table
    const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);

    const liborc::Type& type = row_reader->getSelectedType();

    // next() performs the actual reads, so it runs under the same guard as
    // the reader creation above: a liborc exception becomes a Status instead
    // of escaping this Status-returning function.
    ORC_BEGIN_CATCH_NOT_OK
    while (row_reader->next(*batch)) {
      for (int i = 0; i < builder->num_fields(); i++) {
        RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                  batch->numElements, builder->GetField(i)));
      }
    }
    ORC_END_CATCH_NOT_OK

    return builder->Flush(out);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
ORCFileReader 之 NextStripeReader

Arrow/cpp/src/arrow/adapters/orc/adapter.cc

/// Public entry point for the name-based overload: pure delegation to Impl.
Status ORCFileReader::NextStripeReader(int64_t batch_size,
                                       const std::vector<std::string>& include_names,
                                       std::shared_ptr<RecordBatchReader>* out) {
  Status forwarded = impl_->NextStripeReader(batch_size, include_names, out);
  return forwarded;
}
  • 1
  • 2
  • 3
  • 4
  • 5
// Build a reader over the stripe that contains the current row cursor, then
// move the cursor to the first row of the following stripe.
//
// batch_size    - forwarded to the OrcStripeReader that is created
// include_names - fields to read; an empty list applies no name selection
// out           - receives the stripe reader, or is reset to null when the
//                 cursor is already at or past the last row
Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
                          std::shared_ptr<RecordBatchReader>* out) {
    const bool exhausted = current_row_ >= NumberOfRows();
    if (exhausted) {
      out->reset();
      return Status::OK();
    }

    liborc::RowReaderOptions row_opts;
    const bool has_selection = !include_names.empty();
    if (has_selection) {
      RETURN_NOT_OK(SelectNames(&row_opts, include_names));
    }

    // Restrict the options to the stripe holding current_row_ and remember
    // that stripe's bounds so the cursor can be advanced past it.
    StripeInformation target_stripe({0, 0, 0, 0});
    RETURN_NOT_OK(SelectStripeWithRowNumber(&row_opts, current_row_, &target_stripe));

    std::shared_ptr<Schema> projected_schema;
    RETURN_NOT_OK(ReadSchema(row_opts, &projected_schema));

    std::unique_ptr<liborc::RowReader> stripe_rows;
    ORC_BEGIN_CATCH_NOT_OK
    stripe_rows = reader_->createRowReader(row_opts);
    stripe_rows->seekToRow(current_row_);
    current_row_ = target_stripe.first_row_of_stripe + target_stripe.num_rows;
    ORC_END_CATCH_NOT_OK

    out->reset(
        new OrcStripeReader(std::move(stripe_rows), projected_schema, batch_size, pool_));
    return Status::OK();
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
// Locate the stripe that contains `row_number`, restrict `opts` to that
// stripe's byte range and copy its StripeInformation into `out`.
//
// Returns Status::Invalid when row_number is at or beyond the number of rows,
// or when no cached stripe covers it.
Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
                                   StripeInformation* out) {
    ARROW_RETURN_IF(row_number >= NumberOfRows(),
                    Status::Invalid("Out of bounds row number: ", row_number));

    const auto row = static_cast<uint64_t>(row_number);
    for (const auto& stripe : stripes_) {
      if (row >= stripe.first_row_of_stripe &&
          row < stripe.first_row_of_stripe + stripe.num_rows) {
        opts->range(stripe.offset, stripe.length);
        *out = stripe;
        return Status::OK();
      }
    }

    // Separator added so the number is not glued to the text.
    return Status::Invalid("Invalid row number: ", row_number);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
ORCFileReader 之 ::Impl class

Arrow/cpp/src/arrow/adapters/orc/adapter.cc

/// \brief Private implementation of ORCFileReader on top of a liborc::Reader.
///
/// Open() creates the liborc reader and caches offset/length/row information
/// for every stripe. A row cursor (current_row_) is set by Seek() and advanced
/// stripe by stripe by NextStripeReader().
class ORCFileReader::Impl {
 public:
  Impl() {}
  ~Impl() {}

  /// \brief Wrap `file` for liborc, create the liborc reader, reset the row
  /// cursor and cache the stripe information.
  Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
    std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
    liborc::ReaderOptions options;
    std::unique_ptr<liborc::Reader> liborc_reader;
    ORC_CATCH_NOT_OK(liborc_reader = createReader(std::move(io_wrapper), options));
    pool_ = pool;
    reader_ = std::move(liborc_reader);
    current_row_ = 0;

    return Init();
  }

  /// \brief Cache offset, length, row count and first row index of each stripe.
  Status Init() {
    const int64_t nstripes = reader_->getNumberOfStripes();
    stripes_.resize(nstripes);
    std::unique_ptr<liborc::StripeInformation> stripe;
    uint64_t first_row_of_stripe = 0;
    for (int64_t i = 0; i < nstripes; ++i) {
      stripe = reader_->getStripe(i);
      stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(),
                                       stripe->getNumberOfRows(), first_row_of_stripe});
      first_row_of_stripe += stripe->getNumberOfRows();
    }
    return Status::OK();
  }

  int64_t NumberOfStripes() { return stripes_.size(); }

  int64_t NumberOfRows() { return reader_->getNumberOfRows(); }

  /// \brief Arrow schema of the whole file.
  Status ReadSchema(std::shared_ptr<Schema>* out) {
    const liborc::Type& type = reader_->getType();
    return GetArrowSchema(type, out);
  }

  /// \brief Arrow schema of the columns selected by `opts`.
  Status ReadSchema(const liborc::RowReaderOptions& opts, std::shared_ptr<Schema>* out) {
    std::unique_ptr<liborc::RowReader> row_reader;
    ORC_CATCH_NOT_OK(row_reader = reader_->createRowReader(opts));
    const liborc::Type& type = row_reader->getSelectedType();
    return GetArrowSchema(type, out);
  }

  /// \brief Convert a top-level ORC struct type into an Arrow Schema, attaching
  /// the file's metadata key/value pairs when there are any.
  Status GetArrowSchema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
    if (type.getKind() != liborc::STRUCT) {
      return Status::NotImplemented(
          "Only ORC files with a top-level struct "
          "can be handled");
    }
    const int size = static_cast<int>(type.getSubtypeCount());
    std::vector<std::shared_ptr<Field>> fields;
    for (int child = 0; child < size; ++child) {
      std::shared_ptr<DataType> elemtype;
      RETURN_NOT_OK(GetArrowType(type.getSubtype(child), &elemtype));
      std::string name = type.getFieldName(child);
      fields.push_back(field(name, elemtype));
    }
    std::list<std::string> keys = reader_->getMetadataKeys();
    std::shared_ptr<KeyValueMetadata> metadata;
    if (!keys.empty()) {
      metadata = std::make_shared<KeyValueMetadata>();
      for (const auto& key : keys) {
        metadata->Append(key, reader_->getMetadataValue(key));
      }
    }

    *out = std::make_shared<Schema>(fields, metadata);
    return Status::OK();
  }

  /// \brief Read all columns of the file as a Table.
  Status Read(std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  /// \brief Read all columns of the file as a Table using the given schema.
  Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    return ReadTable(opts, schema, out);
  }

  /// \brief Read the columns selected by index as a Table.
  Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  /// \brief Read the columns selected by name as a Table.
  Status Read(const std::vector<std::string>& include_names,
              std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectNames(&opts, include_names));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  /// \brief Read the columns selected by index as a Table using the given schema.
  Status Read(const std::shared_ptr<Schema>& schema,
              const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    return ReadTable(opts, schema, out);
  }

  /// \brief Read all columns of one stripe as a RecordBatch.
  Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  /// \brief Read the columns selected by index from one stripe.
  Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                    std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  /// \brief Read the columns selected by name from one stripe.
  Status ReadStripe(int64_t stripe, const std::vector<std::string>& include_names,
                    std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectNames(&opts, include_names));
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  /// \brief Restrict `opts` to the byte range of stripe number `stripe`.
  /// Returns Status::Invalid when the index is out of bounds.
  Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) {
    ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(),
                    Status::Invalid("Out of bounds stripe: ", stripe));

    opts->range(stripes_[stripe].offset, stripes_[stripe].length);
    return Status::OK();
  }

  /// \brief Restrict `opts` to the stripe containing `row_number` and copy that
  /// stripe's information into `out`.
  Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
                                   StripeInformation* out) {
    ARROW_RETURN_IF(row_number >= NumberOfRows(),
                    Status::Invalid("Out of bounds row number: ", row_number));

    const auto row = static_cast<uint64_t>(row_number);
    for (const auto& stripe : stripes_) {
      if (row >= stripe.first_row_of_stripe &&
          row < stripe.first_row_of_stripe + stripe.num_rows) {
        opts->range(stripe.offset, stripe.length);
        *out = stripe;
        return Status::OK();
      }
    }

    // Separator added so the number is not glued to the text.
    return Status::Invalid("Invalid row number: ", row_number);
  }

  /// \brief Select columns by type index; negative indices are rejected.
  Status SelectIndices(liborc::RowReaderOptions* opts,
                       const std::vector<int>& include_indices) {
    std::list<uint64_t> include_indices_list;
    for (const int index : include_indices) {
      ARROW_RETURN_IF(index < 0, Status::Invalid("Negative field index"));
      include_indices_list.push_back(index);
    }
    opts->includeTypes(include_indices_list);
    return Status::OK();
  }

  /// \brief Select columns by field name.
  Status SelectNames(liborc::RowReaderOptions* opts,
                     const std::vector<std::string>& include_names) {
    std::list<std::string> include_names_list(include_names.begin(), include_names.end());
    opts->include(include_names_list);
    return Status::OK();
  }

  /// \brief Read every stripe with `row_opts` and assemble one RecordBatch per
  /// stripe into a Table.
  Status ReadTable(const liborc::RowReaderOptions& row_opts,
                   const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts(row_opts);
    std::vector<std::shared_ptr<RecordBatch>> batches(stripes_.size());
    for (size_t stripe = 0; stripe < stripes_.size(); stripe++) {
      opts.range(stripes_[stripe].offset, stripes_[stripe].length);
      RETURN_NOT_OK(ReadBatch(opts, schema, stripes_[stripe].num_rows, &batches[stripe]));
    }
    return Table::FromRecordBatches(schema, std::move(batches)).Value(out);
  }

  /// \brief Read every row selected by `opts` into a single RecordBatch.
  ///
  /// `nrows` is used as the builder capacity and, capped at kReadRowsBatch, as
  /// the size of the liborc scratch batch.
  Status ReadBatch(const liborc::RowReaderOptions& opts,
                   const std::shared_ptr<Schema>& schema, int64_t nrows,
                   std::shared_ptr<RecordBatch>* out) {
    std::unique_ptr<liborc::RowReader> row_reader;
    std::unique_ptr<liborc::ColumnVectorBatch> batch;

    ORC_BEGIN_CATCH_NOT_OK
    row_reader = reader_->createRowReader(opts);
    batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
    ORC_END_CATCH_NOT_OK

    std::unique_ptr<RecordBatchBuilder> builder;
    RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));

    // The top-level type must be a struct to read into an arrow table
    const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);

    const liborc::Type& type = row_reader->getSelectedType();

    // next() performs the actual reads, so it runs under the same guard as
    // the reader creation above: a liborc exception becomes a Status instead
    // of escaping this Status-returning function.
    ORC_BEGIN_CATCH_NOT_OK
    while (row_reader->next(*batch)) {
      for (int i = 0; i < builder->num_fields(); i++) {
        RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                  batch->numElements, builder->GetField(i)));
      }
    }
    ORC_END_CATCH_NOT_OK

    return builder->Flush(out);
  }

  /// \brief Move the row cursor used by NextStripeReader to `row_number`.
  /// Returns Status::Invalid for a negative row or one past the last row.
  Status Seek(int64_t row_number) {
    ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
                    Status::Invalid("Out of bounds row number: ", row_number));

    current_row_ = row_number;
    return Status::OK();
  }

  /// \brief Reader over the stripe containing the row cursor, selecting columns
  /// by index (an empty list applies no selection). `out` is reset to null
  /// when the cursor is already at or past the last row.
  Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
                          std::shared_ptr<RecordBatchReader>* out) {
    if (current_row_ >= NumberOfRows()) {
      out->reset();
      return Status::OK();
    }

    liborc::RowReaderOptions opts;
    if (!include_indices.empty()) {
      RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    }
    return MakeStripeReader(&opts, batch_size, out);
  }

  /// \brief Reader over the stripe containing the row cursor, selecting columns
  /// by name (an empty list applies no selection). `out` is reset to null
  /// when the cursor is already at or past the last row.
  Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
                          std::shared_ptr<RecordBatchReader>* out) {
    if (current_row_ >= NumberOfRows()) {
      out->reset();
      return Status::OK();
    }

    liborc::RowReaderOptions opts;
    if (!include_names.empty()) {
      RETURN_NOT_OK(SelectNames(&opts, include_names));
    }
    return MakeStripeReader(&opts, batch_size, out);
  }

  /// \brief Reader over the stripe containing the row cursor, all columns.
  Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
    std::vector<int> empty_vec;
    return NextStripeReader(batch_size, empty_vec, out);
  }

 private:
  /// Shared tail of the NextStripeReader overloads: restrict `opts` to the
  /// stripe holding current_row_, create a row reader positioned at that row,
  /// advance the cursor to the first row of the following stripe and wrap the
  /// row reader in an OrcStripeReader.
  Status MakeStripeReader(liborc::RowReaderOptions* opts, int64_t batch_size,
                          std::shared_ptr<RecordBatchReader>* out) {
    StripeInformation stripe_info({0, 0, 0, 0});
    RETURN_NOT_OK(SelectStripeWithRowNumber(opts, current_row_, &stripe_info));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(*opts, &schema));
    std::unique_ptr<liborc::RowReader> row_reader;

    ORC_BEGIN_CATCH_NOT_OK
    row_reader = reader_->createRowReader(*opts);
    row_reader->seekToRow(current_row_);
    current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
    ORC_END_CATCH_NOT_OK

    *out = std::shared_ptr<RecordBatchReader>(
        new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_));
    return Status::OK();
  }

  // Default-initialized so that the object holds no indeterminate values
  // before Open() assigns them.
  MemoryPool* pool_ = nullptr;
  std::unique_ptr<liborc::Reader> reader_;
  std::vector<StripeInformation> stripes_;
  int64_t current_row_ = 0;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
ORCFileReader::Impl 初始化

Arrow/cpp/src/arrow/adapters/orc/adapter.cc

// The reader always owns an Impl; it is created together with the reader.
ORCFileReader::ORCFileReader() : impl_(new ORCFileReader::Impl()) {}
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/104269
推荐阅读
相关标签
  

闽ICP备14008679号