赞
踩
arrow/cpp/src/arrow/record_batch.h
/// \class RecordBatch /// \brief Collection of equal-length arrays matching a particular Schema /// /// A record batch is table-like data structure that is semantically a sequence /// of fields, each a contiguous Arrow array class ARROW_EXPORT RecordBatch { public: virtual ~RecordBatch() = default; /// \param[in] schema The record batch schema /// \param[in] num_rows length of fields in the record batch. Each array /// should have the same length as num_rows /// \param[in] columns the record batch fields as vector of arrays static std::shared_ptr<RecordBatch> Make(std::shared_ptr<Schema> schema, int64_t num_rows, std::vector<std::shared_ptr<Array>> columns); /// \brief Construct record batch from vector of internal data structures /// \since 0.5.0 /// /// This class is intended for internal use, or advanced users. /// /// \param schema the record batch schema /// \param num_rows the number of semantic rows in the record batch. This /// should be equal to the length of each field /// \param columns the data for the batch's columns static std::shared_ptr<RecordBatch> Make( std::shared_ptr<Schema> schema, int64_t num_rows, std::vector<std::shared_ptr<ArrayData>> columns); /// \brief Convert record batch to struct array /// /// Create a struct array whose child arrays are the record batch's columns. /// Note that the record batch's top-level field metadata cannot be reflected /// in the resulting struct array. Result<std::shared_ptr<StructArray>> ToStructArray() const; /// \brief Construct record batch from struct array /// /// This constructs a record batch using the child arrays of the given /// array, which must be a struct array. Note that the struct array's own /// null bitmap is not reflected in the resulting record batch. static Result<std::shared_ptr<RecordBatch>> FromStructArray( const std::shared_ptr<Array>& array); /// \brief Determine if two record batches are exactly equal /// /// \param[in] other the RecordBatch to compare with /// \param[in] check_metadata if true, check that Schema metadata is the same /// \return true if batches are equal bool Equals(const RecordBatch& other, bool check_metadata = false) const; /// \brief Determine if two record batches are approximately equal bool ApproxEquals(const RecordBatch& other) const; // \return the table's schema /// \return true if batches are equal const std::shared_ptr<Schema>& schema() const { return schema_; } /// \brief Retrieve all columns at once std::vector<std::shared_ptr<Array>> columns() const; /// \brief Retrieve an array from the record batch /// \param[in] i field index, does not boundscheck /// \return an Array object virtual std::shared_ptr<Array> column(int i) const = 0; /// \brief Retrieve an array from the record batch /// \param[in] name field name /// \return an Array or null if no field was found std::shared_ptr<Array> GetColumnByName(const std::string& name) const; /// \brief Retrieve an array's internal data from the record batch /// \param[in] i field index, does not boundscheck /// \return an internal ArrayData object virtual std::shared_ptr<ArrayData> column_data(int i) const = 0; /// \brief Retrieve all arrays' internal data from the record batch. virtual ArrayDataVector column_data() const = 0; /// \brief Add column to the record batch, producing a new RecordBatch /// /// \param[in] i field index, which will be boundschecked /// \param[in] field field to be added /// \param[in] column column to be added virtual Result<std::shared_ptr<RecordBatch>> AddColumn( int i, const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& column) const = 0; /// \brief Add new nullable column to the record batch, producing a new /// RecordBatch. /// /// For non-nullable columns, use the Field-based version of this method. /// /// \param[in] i field index, which will be boundschecked /// \param[in] field_name name of field to be added /// \param[in] column column to be added virtual Result<std::shared_ptr<RecordBatch>> AddColumn( int i, std::string field_name, const std::shared_ptr<Array>& column) const; /// \brief Replace a column in the table, producing a new Table virtual Result<std::shared_ptr<RecordBatch>> SetColumn( int i, const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& column) const = 0; /// \brief Remove column from the record batch, producing a new RecordBatch /// /// \param[in] i field index, does boundscheck virtual Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const = 0; virtual std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0; /// \brief Name in i-th column const std::string& column_name(int i) const; /// \return the number of columns in the table int num_columns() const; /// \return the number of rows (the corresponding length of each column) int64_t num_rows() const { return num_rows_; } /// \brief Slice each of the arrays in the record batch /// \param[in] offset the starting offset to slice, through end of batch /// \return new record batch virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const; /// \brief Slice each of the arrays in the record batch /// \param[in] offset the starting offset to slice /// \param[in] length the number of elements to slice from offset /// \return new record batch virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0; /// \return PrettyPrint representation suitable for debugging std::string ToString() const; /// \brief Perform cheap validation checks to determine obvious inconsistencies /// within the record batch's schema and internal data. /// /// This is O(k) where k is the total number of fields and array descendents. /// /// \return Status virtual Status Validate() const; /// \brief Perform extensive validation checks to determine inconsistencies /// within the record batch's schema and internal data. /// /// This is potentially O(k*n) where n is the number of rows. /// /// \return Status virtual Status ValidateFull() const; protected: RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows); std::shared_ptr<Schema> schema_; int64_t num_rows_; private: ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch); };
arrow/cpp/src/arrow/record_batch.cc
std::shared_ptr<RecordBatch> RecordBatch::Make(
std::shared_ptr<Schema> schema, int64_t num_rows,
std::vector<std::shared_ptr<ArrayData>> columns) {
DCHECK_EQ(schema->num_fields(), static_cast<int>(columns.size()));
return std::make_shared<SimpleRecordBatch>(std::move(schema), num_rows,
std::move(columns));
}
arrow/cpp/src/arrow/record_batch.cc
/// \class SimpleRecordBatch /// \brief A basic, non-lazy in-memory record batch class SimpleRecordBatch : public RecordBatch { public: SimpleRecordBatch(std::shared_ptr<Schema> schema, int64_t num_rows, std::vector<std::shared_ptr<Array>> columns) : RecordBatch(std::move(schema), num_rows), boxed_columns_(std::move(columns)) { columns_.resize(boxed_columns_.size()); for (size_t i = 0; i < columns_.size(); ++i) { columns_[i] = boxed_columns_[i]->data(); } } SimpleRecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, std::vector<std::shared_ptr<ArrayData>> columns) : RecordBatch(std::move(schema), num_rows), columns_(std::move(columns)) { boxed_columns_.resize(schema_->num_fields()); } std::shared_ptr<Array> column(int i) const override { std::shared_ptr<Array> result = internal::atomic_load(&boxed_columns_[i]); if (!result) { result = MakeArray(columns_[i]); internal::atomic_store(&boxed_columns_[i], result); } return result; } std::shared_ptr<ArrayData> column_data(int i) const override { return columns_[i]; } ArrayDataVector column_data() const override { return columns_; } Result<std::shared_ptr<RecordBatch>> AddColumn( int i, const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& column) const override { ARROW_CHECK(field != nullptr); ARROW_CHECK(column != nullptr); if (!field->type()->Equals(column->type())) { return Status::TypeError("Column data type ", field->type()->name(), " does not match field data type ", column->type()->name()); } if (column->length() != num_rows_) { return Status::Invalid( "Added column's length must match record batch's length. Expected length ", num_rows_, " but got length ", column->length()); } ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->AddField(i, field)); return RecordBatch::Make(new_schema, num_rows_, internal::AddVectorElement(columns_, i, column->data())); } Result<std::shared_ptr<RecordBatch>> SetColumn( int i, const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& column) const override { ARROW_CHECK(field != nullptr); ARROW_CHECK(column != nullptr); if (!field->type()->Equals(column->type())) { return Status::TypeError("Column data type ", field->type()->name(), " does not match field data type ", column->type()->name()); } if (column->length() != num_rows_) { return Status::Invalid( "Added column's length must match record batch's length. Expected length ", num_rows_, " but got length ", column->length()); } ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->SetField(i, field)); return RecordBatch::Make(new_schema, num_rows_, internal::ReplaceVectorElement(columns_, i, column->data())); } Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const override { ARROW_ASSIGN_OR_RAISE(auto new_schema, schema_->RemoveField(i)); return RecordBatch::Make(new_schema, num_rows_, internal::DeleteVectorElement(columns_, i)); } std::shared_ptr<RecordBatch> ReplaceSchemaMetadata( const std::shared_ptr<const KeyValueMetadata>& metadata) const override { auto new_schema = schema_->WithMetadata(metadata); return RecordBatch::Make(new_schema, num_rows_, columns_); } std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const override { std::vector<std::shared_ptr<ArrayData>> arrays; arrays.reserve(num_columns()); for (const auto& field : columns_) { arrays.emplace_back(field->Slice(offset, length)); } int64_t num_rows = std::min(num_rows_ - offset, length); return std::make_shared<SimpleRecordBatch>(schema_, num_rows, std::move(arrays)); } Status Validate() const override { if (static_cast<int>(columns_.size()) != schema_->num_fields()) { return Status::Invalid("Number of columns did not match schema"); } return RecordBatch::Validate(); } private: std::vector<std::shared_ptr<ArrayData>> columns_; // Caching boxed array data mutable std::vector<std::shared_ptr<Array>> boxed_columns_; };
arrow/cpp/src/arrow/ipc/json_simple.cc
Status ArrayFromJSON(const std::shared_ptr<DataType>& type, util::string_view json_string, std::shared_ptr<Array>* out) { std::shared_ptr<Converter> converter; RETURN_NOT_OK(GetConverter(type, &converter)); rj::Document json_doc; json_doc.Parse<kParseFlags>(json_string.data(), json_string.length()); if (json_doc.HasParseError()) { return Status::Invalid("JSON parse error at offset ", json_doc.GetErrorOffset(), ": ", GetParseError_En(json_doc.GetParseError())); } // The JSON document should be an array, append it RETURN_NOT_OK(converter->AppendValues(json_doc)); return converter->Finish(out); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。