赞
踩
Arrow/cpp/src/arrow/adapters/orc/adapter.cc
class OrcStripeReader : public RecordBatchReader { public: OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader, std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool) : row_reader_(std::move(row_reader)), schema_(schema), pool_(pool), batch_size_{batch_size} {} std::shared_ptr<Schema> schema() const override { return schema_; } Status ReadNext(std::shared_ptr<RecordBatch>* out) override { std::unique_ptr<liborc::ColumnVectorBatch> batch; ORC_CATCH_NOT_OK(batch = row_reader_->createRowBatch(batch_size_)); const liborc::Type& type = row_reader_->getSelectedType(); if (!row_reader_->next(*batch)) { out->reset(); return Status::OK(); } std::unique_ptr<RecordBatchBuilder> builder; RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder)); // The top-level type must be a struct to read into an arrow table const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch); for (int i = 0; i < builder->num_fields(); i++) { RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0, batch->numElements, builder->GetField(i))); } RETURN_NOT_OK(builder->Flush(out)); return Status::OK(); } private: std::unique_ptr<liborc::RowReader> row_reader_; std::shared_ptr<Schema> schema_; MemoryPool* pool_; int64_t batch_size_; };
arrow/cpp/src/arrow/dataset/file_orc.cc
/// \brief A ScanTask backed by an ORC file. class OrcScanTask : public ScanTask { public: OrcScanTask(std::shared_ptr<FileFragment> fragment, std::shared_ptr<ScanOptions> options) : ScanTask(std::move(options), fragment), source_(fragment->source()) {} Result<RecordBatchIterator> Execute() override { struct Impl { static Result<RecordBatchIterator> Make(const FileSource& source, const FileFormat& format, const ScanOptions& scan_options) { ARROW_ASSIGN_OR_RAISE( auto reader, OpenReader(source, std::make_shared<ScanOptions>(scan_options))); int num_stripes = reader->NumberOfStripes(); auto materialized_fields = scan_options.MaterializedFields(); // filter out virtual columns std::vector<std::string> included_fields; ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema()); for (auto name : materialized_fields) { FieldRef ref(name); ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema)); if (match.indices().empty()) continue; included_fields.push_back(name); } std::shared_ptr<RecordBatchReader> recordBatchReader; reader->NextStripeReader(scan_options.batch_size, included_fields, &recordBatchReader); return RecordBatchIterator( Impl{std::move(recordBatchReader)}); } Result<std::shared_ptr<RecordBatch>> Next() { std::shared_ptr<RecordBatch> batch; RETURN_NOT_OK(recordBatchReader_->ReadNext(&batch)); return batch; } std::shared_ptr<RecordBatchReader> recordBatchReader_; }; return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(), *options_); } private: FileSource source_; };
结构分析
Result<RecordBatchIterator> Execute() override { struct Impl{} return Impl::Make(source_, *checked_pointer_cast<FileFragment>(fragment_)->format(), *options_); } struct Impl { static Result<RecordBatchIterator> Make(const FileSource& source, const FileFormat& format, const ScanOptions& scan_options) { return RecordBatchIterator(Impl{std::move(recordBatchReader)}); } Result<std::shared_ptr<RecordBatch>> Next() std::shared_ptr<RecordBatchReader> recordBatchReader_; }
struct Impl 相当于一个单例
// Alias for an Iterator whose elements are shared pointers to RecordBatch.
using RecordBatchIterator = Iterator<std::shared_ptr<RecordBatch>>;
arrow/cpp/src/arrow/adapters/orc/adapter.h
ORCFileReader: reads an Arrow Table or RecordBatch from an ORC file.
/// \class ORCFileReader /// \brief Read an Arrow Table or RecordBatch from an ORC file. class ARROW_EXPORT ORCFileReader { public: ~ORCFileReader(); /// \brief Creates a new ORC reader. /// /// \param[in] file the data source /// \param[in] pool a MemoryPool to use for buffer allocations /// \param[out] reader the returned reader object /// \return Status static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader); /// \brief Creates a new ORC reader /// /// \param[in] file the data source /// \param[in] pool a MemoryPool to use for buffer allocations /// \return the returned reader object static Result<std::unique_ptr<ORCFileReader>> Open( const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool); /// \brief Return the schema read from the ORC file /// /// \param[out] out the returned Schema object Status ReadSchema(std::shared_ptr<Schema>* out); /// \brief Return the schema read from the ORC file /// /// \return the returned Schema object Result<std::shared_ptr<Schema>> ReadSchema(); /// \brief Read the file as a Table /// /// The table will be composed of one record batch per stripe. /// /// \param[out] out the returned Table Status Read(std::shared_ptr<Table>* out); /// \brief Read the file as a Table /// /// The table will be composed of one record batch per stripe. /// /// \param[in] schema the Table schema /// \param[out] out the returned Table Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out); /// \brief Read the file as a Table /// /// The table will be composed of one record batch per stripe. /// /// \param[in] include_indices the selected field indices to read /// \param[out] out the returned Table Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out); /// \brief Read the file as a Table /// /// The table will be composed of one record batch per stripe. 
/// /// \param[in] include_names the selected field names to read /// \return the returned Table Result<std::shared_ptr<Table>> Read(const std::vector<std::string>& include_names); /// \brief Read the file as a Table /// /// The table will be composed of one record batch per stripe. /// /// \param[in] schema the Table schema /// \param[in] include_indices the selected field indices to read /// \param[out] out the returned Table Status Read(const std::shared_ptr<Schema>& schema, const std::vector<int>& include_indices, std::shared_ptr<Table>* out); /// \brief Read a single stripe as a RecordBatch /// /// \param[in] stripe the stripe index /// \param[out] out the returned RecordBatch Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out); /// \brief Read a single stripe as a RecordBatch /// /// \param[in] stripe the stripe index /// \return the returned RecordBatch Result<std::shared_ptr<RecordBatch>> ReadStripe(int64_t stripe); /// \brief Read a single stripe as a RecordBatch /// /// \param[in] stripe the stripe index /// \param[in] include_indices the selected field indices to read /// \param[out] out the returned RecordBatch Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices, std::shared_ptr<RecordBatch>* out); /// \brief Read a single stripe as a RecordBatch /// /// \param[in] stripe the stripe index /// \param[in] include_names the selected field names to read /// \return the returned RecordBatch Result<std::shared_ptr<RecordBatch>> ReadStripe( int64_t stripe, const std::vector<std::string>& include_names); /// \brief Seek to designated row. Invoke NextStripeReader() after seek /// will return stripe reader starting from designated row. /// /// \param[in] row_number the rows number to seek Status Seek(int64_t row_number); /// \brief Get a stripe level record batch iterator with specified row count /// in each record batch. 
NextStripeReader serves as a fine grain /// alternative to ReadStripe which may cause OOM issue by loading /// the whole stripes into memory. /// /// \param[in] batch_size the number of rows each record batch contains in /// record batch iteration. /// \param[out] out the returned stripe reader Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out); /// \brief Get a stripe level record batch iterator with specified row count /// in each record batch. NextStripeReader serves as a fine grain /// alternative to ReadStripe which may cause OOM issue by loading /// the whole stripes into memory. /// /// \param[in] batch_size Get a stripe level record batch iterator with specified row /// count in each record batch. /// /// \param[in] include_indices the selected field indices to read /// \param[out] out the returned stripe reader Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices, std::shared_ptr<RecordBatchReader>* out); /// \brief Get a stripe level record batch iterator with specified row count /// in each record batch. NextStripeReader serves as a fine grain /// alternative to ReadStripe which may cause OOM issue by loading /// the whole stripes into memory. /// /// \param[in] batch_size Get a stripe level record batch iterator with specified row /// count in each record batch. /// /// \param[in] include_indices the selected field names to read /// \param[out] out the returned stripe reader Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names, std::shared_ptr<RecordBatchReader>* out); /// \brief The number of stripes in the file int64_t NumberOfStripes(); /// \brief The number of rows in the file int64_t NumberOfRows(); private: class Impl; std::unique_ptr<Impl> impl_; ORCFileReader(); };
Arrow/cpp/src/arrow/adapters/orc/adapter.cc
// Result-returning overload: lets the Impl fill a local batch through its
// out-parameter and hands that batch back to the caller.
Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
    int64_t stripe, const std::vector<std::string>& include_names) {
  std::shared_ptr<RecordBatch> stripe_batch;
  RETURN_NOT_OK(impl_->ReadStripe(stripe, include_names, &stripe_batch));
  return stripe_batch;
}
// Reads the stripe at index `stripe`, restricted to the columns listed in
// `include_names`, and stores the resulting RecordBatch in `out`.
Status ReadStripe(int64_t stripe, const std::vector<std::string>& include_names,
                  std::shared_ptr<RecordBatch>* out) {
  liborc::RowReaderOptions row_opts;
  RETURN_NOT_OK(SelectNames(&row_opts, include_names));
  RETURN_NOT_OK(SelectStripe(&row_opts, stripe));

  // The Arrow schema is derived from the column selection made above.
  std::shared_ptr<Schema> selected_schema;
  RETURN_NOT_OK(ReadSchema(row_opts, &selected_schema));

  const auto& stripe_info = stripes_[stripe];
  return ReadBatch(row_opts, selected_schema, stripe_info.num_rows, out);
}
// Reads all rows selected by `opts` and assembles them into one RecordBatch.
//
// opts:   liborc row reader options (column selection / byte range)
// schema: Arrow schema used to create the RecordBatchBuilder
// nrows:  initial capacity passed to the builder; also caps the liborc batch
//         size together with kReadRowsBatch
// out:    receives the flushed RecordBatch
Status ReadBatch(const liborc::RowReaderOptions& opts,
                 const std::shared_ptr<Schema>& schema, int64_t nrows,
                 std::shared_ptr<RecordBatch>* out) {
  std::unique_ptr<liborc::RowReader> row_reader;
  std::unique_ptr<liborc::ColumnVectorBatch> batch;

  ORC_BEGIN_CATCH_NOT_OK
  row_reader = reader_->createRowReader(opts);
  batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
  ORC_END_CATCH_NOT_OK

  std::unique_ptr<RecordBatchBuilder> builder;
  RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));

  // The top-level type must be a struct to read into an arrow table
  const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);

  const liborc::Type& type = row_reader->getSelectedType();
  while (true) {
    // Route next() through the catch macro so that a liborc exception raised
    // while decoding is reported as a Status, like the calls above.
    bool has_rows = false;
    ORC_CATCH_NOT_OK(has_rows = row_reader->next(*batch));
    if (!has_rows) break;

    for (int i = 0; i < builder->num_fields(); i++) {
      RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                batch->numElements, builder->GetField(i)));
    }
  }
  RETURN_NOT_OK(builder->Flush(out));
  return Status::OK();
}
Arrow/cpp/src/arrow/adapters/orc/adapter.cc
// Public entry point for the name-based overload; the work is done by the
// Impl method of the same name.
Status ORCFileReader::NextStripeReader(int64_t batch_size,
                                       const std::vector<std::string>& include_names,
                                       std::shared_ptr<RecordBatchReader>* out) {
  Status impl_status = impl_->NextStripeReader(batch_size, include_names, out);
  return impl_status;
}
// Produces a reader over the stripe that contains current_row_, limited to the
// columns in `include_names` (no selection is applied when the list is empty).
// `out` is reset to null once current_row_ has reached the end of the file.
Status NextStripeReader(int64_t batch_size, const std::vector<std::string>& include_names,
                        std::shared_ptr<RecordBatchReader>* out) {
  const bool exhausted = current_row_ >= NumberOfRows();
  if (exhausted) {
    out->reset();
    return Status::OK();
  }

  liborc::RowReaderOptions row_opts;
  if (!include_names.empty()) {
    RETURN_NOT_OK(SelectNames(&row_opts, include_names));
  }

  StripeInformation current_stripe({0, 0, 0, 0});
  RETURN_NOT_OK(SelectStripeWithRowNumber(&row_opts, current_row_, &current_stripe));

  std::shared_ptr<Schema> selected_schema;
  RETURN_NOT_OK(ReadSchema(row_opts, &selected_schema));

  std::unique_ptr<liborc::RowReader> orc_rows;
  ORC_BEGIN_CATCH_NOT_OK
  orc_rows = reader_->createRowReader(row_opts);
  orc_rows->seekToRow(current_row_);
  // The next call continues at the first row after this stripe.
  current_row_ = current_stripe.first_row_of_stripe + current_stripe.num_rows;
  ORC_END_CATCH_NOT_OK

  out->reset(new OrcStripeReader(std::move(orc_rows), selected_schema, batch_size, pool_));
  return Status::OK();
}
// Finds the stripe that contains `row_number`, restricts `opts` to that
// stripe's byte range and copies the stripe's information into `out`.
//
// Returns Status::Invalid when `row_number` is negative, is not smaller than
// NumberOfRows(), or is not covered by any recorded stripe.
Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
                                 StripeInformation* out) {
  // Reject negative rows up front: the comparisons below are done in uint64_t,
  // where a negative value would wrap around to a huge row number.
  ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
                  Status::Invalid("Out of bounds row number: ", row_number));

  const auto row = static_cast<uint64_t>(row_number);
  for (const auto& stripe : stripes_) {
    if (row >= stripe.first_row_of_stripe &&
        row < stripe.first_row_of_stripe + stripe.num_rows) {
      opts->range(stripe.offset, stripe.length);
      *out = stripe;
      return Status::OK();
    }
  }

  return Status::Invalid("Invalid row number: ", row_number);
}
Arrow/cpp/src/arrow/adapters/orc/adapter.cc
/// Private implementation of ORCFileReader: wraps a liborc::Reader, records
/// the layout of every stripe and tracks the row at which the next stripe
/// reader should start.
class ORCFileReader::Impl {
 public:
  Impl() {}
  ~Impl() {}

  /// Opens `file` through liborc, stores `pool`, resets the current row to 0
  /// and records the stripe layout via Init().
  Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
    std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
    liborc::ReaderOptions options;
    std::unique_ptr<liborc::Reader> liborc_reader;
    ORC_CATCH_NOT_OK(liborc_reader = createReader(std::move(io_wrapper), options));
    pool_ = pool;
    reader_ = std::move(liborc_reader);
    current_row_ = 0;

    return Init();
  }

  /// Records offset, length, row count and first row index of every stripe.
  Status Init() {
    int64_t nstripes = reader_->getNumberOfStripes();
    stripes_.resize(nstripes);
    std::unique_ptr<liborc::StripeInformation> stripe;
    uint64_t first_row_of_stripe = 0;
    for (int i = 0; i < nstripes; ++i) {
      stripe = reader_->getStripe(i);
      stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(),
                                       stripe->getNumberOfRows(), first_row_of_stripe});
      first_row_of_stripe += stripe->getNumberOfRows();
    }
    return Status::OK();
  }

  int64_t NumberOfStripes() { return stripes_.size(); }

  int64_t NumberOfRows() { return reader_->getNumberOfRows(); }

  /// Arrow schema of the whole file.
  Status ReadSchema(std::shared_ptr<Schema>* out) {
    const liborc::Type& type = reader_->getType();
    return GetArrowSchema(type, out);
  }

  /// Arrow schema of the columns selected by `opts`.
  Status ReadSchema(const liborc::RowReaderOptions& opts, std::shared_ptr<Schema>* out) {
    std::unique_ptr<liborc::RowReader> row_reader;
    ORC_CATCH_NOT_OK(row_reader = reader_->createRowReader(opts));
    const liborc::Type& type = row_reader->getSelectedType();
    return GetArrowSchema(type, out);
  }

  /// Converts a top-level ORC struct type into an Arrow schema and attaches
  /// the file's key/value metadata when there is any.
  Status GetArrowSchema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
    if (type.getKind() != liborc::STRUCT) {
      return Status::NotImplemented(
          "Only ORC files with a top-level struct "
          "can be handled");
    }
    int size = static_cast<int>(type.getSubtypeCount());
    std::vector<std::shared_ptr<Field>> fields;
    fields.reserve(size);
    for (int child = 0; child < size; ++child) {
      std::shared_ptr<DataType> elemtype;
      RETURN_NOT_OK(GetArrowType(type.getSubtype(child), &elemtype));
      std::string name = type.getFieldName(child);
      fields.push_back(field(name, elemtype));
    }
    std::list<std::string> keys = reader_->getMetadataKeys();
    std::shared_ptr<KeyValueMetadata> metadata;
    if (!keys.empty()) {
      metadata = std::make_shared<KeyValueMetadata>();
      for (const auto& key : keys) {
        metadata->Append(key, reader_->getMetadataValue(key));
      }
    }

    *out = std::make_shared<Schema>(fields, metadata);
    return Status::OK();
  }

  Status Read(std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    return ReadTable(opts, schema, out);
  }

  Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  Status Read(const std::vector<std::string>& include_names,
              std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectNames(&opts, include_names));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadTable(opts, schema, out);
  }

  Status Read(const std::shared_ptr<Schema>& schema,
              const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    return ReadTable(opts, schema, out);
  }

  Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                    std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  Status ReadStripe(int64_t stripe, const std::vector<std::string>& include_names,
                    std::shared_ptr<RecordBatch>* out) {
    liborc::RowReaderOptions opts;
    RETURN_NOT_OK(SelectNames(&opts, include_names));
    RETURN_NOT_OK(SelectStripe(&opts, stripe));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(opts, &schema));
    return ReadBatch(opts, schema, stripes_[stripe].num_rows, out);
  }

  /// Restricts `opts` to the byte range of stripe `stripe`; Invalid when the
  /// index is out of range.
  Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) {
    ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(),
                    Status::Invalid("Out of bounds stripe: ", stripe));

    opts->range(stripes_[stripe].offset, stripes_[stripe].length);
    return Status::OK();
  }

  /// Restricts `opts` to the stripe containing `row_number` and copies that
  /// stripe's information into `out`. Invalid for negative or out-of-range rows.
  Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
                                   StripeInformation* out) {
    // Negative rows must be rejected here: the comparisons below are done in
    // uint64_t, where a negative value would wrap around.
    ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
                    Status::Invalid("Out of bounds row number: ", row_number));

    const auto row = static_cast<uint64_t>(row_number);
    for (const auto& stripe : stripes_) {
      if (row >= stripe.first_row_of_stripe &&
          row < stripe.first_row_of_stripe + stripe.num_rows) {
        opts->range(stripe.offset, stripe.length);
        *out = stripe;
        return Status::OK();
      }
    }

    return Status::Invalid("Invalid row number: ", row_number);
  }

  /// Selects columns by type index; negative indices are rejected.
  Status SelectIndices(liborc::RowReaderOptions* opts,
                       const std::vector<int>& include_indices) {
    std::list<uint64_t> include_indices_list;
    for (const int index : include_indices) {
      ARROW_RETURN_IF(index < 0, Status::Invalid("Negative field index"));
      include_indices_list.push_back(index);
    }
    opts->includeTypes(include_indices_list);
    return Status::OK();
  }

  /// Selects columns by field name.
  Status SelectNames(liborc::RowReaderOptions* opts,
                     const std::vector<std::string>& include_names) {
    std::list<std::string> include_names_list(include_names.begin(), include_names.end());
    opts->include(include_names_list);
    return Status::OK();
  }

  /// Reads every stripe into its own RecordBatch and combines them into a Table.
  Status ReadTable(const liborc::RowReaderOptions& row_opts,
                   const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out) {
    liborc::RowReaderOptions opts(row_opts);
    std::vector<std::shared_ptr<RecordBatch>> batches(stripes_.size());
    for (size_t stripe = 0; stripe < stripes_.size(); stripe++) {
      opts.range(stripes_[stripe].offset, stripes_[stripe].length);
      RETURN_NOT_OK(ReadBatch(opts, schema, stripes_[stripe].num_rows, &batches[stripe]));
    }
    return Table::FromRecordBatches(schema, std::move(batches)).Value(out);
  }

  /// Reads all rows selected by `opts` into a single RecordBatch. `nrows` is
  /// the initial builder capacity and, with kReadRowsBatch, caps the liborc
  /// batch size.
  Status ReadBatch(const liborc::RowReaderOptions& opts,
                   const std::shared_ptr<Schema>& schema, int64_t nrows,
                   std::shared_ptr<RecordBatch>* out) {
    std::unique_ptr<liborc::RowReader> row_reader;
    std::unique_ptr<liborc::ColumnVectorBatch> batch;

    ORC_BEGIN_CATCH_NOT_OK
    row_reader = reader_->createRowReader(opts);
    batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
    ORC_END_CATCH_NOT_OK

    std::unique_ptr<RecordBatchBuilder> builder;
    RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));

    // The top-level type must be a struct to read into an arrow table
    const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);

    const liborc::Type& type = row_reader->getSelectedType();
    while (true) {
      // Route next() through the catch macro so that a liborc exception raised
      // while decoding is reported as a Status, like the calls above.
      bool has_rows = false;
      ORC_CATCH_NOT_OK(has_rows = row_reader->next(*batch));
      if (!has_rows) break;

      for (int i = 0; i < builder->num_fields(); i++) {
        RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                  batch->numElements, builder->GetField(i)));
      }
    }
    RETURN_NOT_OK(builder->Flush(out));
    return Status::OK();
  }

  /// Sets the row at which the next NextStripeReader() call starts. Invalid
  /// for negative rows and for rows at or past NumberOfRows().
  Status Seek(int64_t row_number) {
    ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
                    Status::Invalid("Out of bounds row number: ", row_number));

    current_row_ = row_number;
    return Status::OK();
  }

  /// Stripe reader limited to the given column indices (all columns when the
  /// list is empty). `out` is reset to null when no rows are left.
  Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
                          std::shared_ptr<RecordBatchReader>* out) {
    if (current_row_ >= NumberOfRows()) {
      out->reset();
      return Status::OK();
    }

    liborc::RowReaderOptions opts;
    if (!include_indices.empty()) {
      RETURN_NOT_OK(SelectIndices(&opts, include_indices));
    }
    return MakeStripeReader(batch_size, &opts, out);
  }

  /// Stripe reader limited to the given column names (all columns when the
  /// list is empty). `out` is reset to null when no rows are left.
  Status NextStripeReader(int64_t batch_size,
                          const std::vector<std::string>& include_names,
                          std::shared_ptr<RecordBatchReader>* out) {
    if (current_row_ >= NumberOfRows()) {
      out->reset();
      return Status::OK();
    }

    liborc::RowReaderOptions opts;
    if (!include_names.empty()) {
      RETURN_NOT_OK(SelectNames(&opts, include_names));
    }
    return MakeStripeReader(batch_size, &opts, out);
  }

  Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
    std::vector<int> empty_vec;
    return NextStripeReader(batch_size, empty_vec, out);
  }

 private:
  /// Shared tail of the NextStripeReader overloads: restricts `opts` to the
  /// stripe containing current_row_, creates a row reader positioned at that
  /// row, and advances current_row_ to the first row after the stripe.
  Status MakeStripeReader(int64_t batch_size, liborc::RowReaderOptions* opts,
                          std::shared_ptr<RecordBatchReader>* out) {
    StripeInformation stripe_info({0, 0, 0, 0});
    RETURN_NOT_OK(SelectStripeWithRowNumber(opts, current_row_, &stripe_info));
    std::shared_ptr<Schema> schema;
    RETURN_NOT_OK(ReadSchema(*opts, &schema));
    std::unique_ptr<liborc::RowReader> row_reader;

    ORC_BEGIN_CATCH_NOT_OK
    row_reader = reader_->createRowReader(*opts);
    row_reader->seekToRow(current_row_);
    current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
    ORC_END_CATCH_NOT_OK

    *out = std::shared_ptr<RecordBatchReader>(
        new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_));
    return Status::OK();
  }

  MemoryPool* pool_;
  std::unique_ptr<liborc::Reader> reader_;
  std::vector<StripeInformation> stripes_;
  int64_t current_row_;
};
Arrow/cpp/src/arrow/adapters/orc/adapter.cc
// Default constructor: allocates the private implementation object.
ORCFileReader::ORCFileReader() : impl_(new ORCFileReader::Impl()) {}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。