赞
踩
程序从命令行参数读取一个http/https URL(若未带协议前缀则默认补上http://),抓取网页并把响应体打印到stdout,并将请求和响应的http header打印到stderr。
为了简单起见,程序用Ctrl-C退出,但会保证所有资源先被完全释放。
#include <signal.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <string> #include "workflow/HttpMessage.h" #include "workflow/HttpUtil.h" #include "workflow/WFTaskFactory.h" #ifndef _WIN32 #include <unistd.h> #endif #define REDIRECT_MAX 5 #define RETRY_MAX 2 void wget_callback(WFHttpTask *task) { protocol::HttpRequest *req = task->get_req(); protocol::HttpResponse *resp = task->get_resp(); int state = task->get_state(); int error = task->get_error(); switch (state) { case WFT_STATE_SYS_ERROR: fprintf(stderr, "system error: %s\n", strerror(error)); break; case WFT_STATE_DNS_ERROR: fprintf(stderr, "DNS error: %s\n", gai_strerror(error)); break; case WFT_STATE_SSL_ERROR: fprintf(stderr, "SSL error: %d\n", error); break; case WFT_STATE_TASK_ERROR: fprintf(stderr, "Task error: %d\n", error); break; case WFT_STATE_SUCCESS: break; } if (state != WFT_STATE_SUCCESS) { fprintf(stderr, "Failed. Press Ctrl-C to exit.\n"); return; } std::string name; std::string value; /* Print request. */ fprintf(stderr, "%s %s %s\r\n", req->get_method(), req->get_http_version(), req->get_request_uri()); protocol::HttpHeaderCursor req_cursor(req); while (req_cursor.next(name, value)) fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str()); fprintf(stderr, "\r\n"); /* Print response header. */ fprintf(stderr, "%s %s %s\r\n", resp->get_http_version(), resp->get_status_code(), resp->get_reason_phrase()); protocol::HttpHeaderCursor resp_cursor(resp); while (resp_cursor.next(name, value)) fprintf(stderr, "%s: %s\r\n", name.c_str(), value.c_str()); fprintf(stderr, "\r\n"); /* Print response body. */ const void *body; size_t body_len; resp->get_parsed_body(&body, &body_len); fwrite(body, 1, body_len, stdout); fflush(stdout); fprintf(stderr, "\nSuccess. 
Press Ctrl-C to exit.\n"); } void sig_handler(int signo) { } int main(int argc, char *argv[]) { WFHttpTask *task; if (argc != 2) { fprintf(stderr, "USAGE: %s <http URL>\n", argv[0]); exit(1); } signal(SIGINT, sig_handler); std::string url = argv[1]; if (strncasecmp(argv[1], "http://", 7) != 0 && strncasecmp(argv[1], "https://", 8) != 0) { url = "http://" + url; } task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, wget_callback); protocol::HttpRequest *req = task->get_req(); req->add_header_pair("Accept", "*/*"); req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)"); req->add_header_pair("Connection", "close"); task->start(); #ifndef _WIN32 pause(); #else getchar(); #endif return 0; }
#HttpTaskImpl.cc WFHttpTask *WFTaskFactory::create_http_task(const std::string& url, int redirect_max, int retry_max, http_callback_t callback) { auto *task = new ComplexHttpTask(redirect_max, retry_max, std::move(callback)); ParsedURI uri; URIParser::parse(url, uri); task->init(std::move(uri)); task->set_keep_alive(HTTP_KEEPALIVE_DEFAULT); return task; }
#HttpTaskImpl.cc class ComplexHttpTask : public WFComplexClientTask<HttpRequest, HttpResponse> { public: ComplexHttpTask(int redirect_max, int retry_max, http_callback_t&& callback): WFComplexClientTask(retry_max, std::move(callback)), redirect_max_(redirect_max), redirect_count_(0) { HttpRequest *client_req = this->get_req(); client_req->set_method(HttpMethodGet); client_req->set_http_version("HTTP/1.1"); } protected: virtual CommMessageOut *message_out(); virtual CommMessageIn *message_in(); virtual int keep_alive_timeout(); virtual bool init_success(); virtual void init_failed(); virtual bool finish_once(); protected: bool need_redirect(ParsedURI& uri); bool redirect_url(HttpResponse *client_resp, ParsedURI& uri); void set_empty_request(); void check_response(); private: int redirect_max_; int redirect_count_; };
/*
 * Excerpt from WFTaskFactory.inl. The "#WFTaskFactory.inl" line below is a
 * file label carried over from the article, not a real preprocessor directive.
 *
 * WFComplexClientTask: a client task that resolves its request object through
 * a router task before issuing the request (see its dispatch()), and keeps
 * the URI / transport / retry / redirect state used by subclasses such as
 * ComplexHttpTask.
 */
#WFTaskFactory.inl
template<class REQ, class RESP, typename CTX = bool>
class WFComplexClientTask : public WFClientTask<REQ, RESP>
{
protected:
	using task_callback_t = std::function<void (WFNetworkTask<REQ, RESP> *)>;

public:
	/*
	 * retry_max is stored in retry_max_; retry_times_ starts at 0.
	 * The base is constructed with a NULL request object and the global
	 * scheduler; the request object is installed later, after routing.
	 * NOTE(review): ctx_ is not initialized here. For the default CTX = bool
	 * its value is indeterminate until written. Confirm callers set it
	 * before reading it.
	 */
	WFComplexClientTask(int retry_max, task_callback_t&& cb):
		WFClientTask<REQ, RESP>(NULL, WFGlobal::get_scheduler(), std::move(cb))
	{
		type_ = TT_TCP;
		fixed_addr_ = false;
		retry_max_ = retry_max;
		retry_times_ = 0;
		redirect_ = false;
		ns_policy_ = NULL;
		router_task_ = NULL;
	}

protected:
	// new api for children
	// (hooks for subclasses; the defaults below accept everything / do nothing)
	virtual bool init_success() { return true; }
	virtual void init_failed() {}
	virtual bool check_request() { return true; }
	/* Creates the router task used to resolve the target (defined elsewhere). */
	virtual WFRouterTask *route();
	virtual bool finish_once() { return true; }

public:
	/* (Re)initialize the task from a URI: store it, then init_with_uri(). */
	void init(const ParsedURI& uri)
	{
		uri_ = uri;
		init_with_uri();
	}

	void init(ParsedURI&& uri)
	{
		uri_ = std::move(uri);
		init_with_uri();
	}

	/* Initialize from an explicit address instead of a URI (defined elsewhere). */
	void init(TransportType type,
			  const struct sockaddr *addr,
			  socklen_t addrlen,
			  const std::string& info);

	void set_transport_type(TransportType type)
	{
		type_ = type;
	}

	TransportType get_transport_type() const { return type_; }

	virtual const ParsedURI *get_current_uri() const { return &uri_; }

	/* Re-target the task: set the redirect flag, then re-run init(). */
	void set_redirect(const ParsedURI& uri)
	{
		redirect_ = true;
		init(uri);
	}

	void set_redirect(TransportType type, const struct sockaddr *addr,
					  socklen_t addrlen, const std::string& info)
	{
		redirect_ = true;
		init(type, addr, addrlen, info);
	}

	bool is_fixed_addr() const { return this->fixed_addr_; }

protected:
	/* NOTE(review): takes an int but stores into a bool member
	 * (any non-zero value becomes true). */
	void set_fixed_addr(int fixed) { this->fixed_addr_ = fixed; }

	void set_info(const std::string& info)
	{
		info_.assign(info);
	}

	void set_info(const char *info)
	{
		info_.assign(info);
	}

protected:
	virtual void dispatch();
	virtual SubTask *done();

	/* Reset resp to a freshly constructed RESP (explicit destructor call +
	 * placement new), keeping the previously configured size limit. */
	void clear_resp()
	{
		size_t size = this->resp.get_size_limit();

		this->resp.~RESP();
		new(&this->resp) RESP();
		this->resp.set_size_limit(size);
	}

	/* Mark the retry budget as used up (retry_times_ = retry_max_). */
	void disable_retry()
	{
		retry_times_ = retry_max_;
	}

protected:
	TransportType type_;
	ParsedURI uri_;
	std::string info_;
	bool fixed_addr_;
	bool redirect_;
	CTX ctx_;
	int retry_max_;
	int retry_times_;
	WFNSPolicy *ns_policy_;
	WFRouterTask *router_task_;
	RouteManager::RouteResult route_result_;	/* request_object is used by dispatch() */
	WFNSTracing tracing_;

public:
	CTX *get_mutable_ctx() { return &ctx_; }

private:
	void clear_prev_state();
	void init_with_uri();
	bool set_port();
	void router_callback(void *t);
	void switch_callback(void *t);
};
/*
 * Excerpt from WFTask.inl. The "#WFTask.inl" line below is a file label
 * carried over from the article, not a real preprocessor directive.
 *
 * WFClientTask: client-side network task. It hands its req/resp objects to
 * the communicator, exposes the current connection, and in done() runs the
 * user callback and then destroys itself.
 */
#WFTask.inl
template<class REQ, class RESP>
class WFClientTask : public WFNetworkTask<REQ, RESP>
{
protected:
	/* Returns the request to send; the optional prepare hook runs first. */
	virtual CommMessageOut *message_out()
	{
		/* By using prepare function, users can modify request after
		 * the connection is established. */
		if (this->prepare)
			this->prepare(this);

		return &this->req;
	}

	/* The response object is the incoming-message sink. */
	virtual CommMessageIn *message_in() { return &this->resp; }

protected:
	/* Current connection, or NULL with errno = ENOTCONN when the task has
	 * no target or the session reports no connection. */
	virtual WFConnection *get_connection() const
	{
		CommConnection *conn;

		if (this->target)
		{
			conn = this->CommSession::get_connection();
			if (conn)
				return (WFConnection *)conn;
		}

		errno = ENOTCONN;
		return NULL;
	}

protected:
	/*
	 * Final step of the task.
	 * A SYS_ERROR carrying a negative error code is re-labelled as SSL_ERROR
	 * with the positive code. Then the user callback (if any) runs, the task
	 * deletes itself, and the result of series->pop() is returned
	 * (presumably the next task of the series).
	 * `series` is fetched before `delete this` because members are no longer
	 * accessible afterwards.
	 */
	virtual SubTask *done()
	{
		SeriesWork *series = series_of(this);

		if (this->state == WFT_STATE_SYS_ERROR && this->error < 0)
		{
			this->state = WFT_STATE_SSL_ERROR;
			this->error = -this->error;
		}

		if (this->callback)
			this->callback(this);

		delete this;
		return series->pop();
	}

public:
	void set_prepare(std::function<void (WFNetworkTask<REQ, RESP> *)> prep)
	{
		this->prepare = std::move(prep);
	}

protected:
	std::function<void (WFNetworkTask<REQ, RESP> *)> prepare;

public:
	WFClientTask(CommSchedObject *object, CommScheduler *scheduler,
				 std::function<void (WFNetworkTask<REQ, RESP> *)>&& cb) :
		WFNetworkTask<REQ, RESP>(object, scheduler, std::move(cb))
	{
	}

protected:
	virtual ~WFClientTask() { }
};
/*
 * Excerpt from WFTask.h. The "#WFTask.h" line below is a file label carried
 * over from the article, not a real preprocessor directive.
 *
 * WFNetworkTask: common base of client and server network tasks. It owns the
 * REQ/RESP messages, the user callback and the send/receive/keep-alive
 * timeouts, and exposes state/error to the callback.
 */
#WFTask.h
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
public:
	/* start(), dismiss() are for client tasks only. */
	/* start(): the task must not already be in a series; it becomes the first
	 * task of a new series, which is dispatched immediately. */
	void start()
	{
		assert(!series_of(this));
		Workflow::start_series_work(this, nullptr);
	}

	/* dismiss(): destroy a task that was never started. */
	void dismiss()
	{
		assert(!series_of(this));
		delete this;
	}

public:
	REQ *get_req() { return &this->req; }
	RESP *get_resp() { return &this->resp; }

public:
	void *user_data;

public:
	int get_state() const { return this->state; }
	int get_error() const { return this->error; }

	/* Call when error is ETIMEDOUT, return values:
	 * TOR_NOT_TIMEOUT, TOR_WAIT_TIMEOUT, TOR_CONNECT_TIMEOUT,
	 * TOR_TRANSMIT_TIMEOUT (send or receive).
	 * SSL connect timeout also returns TOR_CONNECT_TIMEOUT. */
	int get_timeout_reason() const { return this->timeout_reason; }

	/* Call only in callback or server's process. */
	/* Returns -1 with errno = ENOTCONN when the task has no target. */
	long long get_task_seq() const
	{
		if (!this->target)
		{
			errno = ENOTCONN;
			return -1;
		}

		return this->get_seq();
	}

	int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;

	virtual WFConnection *get_connection() const = 0;

public:
	/* All in milliseconds. timeout == -1 for unlimited. */
	void set_send_timeout(int timeout) { this->send_timeo = timeout; }
	void set_receive_timeout(int timeout) { this->receive_timeo = timeout; }
	void set_keep_alive(int timeout) { this->keep_alive_timeo = timeout; }

public:
	/* Do not reply this request. */
	/* Only takes effect while the state is WFT_STATE_TOREPLY. */
	void noreply()
	{
		if (this->state == WFT_STATE_TOREPLY)
			this->state = WFT_STATE_NOREPLY;
	}

	/* Push reply data synchronously. */
	virtual int push(const void *buf, size_t size)
	{
		return this->scheduler->push(buf, size, this);
	}

	/* To check if the connection was closed before replying.
	   Always returns 'true' in callback. */
	/* While TOREPLY: "closed" means the target has no idle connection left.
	 * Otherwise: true for any state other than WFT_STATE_UNDEFINED. */
	bool closed() const
	{
		if (this->state == WFT_STATE_TOREPLY)
			return !this->get_target()->has_idle_conn();
		else
			return this->state != WFT_STATE_UNDEFINED;
	}

public:
	void set_callback(std::function<void (WFNetworkTask<REQ, RESP> *)> cb)
	{
		this->callback = std::move(cb);
	}

protected:
	/* Timeout hooks read by the communication layer; they return the values
	 * configured through the setters above. */
	virtual int send_timeout() { return this->send_timeo; }
	virtual int receive_timeout() { return this->receive_timeo; }
	virtual int keep_alive_timeout() { return this->keep_alive_timeo; }

protected:
	int send_timeo;
	int receive_timeo;
	int keep_alive_timeo;
	REQ req;
	RESP resp;
	std::function<void (WFNetworkTask<REQ, RESP> *)> callback;

protected:
	/* Defaults: send/receive timeouts -1 (unlimited), keep-alive 0, no target,
	 * not timed out, no user_data, state UNDEFINED, error 0. */
	WFNetworkTask(CommSchedObject *object, CommScheduler *scheduler,
				  std::function<void (WFNetworkTask<REQ, RESP> *)>&& cb) :
		CommRequest(object, scheduler),
		callback(std::move(cb))
	{
		this->send_timeo = -1;
		this->receive_timeo = -1;
		this->keep_alive_timeo = 0;
		this->target = NULL;
		this->timeout_reason = TOR_NOT_TIMEOUT;
		this->user_data = NULL;
		this->state = WFT_STATE_UNDEFINED;
		this->error = 0;
	}

	virtual ~WFNetworkTask() { }
};
# CommRequest.h class CommRequest : public SubTask, public CommSession { public: CommRequest(CommSchedObject *object, CommScheduler *scheduler) { this->scheduler = scheduler; this->object = object; this->wait_timeout = 0; } CommSchedObject *get_request_object() const { return this->object; } void set_request_object(CommSchedObject *object) { this->object = object; } int get_wait_timeout() const { return this->wait_timeout; } void set_wait_timeout(int timeout) { this->wait_timeout = timeout; } public: virtual void dispatch() { if (this->scheduler->request(this, this->object, this->wait_timeout, &this->target) < 0) { this->handle(CS_STATE_ERROR, errno); } } protected: int state; int error; protected: CommTarget *target; #define TOR_NOT_TIMEOUT 0 #define TOR_WAIT_TIMEOUT 1 #define TOR_CONNECT_TIMEOUT 2 #define TOR_TRANSMIT_TIMEOUT 3 int timeout_reason; protected: int wait_timeout; CommSchedObject *object; CommScheduler *scheduler; protected: virtual void handle(int state, int error); };
(1)应用程序调用start()时,调用到 WFNetworkTask类的
`void start()
{
	/* Client tasks only: the task must not already belong to a series.
	 * It becomes the first task of a brand-new series and is dispatched. */
	assert(series_of(this) == nullptr);
	Workflow::start_series_work(this, nullptr);
}`
(2)
inline void
Workflow::start_series_work(SubTask *first, series_callback_t callback)
{
	/*
	 * Build a new series headed by `first`, then kick it off by dispatching
	 * that task. The SeriesWork pointer is not kept here; code running in
	 * the series reaches it again through series_of(...).
	 */
	SubTask *const head = first;

	new SeriesWork(head, std::move(callback));
	head->dispatch();
}
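(3)调用 WFComplexClientTask<REQ, RESP, CTX>::dispatch()。为什么这里不是直接调用 CommRequest 的 dispatch?因为 dispatch() 在 CommRequest 中是虚函数,而 task 的实际类型是 ComplexHttpTask,它继承了 WFComplexClientTask 对 dispatch() 的重写,所以 first->dispatch() 经虚函数分派进入 WFComplexClientTask::dispatch()。后者在路由完成后才显式调用 this->WFClientTask<REQ, RESP>::dispatch(),也就是 CommRequest::dispatch。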
/*
 * WFComplexClientTask::dispatch(): entry point each time the series runs
 * this task.
 *
 * Normal path, two passes:
 *   1st pass (state WFT_STATE_UNDEFINED, no request object yet):
 *      check_request() (always true by default, and ComplexHttpTask does not
 *      override it). Then route() creates router_task_; this task and then
 *      router_task_ are pushed to the front of the series, so the router ends
 *      up first. The pass ends with subtask_done(), presumably letting the
 *      series pop and run the router next, and this task again after it.
 *   2nd pass (state WFT_STATE_SUCCESS, presumably set once routing
 *      succeeded): install route_result_.request_object and call the base
 *      dispatch. WFClientTask/WFNetworkTask do not define dispatch(), so this
 *      is CommRequest::dispatch(), which actually issues the request.
 * Any other state goes straight to subtask_done().
 *
 * NOTE: the `case WFT_STATE_SUCCESS:` label deliberately sits inside the
 * nested if-block, so a SUCCESS state jumps directly into that block.
 */
template<class REQ, class RESP, typename CTX>
void WFComplexClientTask<REQ, RESP, CTX>::dispatch()
{
	switch (this->state)
	{
	case WFT_STATE_UNDEFINED:	// first pass arrives in this state
		if (this->check_request())	// default implementation just returns true
		{
			// Empty on the first pass, so we fall through to create router_task_.
			if (this->route_result_.request_object)
			{
	case WFT_STATE_SUCCESS:		// second pass: routing already succeeded
				this->set_request_object(route_result_.request_object);
				this->WFClientTask<REQ, RESP>::dispatch();	// resolves to CommRequest::dispatch
				return;
			}

			// First pass: create a router task that resolves the target
			// (e.g. DNS) and put it in front of this task in the series.
			router_task_ = this->route();
			series_of(this)->push_front(this);
			series_of(this)->push_front(router_task_);
		}

	default:
		break;
	}

	this->subtask_done();
}
(4)CommRequest::dispatch 组成
dns解析完后,
this->WFClientTask<REQ, RESP>::dispatch();
调用CommRequest的dispatch
/*
 * Simplified excerpt of CommRequest::dispatch(). The `...` stands for the
 * elided error handling: in the full class definition, a negative return
 * from request() is reported via handle(CS_STATE_ERROR, errno).
 */
void CommRequest::dispatch()
{
// Send the request: acquire a target from `object` and issue this session on it.
this->scheduler->request(this, this->object, this->wait_timeout,
&this->target);
...
}
(5)scheduler 的 request 执行的是
/*
 * Acquire a communication target from `object`, waiting up to wait_timeout
 * (in microseconds, -1 for no timeout), store it in *target and issue
 * `session` on it via the Communicator. If issuing fails, the acquired
 * target is released again.
 * Returns -1 when no target could be acquired, otherwise the Communicator's
 * result (negative on failure).
 */
int request(CommSession *session, CommSchedObject *object,
			int wait_timeout, CommTarget **target)
{
	*target = object->acquire(wait_timeout);
	if (!*target)
		return -1;

	int status = this->comm.request(session, *target);
	if (status < 0)
		(*target)->release();

	return status;
}
这里CommTarget 才是通讯目标,基本上就是ip+port, 还有两个超时参数。连接池什么的都在target里
(6)Communicator::request 先调用 request_idle_conn() 尝试复用一个空闲连接;若没有可复用的连接,再调用 request_new_conn() 创建新连接。
/*
 * Issue a client `session` on `target`.
 *
 * An idle connection of the target is reused when one is available;
 * otherwise a new connection is opened.
 * Returns 0 on success and restores errno to its value on entry.
 * Returns -1 with errno = EINVAL for a passive session. Returns -1 when no
 * connection could be used; in that case session->conn is NULL and
 * session->seq is 0.
 */
int Communicator::request(CommSession *session, CommTarget *target)
{
	if (session->passive)
	{
		errno = EINVAL;
		return -1;
	}

	int saved_errno = errno;

	session->target = target;
	session->out = NULL;
	session->in = NULL;

	bool failed = this->request_idle_conn(session, target) < 0 &&
				  this->request_new_conn(session, target) < 0;
	if (failed)
	{
		session->conn = NULL;
		session->seq = 0;
		return -1;
	}

	errno = saved_errno;
	return 0;
}
(7)有可复用的连接
/*
 * Try to issue `session` on an idle connection taken from target->idle_list.
 *
 * Returns -1 with errno = ENOENT when no usable idle connection exists; the
 * caller (Communicator::request) then falls back to request_new_conn().
 * Once an idle connection has been taken, the return value is never
 * negative:
 *   - on success it is send_message()'s result;
 *   - it is 1 when message_out() produced no message or sending failed.
 * In the failure case errno is saved in entry->error, the fd is removed from
 * the poller and the entry is marked CONN_STATE_ERROR.
 * NOTE(review): presumably the error is then reported asynchronously via the
 * poller rather than to this caller. Confirm.
 */
int Communicator::request_idle_conn(CommSession *session, CommTarget *target)
{
	struct CommConnEntry *entry;
	struct list_head *pos;
	int ret = -1;

	while (1)
	{
		// 1. Look for a reusable idle connection: pop the first one from the
		// target's idle list. The entry's own mutex is taken while
		// target->mutex is still held, and stays held after it is released.
		pthread_mutex_lock(&target->mutex);
		if (!list_empty(&target->idle_list))
		{
			pos = target->idle_list.next;
			entry = list_entry(pos, struct CommConnEntry, list);
			list_del(pos);
			pthread_mutex_lock(&entry->mutex);
		}
		else
			entry = NULL;

		pthread_mutex_unlock(&target->mutex);
		if (!entry)
		{
			errno = ENOENT;
			return -1;
		}

		// Set the fd's poller timeout to -1 (presumably dropping the idle
		// keep-alive timeout now that the connection is in use). If that
		// fails, mark this entry as closing and try the next idle one.
		if (mpoller_set_timeout(entry->sockfd, -1, this->mpoller) >= 0)
			break;

		entry->state = CONN_STATE_CLOSING;
		pthread_mutex_unlock(&entry->mutex);
	}

	// 2. Bind the session to the connection, give it the connection's next
	// sequence number, and build the outgoing request message.
	entry->session = session;
	session->conn = entry->conn;
	session->seq = entry->seq++;
	session->out = session->message_out();
	// 3. Send the message (ret stays -1 if message_out() returned NULL).
	if (session->out)
		ret = this->send_message(entry);

	if (ret < 0)
	{
		entry->error = errno;
		mpoller_del(entry->sockfd, this->mpoller);
		entry->state = CONN_STATE_ERROR;
		ret = 1;
	}

	pthread_mutex_unlock(&entry->mutex);
	return ret;
}
(8)没有可复用的连接
/*
 * Issue `session` on a brand-new connection to `target`.
 *
 * The socket's non-blocking connect() is registered with the poller as
 * PD_OP_CONNECT, with session->target->connect_timeout as the timeout. The
 * request itself is sent only later, once the connect completes (see
 * handle_connect_result).
 * Returns 0 on success. Returns -1 if the connection could not be launched,
 * or if registering it with the poller failed (the entry is then released
 * via release_conn()).
 */
int Communicator::request_new_conn(CommSession *session, CommTarget *target)
{
	struct CommConnEntry *new_entry = this->launch_conn(session, target);

	if (!new_entry)
		return -1;

	session->conn = new_entry->conn;
	session->seq = new_entry->seq++;

	struct poller_data pdata;

	pdata.operation = PD_OP_CONNECT;
	pdata.fd = new_entry->sockfd;
	pdata.ssl = NULL;
	pdata.context = new_entry;

	int connect_timeo = session->target->connect_timeout;

	if (mpoller_add(&pdata, connect_timeo, this->mpoller) < 0)
	{
		this->release_conn(new_entry);
		return -1;
	}

	return 0;
}
(a)如果没有可以复用的连接,我们先去建立连接,然后把connect操作挂到epoll上面监听(异步connect)
/*
 * Open a new non-blocking connection to `target` on behalf of `session`.
 *
 * On success returns a heap-allocated CommConnEntry that wraps the socket
 * and the CommConnection from target->new_connection(). The entry is in
 * CONN_STATE_CONNECTING with seq 0, ref 1, and no service or SSL attached.
 * On failure returns NULL after releasing, in reverse order, whatever was
 * acquired (mutex, entry memory, socket). When pthread_mutex_init() is what
 * failed, errno is set to its error code before cleanup.
 */
struct CommConnEntry *Communicator::launch_conn(CommSession *session,
												CommTarget *target)
{
	/* Start connect(); its completion is handled asynchronously. */
	int fd = this->nonblock_connect(target);

	if (fd < 0)
		return NULL;

	struct CommConnEntry *conn_entry =
		(struct CommConnEntry *)malloc(sizeof (struct CommConnEntry));

	if (conn_entry == NULL)
	{
		close(fd);
		return NULL;
	}

	int err = pthread_mutex_init(&conn_entry->mutex, NULL);

	if (err != 0)
	{
		errno = err;
		free(conn_entry);
		close(fd);
		return NULL;
	}

	conn_entry->conn = target->new_connection(fd);
	if (conn_entry->conn == NULL)
	{
		pthread_mutex_destroy(&conn_entry->mutex);
		free(conn_entry);
		close(fd);
		return NULL;
	}

	conn_entry->seq = 0;
	conn_entry->mpoller = this->mpoller;
	conn_entry->service = NULL;
	conn_entry->target = target;
	conn_entry->session = session;
	conn_entry->ssl = NULL;
	conn_entry->sockfd = fd;
	conn_entry->state = CONN_STATE_CONNECTING;
	conn_entry->ref = 1;

	return conn_entry;
}
1、connect建立连接
/*
 * Create a socket for `target`, make it non-blocking, and start connect().
 *
 * Excerpt from the article: `...` marks elided code (error handling /
 * cleanup, and the failure return), so this is not compilable as shown.
 * Returns the socket fd when connect() either completed immediately or is
 * still in progress (EINPROGRESS). In the in-progress case, completion is
 * observed later through the poller (the PD_OP_CONNECT registration made by
 * request_new_conn()).
 */
int Communicator::nonblock_connect(CommTarget *target)
{
	// Create the connecting socket fd.
	int sockfd = target->create_connect_fd();
	...
	// Switch the socket to non-blocking mode.
	__set_fd_nonblock(sockfd)
	...
	// Start connecting. On a non-blocking socket connect() typically fails
	// with errno == EINPROGRESS, which is treated as success here.
	if (connect(sockfd, target->addr, target->addrlen) >= 0 ||
		errno == EINPROGRESS)
	{
		return sockfd;
	}
	...
}
2、创建新的CommConnection
/*
 * Factory hook that wraps a freshly created connecting socket in a
 * CommConnection. It is presumably the CommTarget member that launch_conn()
 * calls as target->new_connection(sockfd).
 * This default ignores connect_fd and returns a plain CommConnection. Being
 * virtual, it can be overridden to return a derived connection type.
 */
virtual CommConnection *new_connection(int connect_fd)
{
	return new CommConnection;
}
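(b)异步connect:为什么会走到这里?因为 request_new_conn 已经用 PD_OP_CONNECT 通过 mpoller_add 把这个非阻塞 connect 的 sockfd 注册到了 poller 上;connect 完成(无论成功还是失败)后,poller 线程检测到该 fd 上的事件,于是调用 __poller_handle_connect。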
然后我们poller检测出这个事件后
// __poller_thread_routines 中调用 __poller_handle_connect(node, poller); static void __poller_handle_connect(struct __poller_node *node, poller_t *poller) { socklen_t len = sizeof (int); int error; if (getsockopt(node->data.fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) error = errno; if (__poller_remove_node(node, poller)) return; ... //放入消息队列 poller->cb((struct poller_result *)node, poller->ctx); }
poller->cb((struct poller_result *)node, poller->ctx);就是把node(res) 放入这个msg queue里
(c)Communicator::handler_thread_routine
/*
 * Communicator handler thread: takes poller results off the message queue
 * (filled by the poller callback, per the article) until msgqueue_get()
 * returns NULL, and dispatches each one by its operation type.
 * Excerpt: other switch cases are elided with `...`, and the function's
 * closing brace is missing from the article's copy.
 */
void Communicator::handler_thread_routine(void *context)
{
	Communicator *comm = (Communicator *)context;
	struct poller_result *res;

	while ((res = (struct poller_result *)msgqueue_get(comm->queue)) != NULL)
	{
		switch (res->data.operation)
		{
		...
		// A plain or SSL connect has finished: send the request and start
		// waiting for the response.
		case PD_OP_CONNECT:
		case PD_OP_SSL_CONNECT:
			comm->handle_connect_result(res);
			break;
		...
		}
	}
(d)Communicator::handle_connect_result
于是在这里处理connect的结果:生成并发送请求消息(message_out + send_message),然后把该fd以PD_OP_READ重新加入poller(epoll)监听,等待读取响应。
/*
 * Runs on a handler thread when an asynchronous (plain or SSL) connect has
 * finished.
 * Builds and sends the request message on the new connection, then
 * re-registers the same poller_data with the poller as PD_OP_READ so the
 * response can be received.
 * Excerpt from the article: the declarations of `ret` and `timeout` and the
 * (presumed) error handling are elided (`...`), so this is not compilable
 * as shown. `target` is fetched but not used in the visible part.
 */
void Communicator::handle_connect_result(struct poller_result *res)
{
	struct CommConnEntry *entry = (struct CommConnEntry *)res->data.context;
	CommSession *session = entry->session;
	CommTarget *target = entry->target;

	// Produce the outgoing message (e.g. the HTTP request) and send it.
	session->out = session->message_out();
	ret = this->send_message(entry);

	// Reuse the same poller_data to wait for the response.
	res->data.operation = PD_OP_READ;
	res->data.message = NULL;
	// Choose the timeout for the first response read:
	//   - if first_timeout() returns 0, use first_timeout_recv(session);
	//   - otherwise set session->timeout and begin_time.tv_nsec to -1
	//     (presumably "not set" markers; confirm).
	timeout = session->first_timeout();
	if (timeout == 0)
		timeout = Communicator::first_timeout_recv(session);
	else
	{
		session->timeout = -1;
		session->begin_time.tv_nsec = -1;
	}
	...
	mpoller_add(&res->data, timeout, this->mpoller);
	...
}
(9)send_message
/*
 * Serialize the session's outgoing message and send it on `entry`.
 *
 * The message is encoded into up to ENCODE_IOV_MAX iovecs. Without SSL, a
 * synchronous send is tried first. If it returns <= 0, that value is
 * returned directly (presumably 0 = everything sent, negative = error;
 * confirm). Whatever iovecs remain are handed to send_message_async():
 * the last `cnt` entries, i.e. starting at end - cnt. With SSL, no
 * synchronous attempt is made, so all iovecs go to the async path.
 * Excerpt: `...` marks elided code (presumably error checks on encode()).
 */
int Communicator::send_message(struct CommConnEntry *entry)
{
	struct iovec vectors[ENCODE_IOV_MAX];
	struct iovec *end;
	int cnt;

	// Serialize the message into the iovec array.
	cnt = entry->session->out->encode(vectors, ENCODE_IOV_MAX);
	...
	end = vectors + cnt;
	if (!entry->ssl)
	{
		// Send synchronously; afterwards cnt is the number of iovecs not yet sent.
		cnt = this->send_message_sync(vectors, cnt, entry);
		if (cnt <= 0)
			return cnt;
	}

	return this->send_message_async(end - cnt, cnt, entry);
}
https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/18_http_01.md
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。