当前位置:   article > 正文

构建一个简单的数据库系列(六)cursor抽象_用cursor.so完整开发一下项目

用cursor.so完整开发一下项目

英文链接:https://cstack.github.io/db_tutorial/parts/part6.html

为了实现B-tree,这一节先对当前的实现进行一点重构。

我们增加一个概念:Cursor(游标),代表了对象在数据库中的位置。那么关于cursor有几件事需要完成

1、在表的开头创建cursor

2、在表的末尾创建cursor

3、访问cursor所指向的行

4、将cursor移动到下一行

完成这些之后,我们还会继续实现:

1、使用cursor删除行

2、使用cursor修改行

3、按给定ID查询表,并创建一个指向该ID所在行的cursor

闲话少说,Cursor类型定义如下:

  1. 批注:ADO 连接对象(ADO Connection Object)
  2. ADO 连接对象用来创建到某个数据源的开放连接。通过此连接,您可以对此数据库进行访问和操作。
  3. 查看此连接对象的所有方法和属性
  1. struct Cursor_t {
  2. Table* table;
  3. uint32_t row_num;
  4. bool end_of_table; // Indicates a position one past the last element
  5. };
  6. typedef struct Cursor_t Cursor;

table_start() and table_end() create new cursors:

  1. Cursor* table_start(Table* table) {
  2. Cursor* cursor = malloc(sizeof(Cursor));
  3. cursor->table = table;
  4. cursor->row_num = 0;
  5. cursor->end_of_table = (table->num_rows == 0);
  6. return cursor;
  7. }
  8. Cursor* table_end(Table* table) {
  9. Cursor* cursor = malloc(sizeof(Cursor));
  10. cursor->table = table;
  11. cursor->row_num = table->num_rows;
  12. cursor->end_of_table = true;
  13. return cursor;
  14. }

row_slot()函数改为cursor_value(),它返回一个指针,指向cursor所描述的位置

定义一个函数cursor_advance,将cursor的row_num加1(即移动到下一行);当row_num到达表的num_rows时,把end_of_table置为true。

  1. void cursor_advance(Cursor* cursor) {
  2. cursor->row_num = 1;
  3. if (cursor->row_num >= cursor->table->num_rows) {
  4. cursor->end_of_table = true;
  5. }
  6. }

最后,我们修改“virtual machine”,改用抽象的对象:Cursor。当插入一行时,我们打开一个指向表尾的cursor,在cursor所指的位置写入数据,然后关闭(释放)该cursor。

  1. Row* row_to_insert = &(statement->row_to_insert);
  2. + Cursor* cursor = table_end(table);
  3. - serialize_row(row_to_insert, row_slot(table, table->num_rows));
  4. + serialize_row(row_to_insert, cursor_value(cursor));
  5. table->num_rows += 1;
  6. + free(cursor);
  7. +
  8. return EXECUTE_SUCCESS;
  9. }

同理,修改execute_select的实现,使用cursor替换row_slot

  1. ExecuteResult execute_select(Statement* statement, Table* table) {
  2. + Cursor* cursor = table_start(table);
  3. +
  4. Row row;
  5. - for (uint32_t i = 0; i < table->num_rows; i++) {
  6. - deserialize_row(row_slot(table, i), &row);
  7. + while (!(cursor->end_of_table)) {
  8. + deserialize_row(cursor_value(cursor), &row);
  9. print_row(&row);
  10. + cursor_advance(cursor);
  11. }
  12. +
  13. + free(cursor);
  14. +
  15. return EXECUTE_SUCCESS;
  16. }

至此,execute_select和execute_insert就不需要再对表的存储方式做任何假设,完全可以通过cursor与table进行交互了。

还是使用上节的用例测试下:

  1. db > insert 1 cstack foo@bar.com
  2. Executed.
  3. db > insert 2 hello hello@126.com
  4. Executed.
  5. db > select
  6. (1, cstack, foo@bar.com)
  7. (2, hello, hello@126.com)
  8. (1, cstack, foo@bar.com)
  9. (2, hello, hello@126.com)
  10. Executed.
  11. db >

至此最新代码:

  #include <errno.h>
  #include <fcntl.h>
  #include <stdbool.h>
  #include <stdint.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <strings.h>
  #include <sys/stat.h>
  #include <unistd.h>
  8. /* 定义 元数据操作结果*/
  9. enum MetaCommandResult_t {
  10. META_COMMAND_SUCCESS,
  11. META_COMMAND_UNRECOGNIZED_COMMAND
  12. };
  13. typedef enum MetaCommandResult_t MetaCommandResult;
  14. /* 执行结果*/
  15. enum ExecuteResult_t { EXECUTE_SUCCESS, EXECUTE_TABLE_FULL };
  16. typedef enum ExecuteResult_t ExecuteResult;
  17. /* sql解析结果 */
  18. enum PrepareResult_t {
  19. PREPARE_SUCCESS,
  20. PREPARE_NEGATIVE_ID,
  21. PREPARE_STRING_TOO_LONG,
  22. PREPARE_SYNTAX_ERROR,
  23. PREPARE_UNRECOGNIZED_STATEMENT
  24. };
  25. typedef enum PrepareResult_t PrepareResult;
  /*
   * Row definition for the single hard-coded table (id, username, email).
   *
   * The column widths are macros rather than `const uint32_t` objects: in C a
   * const-qualified variable is not an integer constant expression, so using
   * one as the bound of a struct member array is not valid C (GCC rejects it
   * as a variably modified type at file scope).
   */
  #define COLUMN_USERNAME_SIZE 32
  #define COLUMN_EMAIL_SIZE 255
  struct Row_t {
    uint32_t id;
    char username[COLUMN_USERNAME_SIZE + 1]; /* +1 for the terminating NUL */
    char email[COLUMN_EMAIL_SIZE + 1];       /* +1 for the terminating NUL */
  };
  typedef struct Row_t Row;
  35. /* sql type*/
  36. enum StatementType_t{
  37. STATEMENT_INSERT,
  38. STATEMENT_SELECT
  39. };
  40. typedef enum StatementType_t StatementType;
  41. struct Statement_t {
  42. StatementType type;
  43. Row row_to_insert; /* only used by insert statement */
  44. };
  45. typedef struct Statement_t Statement;
  46. /**/
  47. #define size_of_attribute(Struct, Attribute) sizeof(((Struct*)0)->Attribute)
  48. const uint32_t ID_SIZE = size_of_attribute(Row, id);
  49. const uint32_t USERNAME_SIZE = size_of_attribute(Row, username);
  50. const uint32_t EMAIL_SIZE = size_of_attribute(Row, email);
  51. const uint32_t ID_OFFSET = 0;
  52. const uint32_t USERNAME_OFFSET = ID_OFFSET + ID_SIZE;
  53. const uint32_t EMAIL_OFFSET = USERNAME_OFFSET + USERNAME_SIZE;
  54. const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE;
  /*
   * Paging constants.
   * Most architectures use 4 KiB virtual-memory pages, so the same size is
   * used here and no conversion is needed.
   *
   * TABLE_MAX_PAGES is an enum constant because it is used as an array bound
   * inside struct Pager_t, which requires an integer constant expression (a
   * `const` variable is not one in C). ROWS_PER_PAGE and TABLE_MAX_ROWS are
   * macros because they are derived from other objects, and a file-scope
   * initializer may not portably refer to the value of a const variable.
   */
  const uint32_t PAGE_SIZE = 4096;
  enum { TABLE_MAX_PAGES = 100 };
  #define ROWS_PER_PAGE (PAGE_SIZE / ROW_SIZE)
  #define TABLE_MAX_ROWS (ROWS_PER_PAGE * TABLE_MAX_PAGES)
  /*
   * Pager: owns the database file and an in-memory cache of its pages.
   */
  struct Pager_t {
    int file_descriptor;            /* descriptor of the open database file */
    uint32_t file_length;           /* file size in bytes when it was opened */
    void *pages[TABLE_MAX_PAGES];   /* cached pages; NULL means not loaded */
  };
  typedef struct Pager_t Pager;
  69. struct Table_t {
  70. Pager *pager;
  71. uint32_t num_rows;
  72. };
  73. typedef struct Table_t Table;
  74. /* 序列化: 将row写到内存中 */
  75. void serialize_row(Row* source, void* destination) {
  76. memcpy(destination + ID_OFFSET, &(source->id), ID_SIZE);
  77. memcpy(destination + USERNAME_OFFSET, &(source->username), USERNAME_SIZE);
  78. memcpy(destination + EMAIL_OFFSET, &(source->email), EMAIL_SIZE);
  79. }
  80. /* 反序列化 */
  81. void deserialize_row(void* source, Row* destination) {
  82. memcpy(&(destination->id), source + ID_OFFSET, ID_SIZE);
  83. memcpy(&(destination->username), source + USERNAME_OFFSET, USERNAME_SIZE);
  84. memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE);
  85. }
  86. /**
  87. * 获取页num对应的页
  88. * @param table
  89. * @param row_num
  90. * @return
  91. */
  92. void *get_page(Pager *pager, uint32_t page_num)
  93. {
  94. if (page_num > TABLE_MAX_PAGES) {
  95. printf("Tried to fetch page number out of bounds. %d > %d\n", page_num,
  96. TABLE_MAX_PAGES);
  97. exit(EXIT_FAILURE);
  98. }
  99. if (pager->pages[page_num] == NULL) {
  100. // Cache miss. Allocate memory and load from file.
  101. void *page = malloc(PAGE_SIZE);
  102. uint32_t num_pages = pager->file_length / PAGE_SIZE;
  103. // We might save a partial page at the end of the file
  104. if (pager->file_length % PAGE_SIZE) {
  105. num_pages += 1;
  106. }
  107. if (page_num <= num_pages) {
  108. lseek(pager->file_descriptor, page_num * PAGE_SIZE, SEEK_SET);
  109. ssize_t bytes_read = read(pager->file_descriptor, page, PAGE_SIZE);
  110. if (bytes_read == -1) {
  111. printf("Error reading file: %d\n", errno);
  112. exit(EXIT_FAILURE);
  113. }
  114. }
  115. pager->pages[page_num] = page;
  116. }
  117. return pager->pages[page_num];
  118. }
  119. /**
  120. * 打开pager
  121. * @param filename
  122. * @return
  123. */
  124. Pager *pager_open(const char *filename)
  125. {
  126. int fd = open(filename,
  127. O_RDWR | // Read/Write mode
  128. O_CREAT, // Create file if it does not exist
  129. S_IWUSR | // User write permission
  130. S_IRUSR // User read permission
  131. );
  132. if (fd == -1) {
  133. printf("Unable to open file\n");
  134. exit(EXIT_FAILURE);
  135. }
  136. off_t file_length = lseek(fd, 0, SEEK_END);
  137. Pager *pager = malloc(sizeof(Pager));
  138. pager->file_descriptor = fd;
  139. pager->file_length = file_length;
  140. for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
  141. pager->pages[i] = NULL;
  142. }
  143. return pager;
  144. }
  145. /**
  146. * Cursor定义
  147. */
  148. struct Cursor_t {
  149. Table* table;
  150. uint32_t row_num;
  151. bool end_of_table; // Indicates a position one past the last element
  152. };
  153. typedef struct Cursor_t Cursor;
  154. /**
  155. * Cursor api
  156. * @param row
  157. */
  158. Cursor* table_start(Table* table) {
  159. Cursor* cursor = malloc(sizeof(Cursor));
  160. cursor->table = table;
  161. cursor->row_num = 0;
  162. cursor->end_of_table = (table->num_rows == 0);
  163. return cursor;
  164. }
  165. Cursor* table_end(Table* table) {
  166. Cursor* cursor = malloc(sizeof(Cursor));
  167. cursor->table = table;
  168. cursor->row_num = table->num_rows;
  169. cursor->end_of_table = true;
  170. return cursor;
  171. }
  172. /**
  173. * 计算插入位置, 行插入槽
  174. */
  175. void* cursor_value(Cursor* cursor)
  176. {
  177. uint32_t row_num = cursor->row_num;
  178. uint32_t page_num = row_num / ROWS_PER_PAGE;
  179. void* page = get_page(cursor->table->pager, page_num);
  180. uint32_t row_offset = row_num % ROWS_PER_PAGE;
  181. uint32_t byte_offset = row_offset * ROW_SIZE;
  182. return page + byte_offset;
  183. }
  184. /**
  185. * rownum 加1
  186. * @param cursor
  187. */
  188. void cursor_advance(Cursor* cursor)
  189. {
  190. cursor->row_num += 1;
  191. if (cursor->row_num >= cursor->table->num_rows) {
  192. cursor->end_of_table = true;
  193. }
  194. }
  195. void print_row(Row* row) {
  196. printf("(%d, %s, %s)\n", row->id, row->username, row->email);
  197. }
  198. /**
  199. * 打开数据库文件并建立连接
  200. * @param filename
  201. * @return
  202. */
  203. Table *db_open(const char *filename)
  204. {
  205. Pager *pager = pager_open(filename);
  206. uint32_t num_rows = pager->file_length / ROW_SIZE;
  207. Table *table = malloc(sizeof(Table));
  208. table->num_rows = 0;
  209. table->pager = pager;
  210. table->num_rows = num_rows;
  211. return table;
  212. }
  213. /* 接收输入*/
  214. struct InputBuffer_t {
  215. char* buffer;
  216. size_t buffer_length;
  217. ssize_t input_length;
  218. };
  219. typedef struct InputBuffer_t InputBuffer;
  220. /* 初始化buffer */
  221. InputBuffer* new_input_buffer()
  222. {
  223. InputBuffer* input_buffer = malloc(sizeof(InputBuffer));
  224. input_buffer->buffer = NULL;
  225. input_buffer->buffer_length = 0;
  226. input_buffer->input_length = 0;
  227. return input_buffer;
  228. }
  229. void print_prompt() { printf("db > "); }
  230. /* 按行从标准输入读取 */
  231. void read_input(InputBuffer* input_buffer)
  232. {
  233. ssize_t bytes_read =
  234. getline(&(input_buffer->buffer), &(input_buffer->buffer_length), stdin);
  235. if (bytes_read <= 0) {
  236. printf("Error reading input\n");
  237. exit(EXIT_FAILURE);
  238. }
  239. // Ignore trailing newline
  240. input_buffer->input_length = bytes_read - 1;
  241. input_buffer->buffer[bytes_read - 1] = 0;
  242. }
  243. /**
  244. * 刷新页到文件(指定页号)
  245. * @param pager
  246. * @param page_num
  247. * @param size
  248. */
  249. void pager_flush(Pager *pager, uint32_t page_num, uint32_t size)
  250. {
  251. if (pager->pages[page_num] == NULL) {
  252. printf("Tried to flush null page\n");
  253. exit(EXIT_FAILURE);
  254. }
  255. off_t offset = lseek(pager->file_descriptor, page_num * PAGE_SIZE, SEEK_SET);
  256. if (offset == -1) {
  257. printf("Error seeking: %d\n", errno);
  258. exit(EXIT_FAILURE);
  259. }
  260. ssize_t bytes_written =
  261. write(pager->file_descriptor, pager->pages[page_num], size);
  262. if (bytes_written == -1) {
  263. printf("Error writing: %d\n", errno);
  264. exit(EXIT_FAILURE);
  265. }
  266. }
  267. /**
  268. * 关闭数据连接
  269. */
  270. void db_close(Table *table) {
  271. Pager *pager = table->pager;
  272. uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;
  273. for (uint32_t i = 0; i < num_full_pages; i++) {
  274. if (pager->pages[i] == NULL) {
  275. continue;
  276. }
  277. pager_flush(pager, i, PAGE_SIZE);
  278. free(pager->pages[i]);
  279. pager->pages[i] = NULL;
  280. }
  281. // There may be a partial page to write to the end of the file
  282. // This should not be needed after we switch to a B-tree
  283. uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;
  284. if (num_additional_rows > 0) {
  285. uint32_t page_num = num_full_pages;
  286. if (pager->pages[page_num] != NULL) {
  287. pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);
  288. free(pager->pages[page_num]);
  289. pager->pages[page_num] = NULL;
  290. }
  291. }
  292. int result = close(pager->file_descriptor);
  293. if (result == -1) {
  294. printf("Error closing db file.\n");
  295. exit(EXIT_FAILURE);
  296. }
  297. for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
  298. void *page = pager->pages[i];
  299. if (page) {
  300. free(page);
  301. pager->pages[i] = NULL;
  302. }
  303. }
  304. free(pager);
  305. }
  306. /* 元数据命令处理 */
  307. MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table)
  308. {
  309. if (strcmp(input_buffer->buffer, ".exit") == 0) {
  310. db_close(table);
  311. exit(EXIT_SUCCESS);
  312. } else {
  313. return META_COMMAND_UNRECOGNIZED_COMMAND;
  314. }
  315. }
  316. /* insert解析和校验*/
  317. PrepareResult prepare_insert(InputBuffer *input_buffer, Statement *statement) {
  318. statement->type = STATEMENT_INSERT;
  319. char *keyword = strtok(input_buffer->buffer, " ");
  320. char *id_string = strtok(NULL, " ");
  321. char *username = strtok(NULL, " ");
  322. char *email = strtok(NULL, " ");
  323. if (id_string == NULL || username == NULL || email == NULL) {
  324. return PREPARE_SYNTAX_ERROR;
  325. }
  326. int id = atoi(id_string);
  327. if (id < 0) {
  328. return PREPARE_NEGATIVE_ID;
  329. }
  330. if (strlen(username) > COLUMN_USERNAME_SIZE) {
  331. return PREPARE_STRING_TOO_LONG;
  332. }
  333. if (strlen(email) > COLUMN_EMAIL_SIZE) {
  334. return PREPARE_STRING_TOO_LONG;
  335. }
  336. statement->row_to_insert.id = id;
  337. strcpy(statement->row_to_insert.username, username);
  338. strcpy(statement->row_to_insert.email, email);
  339. return PREPARE_SUCCESS;
  340. }
  341. /* sql解析 */
  342. PrepareResult prepare_statement(InputBuffer* input_buffer,Statement* statement)
  343. {
  344. if (strncasecmp(input_buffer->buffer, "insert", 6) == 0) {
  345. return prepare_insert(input_buffer, statement);
  346. }
  347. if (strncasecmp(input_buffer->buffer, "select", 6) == 0) {
  348. statement->type = STATEMENT_SELECT;
  349. return PREPARE_SUCCESS;
  350. }
  351. return PREPARE_UNRECOGNIZED_STATEMENT;
  352. }
  353. /* 执行insert*/
  354. ExecuteResult execute_insert(Statement *statement, Table *table)
  355. {
  356. if (table->num_rows >= TABLE_MAX_ROWS) {
  357. return EXECUTE_TABLE_FULL;
  358. }
  359. Row *row_to_insert = &(statement->row_to_insert);
  360. Cursor* cursor = table_end(table);
  361. serialize_row(row_to_insert, cursor_value(cursor));
  362. table->num_rows += 1;
  363. free(cursor);
  364. return EXECUTE_SUCCESS;
  365. }
  366. /* 执行查询*/
  367. ExecuteResult execute_select(Statement *statement, Table *table)
  368. {
  369. Row row;
  370. Cursor* cursor = table_start(table);
  371. while (!(cursor->end_of_table)) {
  372. deserialize_row(cursor_value(cursor), &row);
  373. print_row(&row);
  374. cursor_advance(cursor);
  375. }
  376. free(cursor);
  377. return EXECUTE_SUCCESS;
  378. }
  379. /* sql执行*/
  380. ExecuteResult execute_statement(Statement* statement , Table* table)
  381. {
  382. switch (statement->type)
  383. {
  384. case (STATEMENT_INSERT):
  385. return execute_insert(statement, table);
  386. case (STATEMENT_SELECT):
  387. return execute_select(statement, table);
  388. }
  389. }
  390. /* 主函数*/
  391. int main(int argc, char* argv[])
  392. {
  393. if (argc < 2) {
  394. printf("Must supply a database filename.\n");
  395. exit(EXIT_FAILURE);
  396. }
  397. char *filename = argv[1];
  398. Table *table = db_open(filename);
  399. InputBuffer *input_buffer = new_input_buffer();
  400. while (true)
  401. {
  402. print_prompt();
  403. read_input(input_buffer);
  404. if (input_buffer->buffer[0] == '.')
  405. {
  406. switch (do_meta_command(input_buffer,table))
  407. {
  408. case (META_COMMAND_SUCCESS):
  409. continue;
  410. case (META_COMMAND_UNRECOGNIZED_COMMAND):
  411. printf("Unrecognized command '%s'\n", input_buffer->buffer);
  412. continue;
  413. }
  414. }
  415. Statement statement;
  416. switch (prepare_statement(input_buffer, &statement))
  417. {
  418. case (PREPARE_SUCCESS):
  419. break;
  420. case (PREPARE_NEGATIVE_ID):
  421. printf("ID must be positive.\n");
  422. continue;
  423. case (PREPARE_STRING_TOO_LONG):
  424. printf("String is too long.\n");
  425. continue;
  426. case (PREPARE_SYNTAX_ERROR):
  427. printf("Syntax error. Could not parse statement.\n");
  428. continue;
  429. case (PREPARE_UNRECOGNIZED_STATEMENT):
  430. printf("Unrecognized keyword at start of '%s'.\n",
  431. input_buffer->buffer);
  432. continue;
  433. }
  434. switch (execute_statement(&statement, table))
  435. {
  436. case (EXECUTE_SUCCESS):
  437. printf("Executed.\n");
  438. break;
  439. case (EXECUTE_TABLE_FULL):
  440. printf("Error: Table full.\n");
  441. break;
  442. }
  443. }
  444. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/324274
推荐阅读
相关标签
  

闽ICP备14008679号