赞
踩
《【数据库篇】MySQL源码整体分析》分析了查询的整体流程,《【数据库篇】MySQL源码分析之row_search_mvcc详细分析》分析了Innodb Page的加载逻辑以及索引执行逻辑,接下来分析索引、表关联以及排序是如何执行的。
Mysql的查询、关联、排序都是通过RowIterator的继承类实现的。RefIterator从连接右表中读取指定key的行。SortingIterator对另一个迭代器输出进行排序。BKAIterator通过缓存和索引进行表关联,NestedLoopIterator使用嵌套循环算法连接两个迭代器。
1.1、RowIterator的数据结构,Read()用于数据的读取,对于索引返回的是查询数据,对于排序返回的是排序后的数据,对于表关联返回的是子表查询数据
class RowIterator {
virtual bool Init() = 0;
virtual int Read() = 0;
...
}
1.2、以下是执行的主要逻辑,fields查询字段,通过for循环Read数据,然后send_data结果
//sql_union.cc
bool SELECT_LEX_UNIT::ExecuteIteratorQuery(THD *thd) {
List<Item> *fields = get_field_list(); //查询到的结果
Query_result *query_result = this->query_result();
ha_rows *send_records_ptr;
send_records_ptr = &first_select()->join->send_records; //如果是简单查询
for (;;) {
int error = m_root_iterator->Read();
++*send_records_ptr;
query_result->send_data(thd, *fields) //遍历行
}
thd->current_found_rows = *send_records_ptr;
return query_result->send_eof(thd); //发送结束
}
1.3、RowIterator在optimize阶段创建
bool SELECT_LEX_UNIT::optimize(THD *thd, TABLE *materialize_destination) {
for (SELECT_LEX *sl = first_select(); sl; sl = sl->next_select()) {
sl->optimize(thd) //bool SELECT_LEX::optimize(THD *thd)
->JOIN *const join_local = new (thd->mem_root) JOIN(thd, this);
->join->optimize()
}
if (item != nullptr) {
item->create_iterators(thd); //item是Item_subselect
->indexsubquery_engine->create_iterators(thd); //indexsubquery_engine是subselect_hash_sj_engine
->void subselect_hash_sj_engine::create_iterators(THD *thd)
//通过索引查询
->tab->iterator = NewIterator<RefIterator<false>>(thd, tab->table(), &tab->ref(), false, tab, nullptr);
}
}
bool JOIN::optimize() {
create_iterators();
->create_table_iterators();
->for (unsigned table_idx = const_tables; table_idx < tables; ++table_idx)
->QEP_TAB *qep_tab = &this->qep_tab[table_idx];
//处理order by
->if (qep_tab->filesort)
->qep_tab->iterator = NewIterator<SortingIterator>(qep_tab->join()->thd, qep_tab, qep_tab->filesort, move(iterator), &qep_tab->join()->examined_rows);
->qep_tab->table()->sorting_iterator = down_cast<SortingIterator *>(qep_tab->iterator->real_iterator());
//处理join
->unique_ptr_destroy_only<RowIterator> iterator = create_root_iterator_for_join();
->iterator = ConnectJoins(NO_PLAN_IDX, const_tables, primary_tables, qep_tab, thd, TOP_LEVEL, nullptr, nullptr,nullptr, &unhandled_duplicates, &conditions_depend_on_outer_tables);
}
static unique_ptr_destroy_only<RowIterator> ConnectJoins(plan_idx upper_first_idx, plan_idx first_idx, plan_idx last_idx,QEP_TAB *qep_tabs, THD *thd, CallingContext calling_context,vector<PendingCondition> *pending_conditions,vector<PendingInvalidator> *pending_invalidators,vector<PendingCondition> *pending_join_conditions,qep_tab_map *unhandled_duplicates,table_map *conditions_depend_on_outer_tables){ for (plan_idx i = first_idx; i < last_idx;) { if (substructure == Substructure::OUTER_JOIN || substructure == Substructure::SEMIJOIN) { if (i != first_idx && qep_tabs[i - 1].do_loosescan() && qep_tabs[i - 1].match_tab != i - 1) { iterator = NewIterator<NestedLoopIterator>(thd, move(iterator), move(subtree_iterator), join_type, pfs_batch_mode); ->NestedLoopIterator(THD *thd,unique_ptr_destroy_only<RowIterator> source_outer,unique_ptr_destroy_only<RowIterator> source_inner,JoinType join_type, bool pfs_batch_mode): RowIterator(thd),m_source_outer(move(source_outer)),m_source_inner(move(source_inner)),m_join_type(join_type),m_pfs_batch_mode(pfs_batch_mode) } else if (((UseHashJoin(qep_tab) && !PushedJoinRejectsHashJoin(qep_tab->join(), left_tables, right_tables, join_type) && !right_side_depends_on_outer) || UseBKA(qep_tab)) && !QueryMixesOuterBKAAndBNL(qep_tab->join())) { if (UseBKA(qep_tab)) { iterator = CreateBKAIterator(thd, qep_tab->join(), move(iterator), left_tables, move(subtree_iterator), right_tables, qep_tab->table(), qep_tab->table_ref, &qep_tab->ref(), qep_tab->mrr_iterator, join_type); ->return NewIterator<BKAIterator>(thd, join, move(iterator), left_tables, move(subtree_iterator), thd->variables.join_buff_size, table->file->stats.mrr_length_per_rec, rec_per_key, mrr_iterator, join_type); ->BKAIterator::BKAIterator(THD *thd, JOIN *join, unique_ptr_destroy_only<RowIterator> outer_input,qep_tab_map outer_input_tables,unique_ptr_destroy_only<RowIterator> inner_input,size_t max_memory_available,size_t mrr_bytes_needed_for_single_inner_row,float expected_inner_rows_per_outer_row,MultiRangeRowIterator *mrr_iterator,JoinType join_type) : RowIterator(thd), m_outer_input(move(outer_input)), m_inner_input(move(inner_input)), m_mem_root(key_memory_hash_join, 16384 /* 16 kB */), m_rows(&m_mem_root), m_outer_input_tables(join, outer_input_tables), m_max_memory_available(max_memory_available), m_mrr_bytes_needed_for_single_inner_row(mrr_bytes_needed_for_single_inner_row), m_mrr_iterator(mrr_iterator), m_join_type(join_type) } else { iterator = CreateHashJoinIterator(thd, qep_tab, move(subtree_iterator), right_tables, move(iterator), left_tables, join_type, &join_conditions, conditions_depend_on_outer_tables); } } } } }
construct_lookup_ref(thd(), table(), m_ref)获取外表关联字段值存入m_ref,row_search_mvcc(buf, mode, m_prebuilt, match_mode, 0);从缓存或磁盘中查询数据(即上一篇文章的分析内容)
template <> int RefIterator<false>::Read() { if (m_first_record_since_init) { construct_lookup_ref(thd(), table(), m_ref) ->my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); ->for (uint part_no = 0; part_no < ref->key_parts; part_no++) ->store_key *s_key = ref->key_copy[part_no]; //to_field.ptr指向m_ref->key_buff ->s_key->copy() ->result = copy_inner(); ->cached_result = store_key_item::copy_inner(); ->TABLE *table = to_field->table; ->type_conversion_status save_res = item->save_in_field(to_field, true); ->const type_conversion_status ret = save_in_field_inner(field, no_conversions); ->return field->store(value.integer, unsigned_flag); ->type_conversion_status Field_long::store(longlong nr, bool unsigned_val) ->res = (int32)(uint32)nr; ->int4store(ptr, res); ->*(T) = (uchar)(A); ->*(T + 1) = (uchar)(A >> 8); ->*(T + 2) = (uchar)(A >> 16); ->*(T + 3) = (uchar)(A >> 24); ->res = type_conversion_status_to_store_key(table->in_use, save_res); ->dbug_tmp_restore_column_map(table->write_set, old_map); pair<uchar *, key_part_map> key_buff_and_map = FindKeyBufferAndMap(m_ref); //key_buff_and_map.first是ref->key_buff //key_buff_and_map.second是make_prev_keypart_map(ref->key_parts) ->return make_pair(ref->key_buff, make_prev_keypart_map(ref->key_parts)); ->#define make_prev_keypart_map(N) (((key_part_map)1 << (N)) - 1) //bitmap with first N bits set //https://blog.csdn.net/u011499425/article/details/52756088 ->std::make_pair // 将两个数据组合成一个数据 int error = table()->file->ha_index_read_map(table()->record[0], key_buff_and_map.first, key_buff_and_map.second,HA_READ_KEY_EXACT); //key是key_buff_and_map.first,key_buff_and_map.second是keypart_map,typedef ulong key_part_map; ->MYSQL_TABLE_IO_WAIT(PSI_TABLE_FETCH_ROW, active_index, result, { result = index_read_map(buf, key, keypart_map, find_flag); }) ->return index_read(buf, key, key_len, find_flag); //将key_ptr转换成m_prebuilt->search_tuple,将key_ptr存入m_prebuilt->srch_key_val1 ->row_sel_convert_mysql_key_to_innobase(m_prebuilt->search_tuple, m_prebuilt->srch_key_val1, m_prebuilt->srch_key_val_len, index, key_ptr, key_len, m_prebuilt->trx); ->ret = row_search_mvcc(buf, mode, m_prebuilt, match_mode, 0); } else { int error = table()->file->ha_index_next_same(table()->record[0], m_ref->key_buff, m_ref->key_length); ->MYSQL_TABLE_IO_WAIT(PSI_TABLE_FETCH_ROW, active_index, result, { result = index_next_same(buf, key, keylen); }) ->int handler::index_next_same(uchar *buf, const uchar *key, uint keylen) } }
RowIterator是层层嵌套的比如这里的m_source_iterator可能就是RefIterator,获取它的查询结果后进行排序,如果数据量少直接在内存中进行排序,如果数据量大则根据sort_buffer_size等参数进行分片存入磁盘最后归并,也有可能单行数据量大于max_length_for_sort_data执行双路运算
bool SortingIterator::Init() {
DoSort()
m_result_iterator.reset(new (&m_result_iterator_holder.sort_buffer)SortBufferIterator<false>(thd(), table, &m_fs_info, &m_sort_result, m_examined_rows));
return m_result_iterator->Init();
}
int SortingIterator::DoSort() {
m_sort_result.io_cache = (IO_CACHE *)my_malloc(key_memory_TABLE_sort_io_cache, sizeof(IO_CACHE), MYF(MY_WME | MY_ZEROFILL));
JOIN *join = m_qep_tab->join();
get_schema_tables_result(join, PROCESSED_BY_CREATE_SORT_INDEX)
bool error = filesort(thd(), m_filesort, m_source_iterator.get(), &m_fs_info, &m_sort_result, &found_rows);
m_qep_tab->set_records(found_rows);
}
@param thd Current thread @param filesort How to sort the table @param source_iterator Where to read the rows to be sorted from. @param fs_info Owns the buffers for sort_result. @param sort_result Where to store the sort result. @param[out] found_rows Store the number of found rows here. This is the number of found rows after applying WHERE condition. bool filesort(THD *thd, Filesort *filesort, RowIterator *source_iterator, Filesort_info *fs_info, Sort_result *sort_result, ha_rows *found_rows) { Sort_param *param = &filesort->m_sort_param; trace_filesort_information(trace, filesort->sortorder, s_length); param->init_for_filesort(filesort, make_array(filesort->sortorder, s_length), sortlength(thd, filesort->sortorder, s_length), table, max_rows, filesort->m_remove_duplicates); fs_info->addon_fields = param->addon_fields; //查询字段 thd->inc_status_sort_scan(); //读数据并生成最小堆或者最大堆 num_rows_found = read_all_rows(thd, param, fs_info, &chunk_file, &tempfile, param->using_pq ? &pq : nullptr, source_iterator, found_rows, &longest_key, &longest_addons); ->for (;;) ->source_iterator->Read() ->++(*found_rows); /******************************* 通过内存直接排序_S *******************************/ ->if (pq) ->pq->push(ref_pos); ->const uint rec_sz = m_sort_param->make_sortkey(m_sort_keys[m_queue.size()], element_size, element); ->return make_sortkey(Bounds_checked_array<uchar>(dst, dst_len), ref_pos, &longest_addons); ->for (sort_field = local_sortorder.begin(); sort_field != local_sortorder.end(); sort_field++) ->if (using_addon_fields()) //addon是select中的字段 // -> ->else //如果采用双路排序 ->memcpy(to, ref_pos, ref_length); ->to += ref_length; ->return to - orig_to; //似乎是最小堆或者最大堆 ->m_queue.push(m_sort_keys[m_queue.size()]); //m_queue是Priority_queue ->m_container.push_back(x); ->reverse_heapify(m_container.size() - 1); ->while (i > 0 && !Base::operator()(m_container[i], m_container[parent(i)])) ->std::swap(m_container[parent(i)], m_container[i]); ->i = parent(i); /******************************* 通过内存直接排序_E *******************************/ /******************************* 通过临时文件排序_S *******************************/ ->else ->bool out_of_mem = alloc_and_make_sortkey(param, fs_info, ref_pos, &key_length, &longest_addon_so_far); ->if (out_of_mem) // Out of room, so flush chunk to disk (if there's anything to flush). ->if (num_records_this_chunk > 0) ->write_keys(param, fs_info, num_records_this_chunk, chunk_file, tempfile) ->count = fs_info->sort_buffer(param, count); ->return filesort_buffer.sort_buffer(param, count); ->const Mem_compare_varlen_key comp(param->local_sortorder, param->use_hash); ->if (force_stable_sort) //https://blog.csdn.net/lycx1234/article/details/54891827 //std::stable_sort ->stable_sort(it_begin, it_end, comp); ->else //https://blog.csdn.net/weixin_41582705/article/details/81354508 //std::sort ->sort(it_begin, it_end, comp); //循环比较,comp是自定义比较函数 ->comp是自定义比较工具 Mem_compare_varlen_key::operator()(const uchar *s1, const uchar *s2) ->return cmp_varlen_keys(sort_field_array, use_hash, s1, s2); ->for (const st_sort_field &sort_field : sort_field_array) ->res = memcmp(kp1, kp2, kp_len); //比较大小等 ->kp1 += kp1_len; ->kp2 += kp2_len; ->for (uint ix = 0; ix < count; ++ix) ->uchar *record = fs_info->get_sorted_record(ix); //将结果写入临时文件 ->my_b_write(tempfile, record, rec_length) ->num_records_this_chunk = 0; ->fs_info->reset(); //重置 // Now we should have room for a new row. ->out_of_mem = alloc_and_make_sortkey(param, fs_info, ref_pos, &key_length, &longest_addon_so_far); ->num_records_this_chunk++; /******************************* 通过临时文件排序_E *******************************/ ->return num_total_records; fs_info->read_chunk_descriptors(&chunk_file, num_chunks); open_cached_file(outfile, mysql_tmpdir, TEMP_PREFIX, READ_RECORD_BUFFER, MYF(MY_WME)) reinit_io_cache(outfile, WRITE_CACHE, 0L, false, false) //合并排序 merge_many_buff(thd, param, merge_buf, fs_info->merge_chunks, &num_chunks, &tempfile) flush_io_cache(&tempfile) //刷入磁盘 if (num_chunks == 0) { //不需要临时文件排序, save_index(param, rows_in_chunk, fs_info, sort_result) ->sort_result->sorted_result.reset(static_cast<uchar *>(my_malloc(key_memory_Filesort_info_record_pointers, buf_size, MYF(MY_WME)))); ->to = sort_result->sorted_result.get() ->for (uint ix = 0; ix < count; ++ix) ->uchar *record = table_sort->get_sorted_record(ix); ->return m_record_pointers[ix]; ->uchar *start_of_payload = param->get_start_of_payload(record); ->memcpy(to, start_of_payload, res_length); ->to += res_length; } else { //如果sort buffer满了,则进行归并排序 //对于归并排序来讲,这里可能会生成另外2个临时文件用于存储最终排序的结果,它们依然以MY开头,且依然是存储在tmpdir参数指定的位置。 //https://www.jianshu.com/p/069428a6594e merge_index(thd, param, merge_buf, Merge_chunk_array(fs_info->merge_chunks.begin(), num_chunks), &tempfile, outfile) } close_cached_file(&tempfile); close_cached_file(&chunk_file); flush_io_cache(outfile) }
关联查询有多种方式,关联方式分为非索引关联查询和索引关联查询,关联细节有分为通过缓存外表批量查询子表数据和非缓存方式,即参数join_buffer_size(以检查对子表的循环次数)。BKAIterator是通过索引进行表关联的,和MultiRangeRowIterator配合使用缓存一批外表数据并排序后顺序加载子表Record以达到利用缓存减少并顺序执行IO提高性能。
4.1、ReadOuterRows();查询外表数据,m_inner_input应该就是MultiRangeRowIterator,调用m_inner_input->Read()批量查询子表数据
int BKAIterator::Read() { for (;;) { switch (m_state) { //从外表一次性读取一批数据 case State::NEED_OUTER_ROWS: { int err = ReadOuterRows(); } //关联表通过外表这一批数据查询数据 case State::RETURNING_JOINED_ROWS: { int err = m_inner_input->Read(); //见3.1,m_inner_input应该就是m_mrr_iterator if (err == 0) { m_mrr_iterator->MarkLastRowAsRead(); return err; } if (m_join_type == JoinType::OUTER || m_join_type == JoinType::ANTI) { m_current_pos = m_rows.begin(); m_mrr_iterator->SetNullRowFlag(true); } else { BatchFinished(); } } } } }
4.2、从外表读取数据存入m_rows数组
1、m_outer_input->Read() 从外表数据
2、table->file->position(table->record[0]) 将数据存入table->file->ref
3、StoreFromTableBuffers(m_outer_input_tables, &m_outer_row_buffer) 将table->file->ref存到m_outer_row_buffer
4、memcpy(row, m_outer_row_buffer.ptr(), row_size) 将m_outer_row_buffer存入row
5、m_rows.push_back(BufferRow(row, row_size));将row存入m_rows数组
4.3、将m_rows给m_mrr_iterator
1、m_mrr_iterator->set_rows(m_rows.begin(), m_rows.end());
2、m_mrr_iterator->set_mrr_buffer(m_mem_root.ArrayAlloc(mrr_buffer_size), mrr_buffer_size);
4.4、初始化m_inner_input
1、m_inner_input->Init()
int BKAIterator::ReadOuterRows() { for (;;) { if (m_has_row_from_previous_batch) { //将m_outer_row_buffer复制给m_outer_input_tables的table->file->ref hash_join_buffer::LoadIntoTableBuffers(m_outer_input_tables,hash_join_buffer::Key(pointer_cast<const uchar *>(m_outer_row_buffer.ptr()), m_outer_row_buffer.length())); } else { int result = m_outer_input->Read(); RequestRowId(m_outer_input_tables.tables()); ->for (const hash_join_buffer::Table &it : tables) ->TABLE *table = it.qep_tab->table(); //table->record[0]复制给ref ->table->file->position(table->record[0]); ->key_copy(ref, (uchar *)record, key_info, key_info->key_length); //将每张表的table->file->ref复制给m_outer_row_buffer StoreFromTableBuffers(m_outer_input_tables, &m_outer_row_buffer) ->bool StoreFromTableBuffers(const TableCollection &tables, String *buffer) ->uchar *dptr = pointer_cast<uchar *>(buffer->ptr()); ->for (const Table &tbl : tables.tables()) ->const TABLE *table = tbl.qep_tab->table(); ->memcpy(dptr, table->file->ref, table->file->ref_length); ->dptr += table->file->ref_length; } if (!m_rows.empty() &&total_bytes_needed_after_this_row > m_max_memory_available) { m_has_row_from_previous_batch = true; break; } uchar *row = m_mem_root.ArrayAlloc<uchar>(row_size); memcpy(row, m_outer_row_buffer.ptr(), row_size); m_rows.push_back(BufferRow(row, row_size)); } m_mrr_iterator->set_rows(m_rows.begin(), m_rows.end()); ->m_begin = begin; ->m_end = end; m_mrr_iterator->set_mrr_buffer(m_mem_root.ArrayAlloc<uchar>(mrr_buffer_size), mrr_buffer_size); m_inner_input->Init() }
4.5、mrr_funcs.next(mrr_iter, &mrr_cur_range)
1、m_current_pos读取数据给mrr_cur_range(BKAIterator::ReadOuterRows()循环读取外表数据存在m_rows中,m_current_pos指向m_rows数组)
4.6、read_range_next(); 或read_range_first(…)
1、根据外表查询的数据,查询子表数据
int MultiRangeRowIterator::Read() {
//BufferRow 从这个名称可以确定rec_ptr是主表缓存数据
BufferRow *rec_ptr = nullptr;
do {
int error = m_file->ha_multi_range_read_next(pointer_cast<char **>(&rec_ptr));
->result = multi_range_read_next(range_info);
->mrr_funcs.next(mrr_iter, &mrr_cur_range)
->1、result = read_range_first(mrr_cur_range.start_key, &mrr_cur_range.end_key, mrr_cur_range.range_flag & EQ_RANGE, mrr_is_output_sorted);
->2、result = read_range_next();
->filter_dup_records()
->position(table->record[0]);
//似乎是读完一批数据读下一批
} while (m_join_type == JoinType::SEMI && RowHasBeenRead(rec_ptr)); //Check whether the given row has been marked as read
return 0;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。