当前位置:   article > 正文

istio-proxy相关概念以及启动过程

istio-proxy

Istio-proxy相关概念

istio-proxy

   Istio代理是可在客户端和服务器端使用的微服务代理,并形成微服务网格。代理支持大量功能。

 客户端功能:

  • 发现和负载平衡。代理可以使用几个标准的服务发现和负载平衡API,以有效地将流量分配给服务。
  • 凭证注入。代理可以通过连接隧道或特定于协议的机制(例如HTTP请求的JWT令牌)注入客户端身份。
  • 连接管理。代理管理与服务的连接,处理运行状况检查,重试,故障转移和流控制。
  • 监控和记录。代理可以报告客户端指标并记录到混合器。

服务器端功能:

  • 速率限制和流量控制。代理可以防止后端系统过载,并提供客户端感知的速率限制。
  • 协议翻译。代理是gRPC网关,提供JSON-REST和gRPC之间的转换。
  • 认证与授权。代理支持多种身份验证机制,并且可以使用客户端身份通过混合器执行授权检查。
  • 监控和记录。代理可以报告服务器端指标并记录到混合器。

istio proxy和Envoy的关系

istio proxy这个项目工程既包含引用了Envoy的源码,还在此基础上自己做了扩展,这个扩展是通过Envoy filter(过滤器)的形式来提供,这样的话就可以使得proxy代理将策略执行决策委托给Mixer,这样就解释了为什么Mixer可以被设计为提供策略和遥测的组件,Mixer->istio proxy->Envoy这种形式来控制。这样通过这个方式就能:

  • 使用到Envoy的全部功能
  • 基于Envoy做扩展,结合istio本身做处理

编译环境

centos7.x ,首先参考Bazel的官方文档安装Bazel,并且需要安装gcc等相关工具。

  1. git clone https://github.com/istio/proxy.git
  2. cd proxy
  3. make build_envoy

项目主要目录如下:

  1. ├── "BUILD"
  2. ├── "Makefile"
  3. ├── "WORKSPACE"
  4. ├── src
  5. │ ├── envoy -- envoy filter 插件源码
  6. │ │ ├── alts
  7. │ │ │ ├── *.cc
  8. │ │ │ ├── *.h
  9. │ │ │ └── "BUILD"
  10. │ │ ├── "BUILD"
  11. │ │ ├── http
  12. │ │ │ ├── authn --认证 filte
  13. │ │ │ │ ├── *.cc
  14. │ │ │ │ ├── *.h
  15. │ │ │ │ └── "BUILD"
  16. │ │ │ ├── jwt_auth --jwt 认证 filter
  17. │ │ │ │ ├── *.cc
  18. │ │ │ │ ├── *.h
  19. │ │ │ │ └── "BUILD"
  20. │ │ │ └── mixer --mixer filter,实现metrics上报,Quota(Rate Limiting (处理http协议)
  21. │ │ │ ├── *.cc
  22. │ │ │ ├── *.h
  23. │ │ │ └── "BUILD"
  24. │ │ ├── tcp
  25. │ │ │ └── mixer --mixer filter(处理tcp协议)
  26. │ │ │ ├── *.cc
  27. │ │ │ ├── *.h
  28. │ │ │ └── "BUILD"
  29. │ │ └── utils
  30. │ │ ├── *.cc
  31. │ │ ├── *.h
  32. │ │ └── "BUILD"
  33. │ └── istio
  34. │ └── **
  35. ├── test
  36. │ └── **
  37. └── tools
  38. └── **

makefile文件的关键信息:其中//src/envoy:envoy为bazel的语法,我们根据路径查询对应的内容;

  1. build_envoy:
  2. export PATH=$(PATH) CC=$(CC) CXX=$(CXX) && bazel $(BAZEL_STARTUP_ARGS) build $(BAZEL_BUILD_ARGS) $(BAZEL_CONFIG_REL) //src/envoy:envoy

/src/envops/BUILD文件关键内容为:cc_binary表明该target对应的是c++二进制文件路径,其中deps部分是其依赖的其他target。前13个target都是本地依赖,对应到源码目录中的其他子目录下的BUILD文件,其中最后一个比较特殊,是一个外部依赖,该外部库为envoy,因此proxy会make时音容envoy的源码,也算是扩展吧。

  1. envoy_cc_binary(
  2. name = "envoy",
  3. repository = "@envoy",
  4. visibility = ["//visibility:public"],
  5. deps = [
  6. // 对应本地文件
  7. "//extensions/access_log_policy:access_log_policy_lib",
  8. "//extensions/metadata_exchange:metadata_exchange_lib",
  9. "//extensions/stackdriver:stackdriver_plugin",
  10. "//extensions/stats:stats_plugin",
  11. "//src/envoy/http/alpn:config_lib",
  12. "//src/envoy/http/authn:filter_lib",
  13. "//src/envoy/http/jwt_auth:http_filter_factory",
  14. "//src/envoy/http/mixer:filter_lib",
  15. "//src/envoy/tcp/forward_downstream_sni:config_lib",
  16. "//src/envoy/tcp/metadata_exchange:config_lib",
  17. "//src/envoy/tcp/mixer:filter_lib",
  18. "//src/envoy/tcp/sni_verifier:config_lib",
  19. "//src/envoy/tcp/tcp_cluster_rewrite:config_lib",
  20. // 对应外部的envoy
  21. "@envoy//source/exe:envoy_main_entry_lib",
  22. ],
  23. )

外部库定义在根目录下的workspace中,envoy的相关内容如图,在执行过程中,根据URL下载编译指定资源。

  1. http_archive(
  2. name = "envoy",
  3. sha256 = ENVOY_SHA256,
  4. strip_prefix = "envoy-wasm-" + ENVOY_SHA,
  5. url = "https://github.com/envoyproxy/envoy-wasm/archive/" + ENVOY_SHA + ".tar.gz",
  6. )

编译过程中的依赖关系如下图所示:

从istio-proxy项目中的Envoy BUILD中可以知道,这里会编译出name = "Envoy"的二进制程序,然后start_Envoy会启动Envoy,同时会根据一些默认参数和配置文件模板生成一个全新的配置文件,然后运行。一些关键参数如Envoy二进制的路径和配置路径、监听的端口、mixer的地址如下:

Envoy相关部分

istio-proxy源码中提供了envoy.conf.template 通用配置模板,这个模板文件最终会生成一个envoy的配置文件,然后envoy启动的时候指定运行。模板配置文件中已经配置好了Mixer相关的参数如mixer_server,这个Mixer对于Envoy来说就是一个cluster,因此是在cluster_manager里面进行管理配置。

然后Envoy的官方文档Listener discovery service (LDS)一文中有说明静态的Listener文件配置是无法通过LDS API进行修改或删除的,因此静态配置会一直生效,istio-proxy源码中则提供了Envoy的静态配置文件envoy_lds.conf 静态listeners配置

Filter相关

官方文档中描述的三个filtuers相关概念;Istio-proxy实现了Network::ReadFilter 和 Network::WriteFilter 过滤器,这样就可以通过filter API 绑定到Listener,然后当有数据读or写的时候调用到对应的filter了,这样数据流就从Envoy本身转到了filter中。

  1. // path:/src/envoy/tcp/mixer/filter.h
  2. // Network::ReadFilter
  3. Network::FilterStatus onData(Buffer::Instance &data, bool) override;
  4. // Network::WriteFilter
  5. Network::FilterStatus onWrite(Buffer::Instance &data, bool) override;

然后network filter再转到Envoy 的 http filter,根据官方文档HTTP filters的介绍也有两种Filters,为Decoder和Encoder,然后在istio-proxy源码中实现了 Http::StreamDecoderFilter和Http::StreamEncoderFilter这两个Filter,这样的话整个流程就串起来了。

 

  1. // path:/src/envoy/
  2. //Http::StreamDecoderFilter
  3. FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool) override;
  4. FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
  5. // Http::StreamEncoderFilter
  6. FilterHeadersStatus encode100ContinueHeaders(HeaderMap&) override {
  7. return FilterHeadersStatus::Continue;
  8. }
  9. FilterHeadersStatus encodeHeaders(HeaderMap& headers, bool) override;
  10. FilterDataStatus encodeData(Buffer::Instance&, bool) override {
  11. return FilterDataStatus::Continue;
  12. }

Mixer client

原有mixer client仓库是独立的,现在已经整合到了istio-proxy的代码仓库中,这样就可以很方便的在Sidecar Envoy代理中实现:

  • 和mixer server交互
  • 添加一级缓存
  • 前置检查
  • 后置批量上报
  • 策略控制
  • 属性、字段转换和传递

对此Istio-proxy的理解,这个Envoy的扩展,就是在Envoy基础上,增加了一些Filter,然后通过这些个Filter在利用Mixer Client和 Mixer Server进行通信,这样就可以在proxy代理中:

  • 流量代理
  • 策略控制
  • 遥测代理

Envoy详解

Envoy是一个高性能的C++写的proxy转发器,那Envoy如何转发请求呢?需要定一些规则,然后按照这些规则进行转发。

规则可以是静态的,放在配置文件中的,启动的时候加载,要想重新加载,一般需要重新启动,但是Envoy支持热加载和热重启,一定程度上缓解了这个问题。

当然最好的方式是规则设置为动态的,放在统一的地方维护,这个统一的地方在Envoy眼中看来称为Discovery Service,过一段时间去这里拿一下配置,就修改了转发策略。

无论是静态的,还是动态的,在配置里面往往会配置四个东西。

  • listener,也即envoy既然是proxy,专门做转发,就得监听一个端口,接入请求,然后才能够根据策略转发,这个监听的端口称为listener
  • endpoint,是目标的ip地址和端口,这个是proxy最终将请求转发到的地方。
  • cluster,一个cluster是具有完全相同行为的多个endpoint,也即如果有三个容器在运行,就会有三个IP和端口,但是部署的是完全相同的三个服务,他们组成一个Cluster,从cluster到endpoint的过程称为负载均衡,可以轮询等。
  • route,有时候多个cluster具有类似的功能,但是是不同的版本号,可以通过route规则,选择将请求路由到某一个版本号,也即某一个cluster。
  • 静态配置表

 

Envoy启动过程解析

下载Envoy源码,启动入口函数在source/exe/main.cc

  1. std::unique_ptr<Envoy::MainCommon> main_common;
  2. // 声明并初始化Envoy::MainCommon实例为main_common
  3. try {
  4. main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  5. } catch (const Envoy::NoServingException& e) {
  6. return EXIT_SUCCESS;
  7. } catch (const Envoy::MalformedArgvException& e) {
  8. std::cerr << e.what() << std::endl;
  9. return EXIT_FAILURE;
  10. } catch (const Envoy::EnvoyException& e) {
  11. std::cerr << e.what() << std::endl;
  12. return EXIT_FAILURE;
  13. }
  14. // 声明并初始化Envoy::MainCommon实例为main_common,执行main_common->run启动Server
  15. return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;

MainCommon关键代码

  1. class MainCommon {
  2. public:
  3. MainCommon(int argc, const char* const* argv);
  4. // 启动函数
  5. bool run() { return base_.run(); }
  6. .............
  7. Server::Instance* server() { return base_.server(); }
  8. MainCommonBase base_;
  9. };

MainCommonBase.cc关键代码

 

  1. // main_common.cc
  2. int main_common(OptionsImpl& options) {
  3. try {
  4. // 生成maincommonbase,在里面会做server instance的初始化
  5. MainCommonBase main_common(options);
  6. return main_common.run() ? EXIT_SUCCESS : EXIT_FAILURE;
  7. } catch (EnvoyException& e) {
  8. return EXIT_FAILURE;
  9. }
  10. return EXIT_SUCCESS;
  11. }
  12. MainCommonBase::MainCommonBase(OptionsImpl& options) : options_(options) {
  13. ......
  14. // 可以看到,MainCommon将会初始化Instance,即一个服务的实例,于是,InstanceImpl进行初始化
  15. server_.reset(new Server::InstanceImpl(
  16. options_, local_address, default_test_hooks_, *restarter_, *stats_store_, access_log_lock,
  17. component_factory_, std::make_unique<Runtime::RandomGeneratorImpl>(), *tls_));
  18. ......
  19. }

Instance会启动初始化,在初始化核心函数中,将会进行listenerConfig的全面注册

 

  1. // server.cc
  2. InstanceImpl::InstanceImpl(Options& options, Network::Address::InstanceConstSharedPtr local_address,
  3. TestHooks& hooks, HotRestart& restarter, Stats::StoreRoot& store,
  4. Thread::BasicLockable& access_log_lock,
  5. ComponentFactory& component_factory,
  6. Runtime::RandomGeneratorPtr&& random_generator,
  7. ThreadLocal::Instance& tls) {
  8. ......
  9. initialize(options, local_address, component_factory);
  10. ......
  11. }
  12. void InstanceImpl::initialize(Options& options,
  13. Network::Address::InstanceConstSharedPtr local_address,
  14. ComponentFactory& component_factory) {
  15. ...
  16. // Handle configuration that needs to take place prior to the main configuration load.
  17. InstanceUtil::loadBootstrapConfig(bootstrap_, options,
  18. messageValidationContext().staticValidationVisitor(), *api_);
  19. .......
  20. // 初始化ListenerManager
  21. listener_manager_.reset(new ListenerManagerImpl(
  22. *this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_));
  23. // 会初始化
  24. main_config->initialize(bootstrap_, *this, *cluster_manager_factory_);
  25. ...
  26. }
  27. // 通过loadFromFile和loadFromYaml读取配置文件路径下的配置,并完成参数校验。
  28. InstanceUtil::loadBootstrapConfig(envoy::config::bootstrap::v2::Bootstrap& bootstrap,
  29. Options& options) {
  30. try {
  31. // 根据配置信息查找对应配置文件
  32. if (!options.configPath().empty()) {
  33. MessageUtil::loadFromFile(options.configPath(), bootstrap);
  34. }
  35. if (!options.configYaml().empty()) {
  36. envoy::config::bootstrap::v2::Bootstrap bootstrap_override;
  37. MessageUtil::loadFromYaml(options.configYaml(), bootstrap_override);
  38. bootstrap.MergeFrom(bootstrap_override);
  39. }
  40. MessageUtil::validate(bootstrap);
  41. return BootstrapVersion::V2;
  42. } catch (const EnvoyException& e) {
  43. if (options.v2ConfigOnly()) {
  44. throw;
  45. }
  46. // TODO(htuch): When v1 is deprecated, make this a warning encouraging config upgrade.
  47. ENVOY_LOG(debug, "Unable to initialize config as v2, will retry as v1: {}", e.what());
  48. }
  49. if (!options.configYaml().empty()) {
  50. throw EnvoyException("V1 config (detected) with --config-yaml is not supported");
  51. }
  52. Json::ObjectSharedPtr config_json = Json::Factory::loadFromFile(options.configPath());
  53. Config::BootstrapJson::translateBootstrap(*config_json, bootstrap);
  54. MessageUtil::validate(bootstrap);
  55. return BootstrapVersion::V1;
  56. }
  57. void MainImpl::initialize(const envoy::config::bootstrap::v2::Bootstrap& bootstrap,
  58. Instance& server,
  59. Upstream::ClusterManagerFactory& cluster_manager_factory) {
  60. ......
  61. const auto& listeners = bootstrap.static_resources().listeners();
  62. ENVOY_LOG(info, "loading {} listener(s)", listeners.size());
  63. // 从bootstrap配置(yaml文件)中提取listener配置,并依次进行添加操作。
  64. for (ssize_t i = 0; i < listeners.size(); i++) {
  65. ENVOY_LOG(debug, "listener #{}:", i);
  66. server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  67. }
  68. ......

回到MainCommonBase.run方法

  1. bool MainCommonBase::run() {
  2. switch (options_.mode()) {
  3. case Server::Mode::Serve:
  4. server_->run();
  5. return true;
  6. case Server::Mode::Validate: {
  7. auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
  8. return Server::validateConfig(options_, local_address, component_factory_, thread_factory_,
  9. file_system_);
  10. }
  11. case Server::Mode::InitOnly:
  12. PERF_DUMP();
  13. return true;
  14. }
  15. NOT_REACHED_GCOVR_EXCL_LINE;
  16. }

调用到InstanceImpl的run方法

 

  1. void InstanceImpl::run() {
  2. // startWorker即会进行eventloop
  3. RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
  4. [this]() -> void { startWorkers(); });
  5. ......
  6. }
  7. void InstanceImpl::startWorkers() {
  8. listener_manager_->startWorkers(*guard_dog_);
  9. ......
  10. }

InstanceImpl::startWorkers()方法解析

  1. void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  2. ENVOY_LOG(info, "all dependencies initialized. starting workers");
  3. ASSERT(!workers_started_);
  4. workers_started_ = true;
  5. for (const auto& worker : workers_) {
  6. ASSERT(warming_listeners_.empty());
  7. for (const auto& listener : active_listeners_) {
  8. // 此处即会将所有listener绑定到所有worker身上。worker即服务的并发线程数。
  9. addListenerToWorker(*worker, *listener);
  10. }
  11. worker->start(guard_dog);
  12. }
  13. }
  14. void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
  15. ......
  16. handler_->addListener(listener);
  17. ......
  18. }

进一步看addListener()方法

 

  1. void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  2. // 生成ActiveListener
  3. ActiveListenerPtr l(new ActiveListener(*this, config));
  4. listeners_.emplace_back(config.socket().localAddress(), std::move(l));
  5. }
  6. ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
  7. Network::ListenerConfig& config)
  8. : ActiveListener(
  9. parent,
  10. // 可以看到,在ActiveListener初始化过程中,将进行真正Listener的初始化。
  11. parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
  12. config.handOffRestoredDestinationConnections()),
  13. config) {}
  14. ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
  15. bool bind_to_port, bool hand_off_restored_destination_connections)
  16. : local_address_(nullptr), cb_(cb),
  17. hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
  18. ......
  19. // 通过libevent的`evconnlistener_new`实现对指定监听fd的新连接事件的回调处理。
  20. listener_.reset(
  21. evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
  22. ......
  23. }
  24. Listener对新连接设置回调函数 & Listener Filter创建
  25. void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
  26. int remote_addr_len, void* arg) {
  27. ......
  28. // 此处的fd已经不是listenfd,已经是该新连接的connfd。
  29. listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
  30. listener->hand_off_restored_destination_connections_);
  31. ......
  32. }
  33. // 回调时候主要做两件事情,
  34. // 1. 构建出对应的Listener Accept Filter
  35. // 2. 构建出ServerConnection
  36. void ConnectionHandlerImpl::ActiveListener::onAccept(
  37. Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  38. ......
  39. auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
  40. hand_off_restored_destination_connections);
  41. // 构建对应的Filter
  42. config_.filterChainFactory().createListenerFilterChain(*active_socket);
  43. active_socket->continueFilterChain(true);
  44. ......
  45. }
  46. void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
  47. ......
  48. // 创建连接
  49. listener_.newConnection(std::move(socket_));
  50. ......
  51. }

我们接着newConnecion方法看

 

  1. void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) {
  2. ......
  3. auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket();
  4. // 创建ServerConnection
  5. Network::ConnectionPtr new_connection =
  6. parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket));
  7. new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes());
  8. // 创建真正的Read/Write Filter
  9. const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
  10. *new_connection, filter_chain->networkFilterFactories());
  11. ......
  12. }
  13. ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
  14. TransportSocketPtr&& transport_socket, bool connected)
  15. : transport_socket_(std::move(transport_socket)), filter_manager_(*this, *this),
  16. socket_(std::move(socket)), write_buffer_(dispatcher.getWatermarkFactory().create(
  17. [this]() -> void { this->onLowWatermark(); },
  18. [this]() -> void { this->onHighWatermark(); })),
  19. dispatcher_(dispatcher), id_(next_global_id_++) {
  20. // 当read/writed生成事件
  21. file_event_ = dispatcher_.createFileEvent(
  22. fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
  23. Event::FileReadyType::Read | Event::FileReadyType::Write);
  24. }

初始化完了连接和Listener,也初始化完了Accept Listener Filter和Read/Write Listener Filter,此时,Listener的libevent事件已经准备就绪,有请求到来后,Connection的read/write事件也将被触发。此时Worker启动源码:

  1. void WorkerImpl::start(GuardDog& guard_dog) {
  2. ASSERT(!thread_);
  3. thread_.reset(new Thread::Thread([this, &guard_dog]() -> void { threadRoutine(guard_dog); }));
  4. }
  5. void WorkerImpl::threadRoutine(GuardDog& guard_dog) {
  6. ......
  7. dispatcher_->run(Event::Dispatcher::RunType::Block);
  8. // 异常退出后做的清理操作
  9. guard_dog.stopWatching(watchdog);
  10. handler_.reset();
  11. tls_.shutdownThread();
  12. watchdog.reset();
  13. }
  14. void DispatcherImpl::run(RunType type) {
  15. // 启动libevent处理
  16. event_base_loop(base_.get(), type == RunType::NonBlock ? EVLOOP_NONBLOCK : 0);
  17. }

总结

     对于istio-proxy的相关概念,envoy的启动部分源码做了学习,对于启动后的动态过程,后续学习发布。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号