当前位置:   article > 正文

Nginx upstream实战_nginx拼接upstream字符串

nginx拼接upstream字符串

实现功能:

即以访问mytest模块的URL参数作为搜索引擎的关键字, 用upstream方式访

问百度, 查询URL里的参数, 然后把百度的结果返回给用户。 这个场景非常适合使用upstream方式, 因为Nginx访问google的服务器使用的是HTTP, 它当然符合upstream的使用场景: 上游服务器提供基于TCP的协议

配置参数:

1.每一个HTTP请求都会有独立的ngx_http_upstream_conf_t结构体, 出于简单考虑, 在mytest模块的例子中, 所有的请求都将共享同一个ngx_http_upstream_conf_t结构体, 因此, 这里把它放ngx_http_mytest_conf_t配置结构体中, 如下所示:

  1. typedef struct {
  2. ngx_http_upstream_conf_t upstream;
  3. } ngx_http_mytest_conf_t;

在启动upstream前, 先将ngx_http_mytest_conf_t下的upstream成员赋给r->upstream->conf成员

2.init ngx_http_upstream_conf_t结构中的各成员可以通用预设的配置项解析参数来赋值.

  1. static void* ngx_http_mytest_create_loc_conf(ngx_conf_t *cf)
  2. {
  3. ngx_http_mytest_conf_t *mycf;
  4. mycf = (ngx_http_mytest_conf_t *)ngx_pcalloc(cf->pool, sizeof(ngx_http_mytest_conf_t));
  5. if (mycf == NULL)
  6. {
  7. return NULL;
  8. }
  9. //以下简单的硬编码ngx_http_upstream_conf_t结构中的各成员,例如
  10. //超时时间都设为1分钟。这也是http反向代理模块的默认值
  11. mycf->upstream.connect_timeout = 60000;
  12. mycf->upstream.send_timeout = 60000;
  13. mycf->upstream.read_timeout = 60000;
  14. mycf->upstream.store_access = 0600;
  15. //实际上buffering已经决定了将以固定大小的内存作为缓冲区来转发上游的
  16. //响应包体,这块固定缓冲区的大小就是buffer_size。如果buffering为1
  17. //就会使用更多的内存缓存来不及发往下游的响应,例如最多使用bufs.num个
  18. //缓冲区、每个缓冲区大小为bufs.size,另外还会使用临时文件,临时文件的
  19. //最大长度为max_temp_file_size
  20. mycf->upstream.buffering = 0;
  21. mycf->upstream.bufs.num = 8;
  22. mycf->upstream.bufs.size = ngx_pagesize;
  23. mycf->upstream.buffer_size = ngx_pagesize;
  24. mycf->upstream.busy_buffers_size = 2 * ngx_pagesize;
  25. mycf->upstream.temp_file_write_size = 2 * ngx_pagesize;
  26. mycf->upstream.max_temp_file_size = 1024 * 1024 * 1024;
  27. //upstream模块要求hide_headers成员必须要初始化(upstream在解析
  28. //完上游服务器返回的包头时,会调用
  29. //ngx_http_upstream_process_headers方法按照hide_headers成员将
  30. //本应转发给下游的一些http头部隐藏),这里将它赋为
  31. //NGX_CONF_UNSET_PTR ,是为了在merge合并配置项方法中使用
  32. //upstream模块提供的ngx_http_upstream_hide_headers_hash
  33. //方法初始化hide_headers 成员
  34. mycf->upstream.hide_headers = NGX_CONF_UNSET_PTR;
  35. mycf->upstream.pass_headers = NGX_CONF_UNSET_PTR;
  36. return mycf;
  37. }

note1:upstream中的时间都以毫秒为时间单位

note2:

unsigned buffering:1在向客户端转发上游服务器的包体时才有用。 当buffering为1时, 表示使用多个缓冲区以及磁盘文件来转发上游的响应包体。当Nginx与上游间的网速远大于Nginx与下游客户端间的网速时, 让Nginx开辟更多的内存甚至使用磁盘文件来缓存上游的响应包体, 这是有意义的, 它可以减轻上游服务器的并发压力。

当buffering为0时, 表示只使用上面的这一个buffer缓冲区来向下游转发响应包体

note3:hide_headers的类型是ngx_array_t动态数组(实际上, upstream模块将会通过hide_headers来构造hide_headers_hash散列表) 。 由于upstream模块要求hide_headers不可以为NULL, 所以必须要初始化hide_headers成员。upstream模块提供了ngx_http_upstream_hide_headers_hash方法

来初始化hide_headers, 但仅可用在合并配置项方法内

  1. static char *ngx_http_mytest_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
  2. {
  3. ngx_http_mytest_conf_t *prev = (ngx_http_mytest_conf_t *)parent;
  4. ngx_http_mytest_conf_t *conf = (ngx_http_mytest_conf_t *)child;
  5. ngx_hash_init_t hash;
  6. hash.max_size = 100;
  7. hash.bucket_size = 1024;
  8. hash.name = "proxy_headers_hash";
  9. if (ngx_http_upstream_hide_headers_hash(cf, &conf->upstream,
  10. &prev->upstream, ngx_http_proxy_hide_headers, &hash)
  11. != NGX_OK)
  12. {
  13. return NGX_CONF_ERROR;
  14. }
  15. return NGX_CONF_OK;
  16. }

请求上下文

本次介绍的例子就必须要使用上下文才能正确地解析upstream上游服务器的响应包, 因为upstream模块每次接收到一段TCP流时都会回调mytest模块实现的process_header方法解析,这样就需要有一个上下文保存解析状态。 在解析HTTP响应行时, 可以使用HTTP框架提供的ngx_http_status_t结构

  1. typedef struct {
  2. ngx_uint_t code;
  3. ngx_uint_t count;
  4. u_char *start;
  5. u_char *end;
  6. } ngx_http_statu
  7. typedef struct {
  8. ngx_http_status_t status;
  9. } ngx_http_mytest_ctx_t;

具体三大回调实现

create_request

这里定义的mytest_upstream_create_request方法用于创建发送给上游服务器的HTTP请求, upstream模块将会回调它, 实现如下

  1. static ngx_int_t
  2. mytest_upstream_create_request(ngx_http_request_t *r)
  3. {
  4. //发往百度上游服务器的请求很简单,就是模仿正常的搜索请求,
  5. //以/search?q=…的URL来发起搜索请求。backendQueryLine中的%V等转化格式的用法,
  6. static ngx_str_t backendQueryLine =
  7. ngx_string("GET /search?q=%V HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: close\r\n\r\n");
  8. ngx_int_t queryLineLen = backendQueryLine.len + r->args.len - 2;
  9. ngx_buf_t* b = ngx_create_temp_buf(r->pool, queryLineLen);
  10. if (b == NULL)
  11. return NGX_ERROR;
  12. //last要指向请求的末尾
  13. b->last = b->pos + queryLineLen;
  14. //作用相当于snprintf,
  15. ngx_snprintf(b->pos, queryLineLen ,
  16. (char*)backendQueryLine.data, &r->args);
  17. // r->upstream->request_bufs是一个ngx_chain_t结构,它包含着要
  18. //发送给上游服务器的请求
  19. r->upstream->request_bufs = ngx_alloc_chain_link(r->pool);
  20. if (r->upstream->request_bufs == NULL)
  21. return NGX_ERROR;
  22. // request_bufs这里只包含1个ngx_buf_t缓冲区
  23. r->upstream->request_bufs->buf = b;
  24. r->upstream->request_bufs->next = NULL;
  25. r->upstream->request_sent = 0;
  26. r->upstream->header_sent = 0;
  27. // header_hash不可以为0
  28. r->header_hash = 1;
  29. return NGX_OK;
  30. }

note1: 转换ngx_str_t类型,%V对应的参数必须是ngx_str_t变量的地址。它将会按照ngx_str_t类型的len长度来输出data字符串

note2:必须由内存池中申请内存,这有两点好处:在网络情况不佳的情况下,向上游服务器发送请求时,可能需要epoll多次调度send发送才能完成,这时必须保证这段内存不会被释放;请求结束时,这段内存会被自动释放,降低内存泄漏的可能

note3:r->upstream->request_bufs = ngx_alloc_chain_link(r->pool)

将内存空间link到这次请求的链条上

实现process_heade

process_header负责解析上游服务器发来的基于TCP的包头, 在本例中, 就是解析HTTP响应行和HTTP头部, 因此, 这里使用mytest_process_status_line方法解析HTTP响应行, 使用mytest_upstream_process_header方法解析http响应头部。 之所以使用两个方法解析包头, 这也是HTTP的复杂性造成的, 因为无论是响应行还是响应头部都是不定长的, 都需要使用状态机来解析。 实际上, 这两个方法也是通用的, 它们适用于解析所有的HTTP响应包, 而且这个方法的代码ngx_http_proxy_module模块的实现几乎是完全一致的。

解析行

  1. static ngx_int_t
  2. mytest_process_status_line(ngx_http_request_t *r)
  3. {
  4. size_t len;
  5. ngx_int_t rc;
  6. ngx_http_upstream_t *u;
  7. //上下文中才会保存多次解析http响应行的状态,首先取出请求的上下文
  8. ngx_http_mytest_ctx_t* ctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);
  9. if (ctx == NULL)
  10. {
  11. return NGX_ERROR;
  12. }
  13. u = r->upstream;
  14. //http框架提供的ngx_http_parse_status_line方法可以解析http
  15. //响应行,它的输入就是收到的字符流和上下文中的ngx_http_status_t结构
  16. rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);
  17. //返回NGX_AGAIN表示还没有解析出完整的http响应行,需要接收更多的
  18. //字符流再来解析
  19. if (rc == NGX_AGAIN)
  20. {
  21. return rc;
  22. }
  23. //返回NGX_ERROR则没有接收到合法的http响应行
  24. if (rc == NGX_ERROR)
  25. {
  26. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  27. "upstream sent no valid HTTP/1.0 header");
  28. r->http_version = NGX_HTTP_VERSION_9;
  29. u->state->status = NGX_HTTP_OK;
  30. return NGX_OK;
  31. }
  32. //以下表示解析到完整的http响应行,这时会做一些简单的赋值操作,将解析出
  33. //的信息设置到r->upstream->headers_in结构体中,upstream解析完所
  34. //有的包头时,就会把headers_in中的成员设置到将要向下游发送的
  35. //r->headers_out结构体中,也就是说,现在我们向headers_in中设置的
  36. //信息,最终都会发往下游客户端。为什么不是直接设置r->headers_out而要
  37. //这样多此一举呢?这是因为upstream希望能够按照
  38. //ngx_http_upstream_conf_t配置结构体中的hide_headers等成员对
  39. //发往下游的响应头部做统一处理
  40. if (u->state)
  41. {
  42. u->state->status = ctx->status.code;
  43. }
  44. u->headers_in.status_n = ctx->status.code;
  45. len = ctx->status.end - ctx->status.start;
  46. u->headers_in.status_line.len = len;
  47. u->headers_in.status_line.data = ngx_pnalloc(r->pool, len);
  48. if (u->headers_in.status_line.data == NULL)
  49. {
  50. return NGX_ERROR;
  51. }
  52. ngx_memcpy(u->headers_in.status_line.data, ctx->status.start, len);
  53. //下一步将开始解析http头部,设置process_header回调方法为
  54. //mytest_upstream_process_header,
  55. //之后再收到的新字符流将由mytest_upstream_process_header解析
  56. u->process_header = mytest_upstream_process_header;
  57. //如果本次收到的字符流除了http响应行外,还有多余的字符,
  58. //将由mytest_upstream_process_header方法解析
  59. return mytest_upstream_process_header(r);
  60. }

note:上游服务器发送的HTTP头部添加到了请求r->upstream->headers_in.headers链表中。如果有需要特殊处理的HTTP头部,那么也应该在mytest_upstream_process_header方法中进行。

解析头部

  1. static ngx_int_t
  2. mytest_upstream_process_header(ngx_http_request_t *r)
  3. {
  4. ngx_int_t rc;
  5. ngx_table_elt_t *h;
  6. ngx_http_upstream_header_t *hh;
  7. ngx_http_upstream_main_conf_t *umcf;
  8. //这里将upstream模块配置项ngx_http_upstream_main_conf_t取了
  9. //出来,目的只有1个,对将要转发给下游客户端的http响应头部作统一
  10. //处理。该结构体中存储了需要做统一处理的http头部名称和回调方法
  11. umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
  12. //循环的解析所有的http头部
  13. for ( ;; )
  14. {
  15. // http框架提供了基础性的ngx_http_parse_header_line
  16. //方法,它用于解析http头部
  17. rc = ngx_http_parse_header_line(r, &r->upstream->buffer, 1);
  18. //返回NGX_OK表示解析出一行http头部
  19. if (rc == NGX_OK)
  20. {
  21. //向headers_in.headers这个ngx_list_t链表中添加http头部
  22. h = ngx_list_push(&r->upstream->headers_in.headers);
  23. if (h == NULL)
  24. {
  25. return NGX_ERROR;
  26. }
  27. //以下开始构造刚刚添加到headers链表中的http头部
  28. h->hash = r->header_hash;
  29. h->key.len = r->header_name_end - r->header_name_start;
  30. h->value.len = r->header_end - r->header_start;
  31. //必须由内存池中分配存放http头部的内存
  32. h->key.data = ngx_pnalloc(r->pool,
  33. h->key.len + 1 + h->value.len + 1 + h->key.len);
  34. if (h->key.data == NULL)
  35. {
  36. return NGX_ERROR;
  37. }
  38. h->value.data = h->key.data + h->key.len + 1;
  39. h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1;
  40. ngx_memcpy(h->key.data, r->header_name_start, h->key.len);
  41. h->key.data[h->key.len] = '\0';
  42. ngx_memcpy(h->value.data, r->header_start, h->value.len);
  43. h->value.data[h->value.len] = '\0';
  44. if (h->key.len == r->lowcase_index)
  45. {
  46. ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);
  47. }
  48. else
  49. {
  50. ngx_strlow(h->lowcase_key, h->key.data, h->key.len);
  51. }
  52. //upstream模块会对一些http头部做特殊处理
  53. hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
  54. h->lowcase_key, h->key.len);
  55. if (hh && hh->handler(r, h, hh->offset) != NGX_OK)
  56. {
  57. return NGX_ERROR;
  58. }
  59. continue;
  60. }
  61. //返回NGX_HTTP_PARSE_HEADER_DONE表示响应中所有的http头部都解析
  62. //完毕,接下来再接收到的都将是http包体
  63. if (rc == NGX_HTTP_PARSE_HEADER_DONE)
  64. {
  65. //如果之前解析http头部时没有发现server和date头部,以下会
  66. //根据http协议添加这两个头部
  67. if (r->upstream->headers_in.server == NULL)
  68. {
  69. h = ngx_list_push(&r->upstream->headers_in.headers);
  70. if (h == NULL)
  71. {
  72. return NGX_ERROR;
  73. }
  74. h->hash = ngx_hash(ngx_hash(ngx_hash(ngx_hash(
  75. ngx_hash('s', 'e'), 'r'), 'v'), 'e'), 'r');
  76. ngx_str_set(&h->key, "Server");
  77. ngx_str_null(&h->value);
  78. h->lowcase_key = (u_char *) "server";
  79. }
  80. if (r->upstream->headers_in.date == NULL)
  81. {
  82. h = ngx_list_push(&r->upstream->headers_in.headers);
  83. if (h == NULL)
  84. {
  85. return NGX_ERROR;
  86. }
  87. h->hash = ngx_hash(ngx_hash(ngx_hash('d', 'a'), 't'), 'e');
  88. ngx_str_set(&h->key, "Date");
  89. ngx_str_null(&h->value);
  90. h->lowcase_key = (u_char *) "date";
  91. }
  92. return NGX_OK;
  93. }
  94. //如果返回NGX_AGAIN则表示状态机还没有解析到完整的http头部,
  95. //要求upstream模块继续接收新的字符流再交由process_header
  96. //回调方法解析
  97. if (rc == NGX_AGAIN)
  98. {
  99. return NGX_AGAIN;
  100. }
  101. //其他返回值都是非法的
  102. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  103. "upstream sent invalid header");
  104. return NGX_HTTP_UPSTREAM_INVALID_HEADER;
  105. }
  106. }

实现finalize_request

当请求结束时, 将会回调finalize_request方法, 如果我们希望此时释放资源, 如打开的句柄等, 那么可以把这样的代码添加到finalize_request方法中。 本例中定义了mytest_upstream_finalize_request方法, 由于我们没有任何需要释放的资源, 所以该方法没有完成任何实际工作, 只是因为upstream模块要求必须实现finalize_request回调方法

  1. static void
  2. mytest_upstream_finalize_request(ngx_http_request_t *r, ngx_int_t rc) {
  3. ngx_log_error(NGX_LOG_DEBUG, r->connection->log,0, "mytest_upstream_finalize_request"); }

在ngx_http_mytest_handler方法中启动upstream

在开始介入处理客户端请求的ngx_http_mytest_handler方法中启动upstream机制, 而何时请求会结束, 则视Nginx与上游的百度服务器间的通信而定。 通常, 在启动upstream时, 我们将决定以何种方式处理上游响应的包体, 前文说过, 我们会原封不动地转发百度的响应包体到客户端, 这一行为是由ngx_http_request_t结构体中的subrequest_in_memory标志位决定的, 默认情况subrequest_in_memory为0, 即表示将转发上游的包体到下游。 上面介绍过, 当ngx_http_upstream_conf_t结构体中的buffering标志位为0时, 意味着以固定大小的缓冲区来转发包体

  1. static char *
  2. ngx_http_mytest(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  3. {
  4. ngx_http_core_loc_conf_t *clcf;
  5. //首先找到mytest配置项所属的配置块,clcf貌似是location块内的数据
  6. //结构,其实不然,它可以是main、srv或者loc级别配置项,也就是说在每个
  7. //http{}和server{}内也都有一个ngx_http_core_loc_conf_t结构体
  8. clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
  9. //http框架在处理用户请求进行到NGX_HTTP_CONTENT_PHASE阶段时,如果
  10. //请求的主机域名、URI与mytest配置项所在的配置块相匹配,就将调用我们
  11. //实现的ngx_http_mytest_handler方法处理这个请求
  12. clcf->handler = ngx_http_mytest_handler;
  13. return NGX_CONF_OK;
  14. }
  15. static ngx_int_t
  16. ngx_http_mytest_handler(ngx_http_request_t *r)
  17. {
  18. //首先建立http上下文结构体ngx_http_mytest_ctx_t
  19. ngx_http_mytest_ctx_t* myctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);
  20. if (myctx == NULL)
  21. {
  22. myctx = ngx_palloc(r->pool, sizeof(ngx_http_mytest_ctx_t));
  23. if (myctx == NULL)
  24. {
  25. return NGX_ERROR;
  26. }
  27. //将新建的上下文与请求关联起来
  28. ngx_http_set_ctx(r, myctx, ngx_http_mytest_module);
  29. }
  30. //对每1个要使用upstream的请求,必须调用且只能调用1次
  31. //ngx_http_upstream_create方法,它会初始化r->upstream成员
  32. if (ngx_http_upstream_create(r) != NGX_OK)
  33. {
  34. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_upstream_create() failed");
  35. return NGX_ERROR;
  36. }
  37. //得到配置结构体ngx_http_mytest_conf_t
  38. ngx_http_mytest_conf_t *mycf = (ngx_http_mytest_conf_t *) ngx_http_get_module_loc_conf(r, ngx_http_mytest_module);
  39. ngx_http_upstream_t *u = r->upstream;
  40. //这里用配置文件中的结构体来赋给r->upstream->conf成员
  41. u->conf = &mycf->upstream;
  42. //决定转发包体时使用的缓冲区
  43. u->buffering = mycf->upstream.buffering;
  44. //以下代码开始初始化resolved结构体,用来保存上游服务器的地址
  45. u->resolved = (ngx_http_upstream_resolved_t*) ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t));
  46. if (u->resolved == NULL)
  47. {
  48. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  49. "ngx_pcalloc resolved error. %s.", strerror(errno));
  50. return NGX_ERROR;
  51. }
  52. //这里的上游服务器就是www.google.com
  53. static struct sockaddr_in backendSockAddr;
  54. struct hostent *pHost = gethostbyname((char*) "www.google.com");
  55. if (pHost == NULL)
  56. {
  57. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  58. "gethostbyname fail. %s", strerror(errno));
  59. return NGX_ERROR;
  60. }
  61. //访问上游服务器的80端口
  62. backendSockAddr.sin_family = AF_INET;
  63. backendSockAddr.sin_port = htons((in_port_t) 80);
  64. char* pDmsIP = inet_ntoa(*(struct in_addr*) (pHost->h_addr_list[0]));
  65. backendSockAddr.sin_addr.s_addr = inet_addr(pDmsIP);
  66. myctx->backendServer.data = (u_char*)pDmsIP;
  67. myctx->backendServer.len = strlen(pDmsIP);
  68. //将地址设置到resolved成员中
  69. u->resolved->sockaddr = (struct sockaddr *)&backendSockAddr;
  70. u->resolved->socklen = sizeof(struct sockaddr_in);
  71. u->resolved->naddrs = 1;
  72. //设置三个必须实现的回调方法,
  73. u->create_request = mytest_upstream_create_request;
  74. u->process_header = mytest_process_status_line;
  75. u->finalize_request = mytest_upstream_finalize_request;
  76. //这里必须将count成员加1,
  77. r->main->count++;
  78. //启动upstream
  79. ngx_http_upstream_init(r);
  80. //必须返回NGX_DONE
  81. return NGX_DONE;
  82. }

配置文件

  1. #user nobody;
  2. worker_processes 1;
  3. error_log logs/error.log debug;
  4. events {
  5. worker_connections 1024;
  6. }
  7. http {
  8. include mime.types;
  9. default_type application/octet-stream;
  10. #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
  11. # '$status $body_bytes_sent "$http_referer" '
  12. # '"$http_user_agent" "$http_x_forwarded_for"';
  13. #access_log logs/access.log main;
  14. keepalive_timeout 65;
  15. server {
  16. listen 8080;
  17. location /test {
  18. mytest;
  19. }
  20. }
  21. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/192626
推荐阅读
相关标签
  

闽ICP备14008679号