赞
踩
pip是Python提供的包管理工具,它提供了对Python包的查找、下载、安装与卸载等功能,是当前Python中比较主流的包管理工具。
pip工具的本质是通过网络请求从服务端获取对应名称的文件,然后解压到本地Python的库文件夹中,从而将远端的Python包下载并安装到本地。概述流程如下:
pip install django
1.先获取到远端的服务器地址url比如:http://mirrors.aliyun.com/pypi/simple/
2.然后获取到本地的库安装的路径,通过服务器url去查找对应的django包
3.将找到的包下载到本地
4.解压该包到python的site-packages文件夹中,然后检查是否需要依赖其他包,如果依赖就安装其他的包
5.如果有依赖安装则按照同样的流程执行,待完成之后包就安装完成
因为pip安装的本质就是通过url去服务端请求文件并下载,pip的网络请求使用的是requests库,只不过该库被封装在pip的_vendor文件夹里面,并没有作为独立的requests包进行安装。
本次执行示例,假设python环境中还未安装Django,此时我们执行pip install django,函数的入口如下:
pip = pip._internal:main
查看main函数;
def main(args=None):
    """Entry point of the ``pip`` command line program.

    :param args: command line arguments without the program name; when
        ``None`` they are taken from ``sys.argv[1:]``.
    :return: whatever the selected command's ``main`` method returns.

    Exits the process with status 1 when the command line cannot be parsed.
    """
    argv = sys.argv[1:] if args is None else args

    # Configure our deprecation warnings to be sent through loggers
    deprecation.install_warning_logger()

    autocomplete()

    # Split the command line into the sub-command name and its arguments.
    try:
        name, remaining = parse_command(argv)
    except PipError as parse_error:
        sys.stderr.write("ERROR: %s" % parse_error)
        sys.stderr.write(os.linesep)
        sys.exit(1)

    # Needed for locale.getpreferredencoding(False) to work
    # in pip._internal.utils.encoding.auto_decode
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error as locale_error:
        # setlocale can apparently crash if locale are uninitialized
        logger.debug("Ignoring error %s when setting locale", locale_error)

    # Look up the command class registered under the parsed name,
    # instantiate it and hand the remaining arguments to it.
    command_class = commands_dict[name]
    isolated = "--isolated" in remaining
    command = command_class(isolated=isolated)
    return command.main(remaining)
此时查看commands_dict中对应的install命令,该命令是InstallCommand。通过继承关系可知,InstallCommand继承自RequirementCommand,而RequirementCommand继承自Command类,因此command.main执行的是Command类的main方法:
def main(self, args):
    # type: (List[str]) -> int
    """Parse the arguments, run the command and map failures to exit codes.

    :param args: the command line arguments that follow the command name.
    :return: the integer returned by ``self.run`` when it returns one,
        one of the error codes when a handled exception is raised, and
        ``SUCCESS`` otherwise.

    The ``finally`` clause always runs: it optionally performs the pip
    self-version check and then shuts the logging module down.
    """
    # Parse the incoming command line arguments.
    options, args = self.parse_args(args)

    # Set verbosity so that it can be used elsewhere.
    self.verbosity = options.verbose - options.quiet

    # Configure logging according to the requested verbosity.
    level_number = setup_logging(
        verbosity=self.verbosity,
        no_color=options.no_color,
        user_log_file=options.log,
    )

    if sys.version_info[:2] == (3, 4):
        # Running on Python 3.4: emit the deprecation notice.
        deprecated(
            "Python 3.4 support has been deprecated. pip 19.1 will be the "
            "last one supporting it. Please upgrade your Python as Python "
            "3.4 won't be maintained after March 2019 (cf PEP 429).",
            replacement=None,
            gone_in='19.2',
        )
    elif sys.version_info[:2] == (2, 7):
        # Running on Python 2.7: emit the end-of-life notice.
        message = (
            "A future version of pip will drop support for Python 2.7."
        )
        if platform.python_implementation() == "CPython":
            message = (
                "Python 2.7 will reach the end of its life on January "
                "1st, 2020. Please upgrade your Python as Python 2.7 "
                "won't be maintained after that date. "
            ) + message
        deprecated(message, replacement=None, gone_in=None)

    # TODO: Try to get these passing down from the command?
    #       without resorting to os.environ to hold these.
    #       This also affects isolated builds and it should.

    if options.no_input:
        os.environ['PIP_NO_INPUT'] = '1'

    if options.exists_action:
        os.environ['PIP_EXISTS_ACTION'] = ' '.join(options.exists_action)

    if options.require_venv and not self.ignore_require_venv:
        # If a venv is required check if it can really be found
        if not running_under_virtualenv():
            logger.critical(
                'Could not find an activated virtualenv (required).'
            )
            sys.exit(VIRTUALENV_NOT_FOUND)

    try:
        # Delegate the actual work to the subclass' run() implementation.
        status = self.run(options, args)
        # FIXME: all commands should return an exit status
        # and when it is done, isinstance is not needed anymore
        if isinstance(status, int):
            return status
    except PreviousBuildDirError as exc:
        logger.critical(str(exc))
        logger.debug('Exception information:', exc_info=True)

        return PREVIOUS_BUILD_DIR_ERROR
    except (InstallationError, UninstallationError, BadCommand) as exc:
        logger.critical(str(exc))
        logger.debug('Exception information:', exc_info=True)

        return ERROR
    except CommandError as exc:
        logger.critical('ERROR: %s', exc)
        logger.debug('Exception information:', exc_info=True)

        return ERROR
    except BrokenStdoutLoggingError:
        # Bypass our logger and write any remaining messages to stderr
        # because stdout no longer works.
        print('ERROR: Pipe to stdout was broken', file=sys.stderr)
        if level_number <= logging.DEBUG:
            traceback.print_exc(file=sys.stderr)

        return ERROR
    except KeyboardInterrupt:
        logger.critical('Operation cancelled by user')
        logger.debug('Exception information:', exc_info=True)

        return ERROR
    except BaseException:
        logger.critical('Exception:', exc_info=True)

        return UNKNOWN_ERROR
    finally:
        allow_version_check = (
            # Does this command have the index_group options?
            hasattr(options, "no_index") and
            # Is this command allowed to perform this check?
            not (options.disable_pip_version_check or options.no_index)
        )
        # Check if we're using the latest version of pip available
        if allow_version_check:
            session = self._build_session(
                options,
                retries=0,
                timeout=min(5, options.timeout)
            )
            with session:
                pip_version_check(session, options)

        # Shutdown the logging module
        logging.shutdown()

    return SUCCESS
此时我们查看InstallCommand类的run方法,
def run(self, options, args):
    """Execute ``pip install`` for the parsed options and arguments.

    The visible flow is: validate the options, work out the install
    location flags, open a session and build a finder, populate a
    ``RequirementSet`` from ``args``, resolve it, build wheels, install
    the requirements in the order given by the resolver and log what was
    installed.

    :return: ``ERROR`` when an ``EnvironmentError`` is caught, otherwise
        the ``RequirementSet`` that was processed.
    :raises CommandError: for ``--user`` combined with ``--prefix``, or a
        target path that exists but is not a directory.
    :raises InstallationError: for a ``--user`` install when
        ``virtualenv_no_global()`` is true, or when PEP 517 wheel builds
        fail.
    """
    # Validate the build/global option combination.
    cmdoptions.check_install_build_global(options)
    upgrade_strategy = "to-satisfy-only"
    if options.upgrade:
        upgrade_strategy = options.upgrade_strategy

    if options.build_dir:
        options.build_dir = os.path.abspath(options.build_dir)

    cmdoptions.check_dist_restriction(options, check_target=True)

    # Restrict to an explicitly requested Python version, if any.
    if options.python_version:
        python_versions = [options.python_version]
    else:
        python_versions = None

    # Normalise the source directory to an absolute path.
    options.src_dir = os.path.abspath(options.src_dir)
    # Extra install options; location flags are appended below.
    install_options = options.install_options or []
    if options.use_user_site:
        if options.prefix_path:
            raise CommandError(
                "Can not combine '--user' and '--prefix' as they imply "
                "different installation locations"
            )
        if virtualenv_no_global():
            raise InstallationError(
                "Can not perform a '--user' install. User site-packages "
                "are not visible in this virtualenv."
            )
        install_options.append('--user')
        install_options.append('--prefix=')

    target_temp_dir = TempDirectory(kind="target")
    if options.target_dir:
        options.ignore_installed = True
        options.target_dir = os.path.abspath(options.target_dir)
        if (os.path.exists(options.target_dir) and not
                os.path.isdir(options.target_dir)):
            raise CommandError(
                "Target path exists but is not a directory, will not "
                "continue."
            )

        # Create a target directory for using with the target option
        target_temp_dir.create()
        install_options.append('--home=' + target_temp_dir.path)

    global_options = options.global_options or []

    # Open the session used for the rest of the install.
    # NOTE(review): presumably built on the vendored requests Session -
    # confirm against _build_session.
    with self._build_session(options) as session:
        # Build the finder that is handed to the resolver and wheel
        # builder below.
        finder = self._build_package_finder(
            options=options,
            session=session,
            platform=options.platform,
            python_versions=python_versions,
            abi=options.abi,
            implementation=options.implementation,
        )
        build_delete = (not (options.no_clean or options.build_dir))
        wheel_cache = WheelCache(options.cache_dir, options.format_control)

        if options.cache_dir and not check_path_owner(options.cache_dir):
            logger.warning(
                "The directory '%s' or its parent directory is not owned "
                "by the current user and caching wheels has been "
                "disabled. check the permissions and owner of that "
                "directory. If executing pip with sudo, you may want "
                "sudo's -H flag.",
                options.cache_dir,
            )
            options.cache_dir = None

        with RequirementTracker() as req_tracker, TempDirectory(
            options.build_dir, delete=build_delete, kind="install"
        ) as directory:
            # Container for everything that is going to be installed.
            requirement_set = RequirementSet(
                require_hashes=options.require_hashes,
                check_supported_wheels=not options.target_dir,
            )

            try:
                # Fill the requirement set from the command line input.
                self.populate_requirement_set(
                    requirement_set, args, options, finder, session,
                    self.name, wheel_cache
                )
                # The preparer is passed to the resolver and wheel builder.
                preparer = RequirementPreparer(
                    build_dir=directory.path,
                    src_dir=options.src_dir,
                    download_dir=None,
                    wheel_download_dir=None,
                    progress_bar=options.progress_bar,
                    build_isolation=options.build_isolation,
                    req_tracker=req_tracker,
                )

                resolver = Resolver(
                    preparer=preparer,
                    finder=finder,
                    session=session,
                    wheel_cache=wheel_cache,
                    use_user_site=options.use_user_site,
                    upgrade_strategy=upgrade_strategy,
                    force_reinstall=options.force_reinstall,
                    ignore_dependencies=options.ignore_dependencies,
                    ignore_requires_python=options.ignore_requires_python,
                    ignore_installed=options.ignore_installed,
                    isolated=options.isolated_mode,
                    use_pep517=options.use_pep517
                )
                # Resolve the requirement set; per resolve()'s docstring
                # the packages are downloaded, unpacked and prepared as a
                # side effect.
                resolver.resolve(requirement_set)

                protect_pip_from_modification_on_windows(
                    modifying_pip=requirement_set.has_requirement("pip")
                )

                # Consider legacy and PEP517-using requirements separately
                legacy_requirements = []
                pep517_requirements = []
                for req in requirement_set.requirements.values():
                    if req.use_pep517:
                        pep517_requirements.append(req)
                    else:
                        legacy_requirements.append(req)

                # We don't build wheels for legacy requirements if we
                # don't have wheel installed or we don't have a cache dir
                try:
                    import wheel  # noqa: F401
                    build_legacy = bool(options.cache_dir)
                except ImportError:
                    build_legacy = False

                wb = WheelBuilder(
                    finder, preparer, wheel_cache,
                    build_options=[], global_options=[],
                )

                # Always build PEP 517 requirements
                build_failures = wb.build(
                    pep517_requirements,
                    session=session, autobuilding=True
                )

                if build_legacy:
                    # We don't care about failures building legacy
                    # requirements, as we'll fall through to a direct
                    # install for those.
                    wb.build(
                        legacy_requirements,
                        session=session, autobuilding=True
                    )

                # If we're using PEP 517, we cannot do a direct install
                # so we fail here.
                if build_failures:
                    raise InstallationError(
                        "Could not build wheels for {} which use"
                        " PEP 517 and cannot be installed directly".format(
                            ", ".join(r.name for r in build_failures)))

                # Ask the resolver for the order in which to install.
                to_install = resolver.get_installation_order(
                    requirement_set
                )

                # Consistency Checking of the package set we're installing.
                should_warn_about_conflicts = (
                    not options.ignore_dependencies and
                    options.warn_about_conflicts
                )
                if should_warn_about_conflicts:
                    self._warn_about_conflicts(to_install)

                # Don't warn about script install locations if
                # --target has been specified
                warn_script_location = options.warn_script_location
                if options.target_dir:
                    warn_script_location = False

                # Install the prepared requirements.
                installed = install_given_reqs(
                    to_install,
                    install_options,
                    global_options,
                    root=options.root_path,
                    home=target_temp_dir.path,
                    prefix=options.prefix_path,
                    pycompile=options.compile,
                    warn_script_location=warn_script_location,
                    use_user_site=options.use_user_site,
                )

                lib_locations = get_lib_location_guesses(
                    user=options.use_user_site,
                    home=target_temp_dir.path,
                    root=options.root_path,
                    prefix=options.prefix_path,
                    isolated=options.isolated_mode,
                )
                working_set = pkg_resources.WorkingSet(lib_locations)

                # Build the "name-version" summary, sorted by name.
                reqs = sorted(installed, key=operator.attrgetter('name'))
                items = []
                for req in reqs:
                    item = req.name
                    try:
                        installed_version = get_installed_version(
                            req.name, working_set=working_set
                        )
                        if installed_version:
                            item += '-' + installed_version
                    except Exception:
                        # Best effort: fall back to the bare name when the
                        # version lookup fails.
                        pass
                    items.append(item)
                installed = ' '.join(items)
                if installed:
                    # Report the modules that were installed.
                    logger.info('Successfully installed %s', installed)
            except EnvironmentError as error:
                show_traceback = (self.verbosity >= 1)

                message = create_env_error_message(
                    error, show_traceback, options.use_user_site,
                )
                logger.error(message, exc_info=show_traceback)

                return ERROR
            except PreviousBuildDirError:
                # Setting no_clean makes the finally clause below skip
                # cleanup.
                options.no_clean = True
                raise
            finally:
                # Clean up
                if not options.no_clean:
                    requirement_set.cleanup_files()
                    wheel_cache.cleanup()

    if options.target_dir:
        self._handle_target_dir(
            options.target_dir, target_temp_dir, options.upgrade
        )
    return requirement_set
其中有关下载的主要的函数就是resolver.resolve(requirement_set),该函数的执行代码如下;
def resolve(self, requirement_set):
    # type: (RequirementSet) -> None
    """Work out everything that has to be done for ``requirement_set``.

    Resolution has a side effect: each requirement, and every dependency
    discovered along the way, is downloaded, unpacked and prepared for
    installation. That preparation is done by ``pip.operations.prepare``.

    Once PyPI has static dependency metadata available, it would be
    possible to move the preparation to become a step separated from
    dependency resolution.

    :raises HashErrors: when at least one requirement raised a
        ``HashError``; all of them are collected before raising.
    """
    # make the wheelhouse
    wheelhouse = self.preparer.wheel_download_dir
    if wheelhouse:
        ensure_dir(wheelhouse)

    top_level = (
        requirement_set.unnamed_requirements +
        list(requirement_set.requirements.values())
    )

    # If any top-level requirement has a hash specified, enter
    # hash-checking mode, which requires hashes from all.
    needs_hashes = requirement_set.require_hashes
    if not needs_hashes:
        needs_hashes = any(
            candidate.has_hash_options for candidate in top_level
        )
    self.require_hashes = needs_hashes

    # Display where finder is looking for packages
    formatted_locations = self.finder.get_formatted_locations()
    if formatted_locations:
        logger.info(formatted_locations)

    # Actually prepare the files, and collect any exceptions. Most hash
    # exceptions cannot be checked ahead of time, because
    # req.populate_link() needs to be called before we can make decisions
    # based on link type.
    found_later = []  # type: List[InstallRequirement]
    collected_errors = HashErrors()
    # ``found_later`` grows while it is being iterated, so dependencies
    # discovered by _resolve_one are themselves resolved in turn.
    for pending in chain(top_level, found_later):
        try:
            extra = self._resolve_one(requirement_set, pending)
            found_later.extend(extra)
        except HashError as hash_error:
            hash_error.req = pending
            collected_errors.append(hash_error)

    if collected_errors:
        raise collected_errors
此时继续查看_resolve_one函数;
def _resolve_one(
    self,
    requirement_set,  # type: RequirementSet
    req_to_install  # type: InstallRequirement
):
    # type: (...) -> List[InstallRequirement]
    """Prepare a single requirement and register its dependencies.

    :param requirement_set: the set the requirement and its dependencies
        are added to.
    :param req_to_install: the requirement to prepare.
    :return: A list of additional InstallRequirements to also install.
    """
    # Tell user what we are doing for this requirement:
    # obtain (editable), skipping, processing (local url), collecting
    # (remote url or package name)
    if req_to_install.constraint or req_to_install.prepared:
        return []

    req_to_install.prepared = True

    # register tmp src for cleanup in case something goes wrong
    requirement_set.reqs_to_cleanup.append(req_to_install)

    # Obtain the abstract distribution for this requirement.
    # NOTE(review): presumably this is where the package is downloaded -
    # confirm against _get_abstract_dist_for.
    abstract_dist = self._get_abstract_dist_for(req_to_install)

    # Parse and return dependencies
    dist = abstract_dist.dist()
    try:
        check_dist_requires_python(dist)
    except UnsupportedPythonVersion as err:
        # Downgrade to a warning only when the user asked to ignore
        # Requires-Python.
        if self.ignore_requires_python:
            logger.warning(err.args[0])
        else:
            raise

    more_reqs = []  # type: List[InstallRequirement]

    def add_req(subreq, extras_requested):
        # Turn one dependency into an InstallRequirement, add it to the
        # requirement set and queue whatever must be scanned again.
        sub_install_req = install_req_from_req_string(
            str(subreq),
            req_to_install,
            isolated=self.isolated,
            wheel_cache=self.wheel_cache,
            use_pep517=self.use_pep517
        )
        parent_req_name = req_to_install.name
        to_scan_again, add_to_parent = requirement_set.add_requirement(
            sub_install_req,
            parent_req_name=parent_req_name,
            extras_requested=extras_requested,
        )
        if parent_req_name and add_to_parent:
            self._discovered_dependencies[parent_req_name].append(
                add_to_parent
            )
        more_reqs.extend(to_scan_again)

    with indent_log():
        # We add req_to_install before its dependencies, so that we
        # can refer to it when adding dependencies.
        if not requirement_set.has_requirement(req_to_install.name):
            # 'unnamed' requirements will get added here
            req_to_install.is_direct = True
            requirement_set.add_requirement(
                req_to_install, parent_req_name=None,
            )

        if not self.ignore_dependencies:
            if req_to_install.extras:
                logger.debug(
                    "Installing extra requirements: %r",
                    ','.join(req_to_install.extras),
                )
            # Extras that were requested but the distribution lacks.
            missing_requested = sorted(
                set(req_to_install.extras) - set(dist.extras)
            )
            for missing in missing_requested:
                logger.warning(
                    '%s does not provide the extra \'%s\'',
                    dist, missing
                )

            # Extras that were requested and the distribution provides.
            available_requested = sorted(
                set(dist.extras) & set(req_to_install.extras)
            )
            for subreq in dist.requires(available_requested):
                add_req(subreq, extras_requested=available_requested)

        if not req_to_install.editable and not req_to_install.satisfied_by:
            # XXX: --no-install leads this to report 'Successfully
            # downloaded' for only non-editable reqs, even though we took
            # action on them.
            requirement_set.successfully_downloaded.append(req_to_install)

    return more_reqs
此时继续查看_get_abstract_dist_for函数是如何下载的;
def _get_abstract_dist_for(self, req):
    # type: (InstallRequirement) -> DistAbstraction
    """Take an InstallRequirement and return a single AbstractDist
    representing a prepared variant of the same.

    Only the installed-requirement and linked-requirement paths are shown
    in this excerpt; the ``...`` statements mark code that was left out.
    """
    ...
    # satisfied_by is only evaluated by calling _check_skip_installed,
    # so it must be None here.
    assert req.satisfied_by is None
    # _check_skip_installed is what sets req.satisfied_by (see the comment
    # above); its return value is passed on as the skip reason.
    skip_reason = self._check_skip_installed(req)

    if req.satisfied_by:
        return self.preparer.prepare_installed_requirement(
            req, self.require_hashes, skip_reason
        )

    upgrade_allowed = self._is_upgrade_allowed(req)
    # Not satisfied by an installed distribution: let the preparer handle
    # the linked requirement.
    abstract_dist = self.preparer.prepare_linked_requirement(
        req, self.session, self.finder, upgrade_allowed,
        self.require_hashes
    )
    ...
    return abstract_dist
prepare_linked_requirement函数对应的代码如下;
def prepare_linked_requirement(
    self,
    req,  # type: InstallRequirement
    session,  # type: PipSession
    finder,  # type: PackageFinder
    upgrade_allowed,  # type: bool
    require_hashes  # type: bool
):
    # type: (...) -> DistAbstraction
    """Prepare a requirement that would be obtained from req.link

    Visible steps: make sure the requirement has a source directory,
    populate ``req.link`` through the finder, run the hash-mode checks,
    call ``unpack_url`` for the link and build the abstract distribution.

    :raises PreviousBuildDirError: when ``req.source_dir`` already
        contains a ``setup.py``.
    :raises InstallationError: when ``unpack_url`` raises
        ``requests.HTTPError``.
    """
    # TODO: Breakup into smaller functions
    if req.link and req.link.scheme == 'file':
        path = url_to_path(req.link.url)
        logger.info('Processing %s', display_path(path))
    else:
        logger.info('Collecting %s', req)

    with indent_log():
        # @@ if filesystem packages are not marked
        # editable in a req, a non deterministic error
        # occurs when the script attempts to unpack the
        # build directory
        req.ensure_has_source_dir(self.build_dir)
        # If a checkout exists, it's unwise to keep going. version
        # inconsistencies are logged later, but do not fail the
        # installation.
        # FIXME: this won't upgrade when there's an existing
        # package unpacked in `req.source_dir`
        if os.path.exists(os.path.join(req.source_dir, 'setup.py')):
            raise PreviousBuildDirError(
                "pip can't proceed with requirements '%s' due to a"
                " pre-existing build directory (%s). This is "
                "likely due to a previous installation that failed"
                ". pip is being responsible and not assuming it "
                "can delete this. Please delete it and try again."
                % (req, req.source_dir)
            )
        # Have the requirement resolve its link through the finder.
        req.populate_link(finder, upgrade_allowed, require_hashes)

        # We can't hit this spot and have populate_link return None.
        # req.satisfied_by is None here (because we're
        # guarded) and upgrade has no impact except when satisfied_by
        # is not None.
        # Then inside find_requirement existing_applicable -> False
        # If no new versions are found, DistributionNotFound is raised,
        # otherwise a result is guaranteed.
        assert req.link
        link = req.link

        # Now that we have the real link, we can tell what kind of
        # requirements we have and raise some more informative errors
        # than otherwise. (For example, we can raise VcsHashUnsupported
        # for a VCS URL rather than HashMissing.)
        if require_hashes:
            # We could check these first 2 conditions inside
            # unpack_url and save repetition of conditions, but then
            # we would report less-useful error messages for
            # unhashable requirements, complaining that there's no
            # hash provided.
            if is_vcs_url(link):
                raise VcsHashUnsupported()
            elif is_file_url(link) and is_dir_url(link):
                raise DirectoryUrlHashUnsupported()
            if not req.original_link and not req.is_pinned:
                # Unpinned packages are asking for trouble when a new
                # version is uploaded. This isn't a security check, but
                # it saves users a surprising hash mismatch in the
                # future.
                #
                # file:/// URLs aren't pinnable, so don't complain
                # about them not being pinned.
                raise HashUnpinned()

        hashes = req.hashes(trust_internet=not require_hashes)
        if require_hashes and not hashes:
            # Known-good hashes are missing for this requirement, so
            # shim it with a facade object that will provoke hash
            # computation and then raise a HashMissing exception
            # showing the user what the hash should be.
            hashes = MissingHashes()

        try:
            # Directory passed to unpack_url as the download directory.
            download_dir = self.download_dir
            # We always delete unpacked sdists after pip ran.
            autodelete_unpacked = True
            if req.link.is_wheel and self.wheel_download_dir:
                # when doing 'pip wheel` we download wheels to a
                # dedicated dir.
                download_dir = self.wheel_download_dir
            if req.link.is_wheel:
                if download_dir:
                    # When downloading, we only unpack wheels to get
                    # metadata.
                    autodelete_unpacked = True
                else:
                    # When installing a wheel, we use the unpacked
                    # wheel.
                    autodelete_unpacked = False
            # NOTE(review): presumably fetches the link (if remote) and
            # unpacks it into req.source_dir - confirm against unpack_url.
            unpack_url(
                req.link, req.source_dir,
                download_dir, autodelete_unpacked,
                session=session, hashes=hashes,
                progress_bar=self.progress_bar
            )
        except requests.HTTPError as exc:
            logger.critical(
                'Could not install requirement %s because of error %s',
                req,
                exc,
            )
            raise InstallationError(
                'Could not install requirement %s because of HTTP '
                'error %s for URL %s' %
                (req, exc, req.link)
            )
        abstract_dist = make_abstract_dist(req)
        with self.req_tracker.track(req):
            abstract_dist.prep_for_dist(finder, self.build_isolation)
        # When downloads are to be kept, archive VCS checkouts into the
        # download directory.
        if self._download_should_save:
            # Make a .zip of the source_dir we already created.
            if req.link.scheme in vcs.all_schemes:
                req.archive(self.download_dir)
    return abstract_dist
此时通过req.populate_link来进行模块的下载;
def populate_link(self, finder, upgrade, require_hashes):
    # type: (PackageFinder, bool, bool) -> None
    """Make sure ``self.link`` is set whenever a link can be found.

    ``self.link`` may still be None afterwards - if ``upgrade`` is False
    and the requirement is already installed.

    The wheel cache is bypassed when ``require_hashes`` is True: cached
    wheels are always built locally, so their hashes differ from those of
    the files downloaded from the index server and would trigger false
    hash mismatches. Furthermore, cached wheels at present have
    undeterministic contents due to file modification times.
    """
    if self.link is None:
        # No link yet: ask the finder for one.
        self.link = finder.find_requirement(self, upgrade)

    cache = self._wheel_cache
    if cache is None or require_hashes:
        return

    # Let the wheel cache substitute its own link for the one found.
    link_before = self.link
    self.link = cache.get(link_before, self.name)
    if link_before != self.link:
        logger.debug('Using cached wheel link: %s', self.link)
此时对应的find_requirement函数的执行流程如下;
def find_requirement(self, req, upgrade):
    # type: (InstallRequirement, bool) -> Optional[Link]
    """Try to find a Link matching req

    Expects req, an InstallRequirement and upgrade, a boolean
    Returns a Link if found,
    Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise

    This excerpt is abridged: the ``...`` statement stands for the code
    that selects ``best_candidate`` and raises the errors named above.
    """
    # Collect every candidate known for this project name.
    all_candidates = self.find_all_candidates(req.name)

    # Filter out anything which doesn't match our specifier
    compatible_versions = set(
        req.specifier.filter(
            # We turn the version object into a str here because otherwise
            # when we're debundled but setuptools isn't, Python will see
            # packaging.version.Version and
            # pkg_resources._vendor.packaging.version.Version as different
            # types. This way we'll use a str as a common data interchange
            # format. If we stop using the pkg_resources provided specifier
            # and start using our own, we can drop the cast to str().
            [str(c.version) for c in all_candidates],
            prereleases=(
                self.allow_all_prereleases
                if self.allow_all_prereleases else None
            ),
        )
    )
    ...
    return best_candidate.location
find_all_candidates函数如下所示;
def find_all_candidates(self, project_name):
    # type: (str) -> List[Optional[InstallationCandidate]]
    """Find all available InstallationCandidate for project_name

    This checks index_urls and find_links.
    All versions found are returned as an InstallationCandidate list.

    See _link_package_versions for details on which files are accepted

    The result is the concatenation of the versions found in local files,
    in find_links and in the fetched pages, in that order.
    """
    index_locations = self._get_index_urls_locations(project_name)
    index_file_loc, index_url_loc = self._sort_locations(index_locations)
    fl_file_loc, fl_url_loc = self._sort_locations(
        self.find_links, expand_dir=True,
    )

    # Lazy generator: it is only consumed further down, when the file
    # versions are computed.
    file_locations = (Link(url) for url in itertools.chain(
        index_file_loc, fl_file_loc,
    ))

    # We trust every url that the user has given us whether it was given
    #   via --index-url or --find-links.
    # We want to filter out any thing which does not have a secure origin.
    url_locations = [
        link for link in itertools.chain(
            (Link(url) for url in index_url_loc),
            (Link(url) for url in fl_url_loc),
        )
        if self._validate_secure_origin(logger, link)
    ]

    logger.debug('%d location(s) to search for versions of %s:',
                 len(url_locations), project_name)

    for location in url_locations:
        logger.debug('* %s', location)

    canonical_name = canonicalize_name(project_name)
    formats = self.format_control.get_allowed_formats(canonical_name)
    search = Search(project_name, canonical_name, formats)
    find_links_versions = self._package_versions(
        # We trust every directly linked archive in find_links
        (Link(url, '-f') for url in self.find_links),
        search
    )

    page_versions = []
    # Fetch each URL location as a page and collect the versions its
    # links offer.
    for page in self._get_pages(url_locations, project_name):
        logger.debug('Analyzing links from page %s', page.url)
        with indent_log():
            page_versions.extend(
                self._package_versions(page.iter_links(), search)
            )

    # Versions available from local file locations.
    file_versions = self._package_versions(file_locations, search)
    if file_versions:
        file_versions.sort(reverse=True)
        logger.debug(
            'Local files found: %s',
            ', '.join([
                url_to_path(candidate.location.url)
                for candidate in file_versions
            ])
        )

    # This is an intentional priority ordering
    return file_versions + find_links_versions + page_versions
此时主要执行的就是_get_pages去获取内容;
def _get_pages(self, locations, project_name): # type: (Iterable[Link], str) -> Iterable[HTMLPage] """ Yields (page, page_url) from the given locations, skipping locations that have errors. """ seen = set() # type: Set[Link] for location in locations: if location in seen: continue seen.add(location) page = _get_html_page(location, session=self.session) # 获取html信息 if page is None: continue yield page def _get_html_page(link, session=None): # type: (Link, Optional[PipSession]) -> Optional[HTMLPage] if session is None: raise TypeError( "_get_html_page() missing 1 required keyword argument: 'session'" ) url = link.url.split('#', 1)[0] # Check for VCS schemes that do not support lookup as web pages. vcs_scheme = _match_vcs_scheme(url) if vcs_scheme: logger.debug('Cannot look at %s URL %s', vcs_scheme, link) return None # Tack index.html onto file:// URLs that point to directories scheme, _, path, _, _, _ = urllib_parse.urlparse(url) # 编码 if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))): # add trailing slash if not present so urljoin doesn't trim # final segment if not url.endswith('/'): url += '/' url = urllib_parse.urljoin(url, 'index.html') logger.debug(' file: URL is directory, getting %s', url) try: resp = _get_html_response(url, session=session) # 发送请求 except _NotHTTP as exc: logger.debug( 'Skipping page %s because it looks like an archive, and cannot ' 'be checked by HEAD.', link, ) except _NotHTML as exc: logger.debug( 'Skipping page %s because the %s request got Content-Type: %s', link, exc.request_desc, exc.content_type, ) except requests.HTTPError as exc: _handle_get_page_fail(link, exc) except RetryError as exc: _handle_get_page_fail(link, exc) except SSLError as exc: reason = "There was a problem confirming the ssl certificate: " reason += str(exc) _handle_get_page_fail(link, reason, meth=logger.info) except requests.ConnectionError as exc: _handle_get_page_fail(link, "connection error: %s" % exc) except requests.Timeout: 
_handle_get_page_fail(link, "timed out") else: return HTMLPage(resp.content, resp.url, resp.headers) return None def _get_html_response(url, session): # type: (str, PipSession) -> Response """Access an HTML page with GET, and return the response. This consists of three parts: 1. If the URL looks suspiciously like an archive, send a HEAD first to check the Content-Type is HTML, to avoid downloading a large file. Raise `_NotHTTP` if the content type cannot be determined, or `_NotHTML` if it is not HTML. 2. Actually perform the request. Raise HTTP exceptions on network failures. 3. Check the Content-Type header to make sure we got HTML, and raise `_NotHTML` otherwise. """ if _is_url_like_archive(url): _ensure_html_response(url, session=session) logger.debug('Getting page %s', url) resp = session.get( url, headers={ "Accept": "text/html", # We don't want to blindly returned cached data for # /simple/, because authors generally expecting that # twine upload && pip install will function, but if # they've done a pip install in the last ~10 minutes # it won't. Thus by setting this to zero we will not # blindly use any cached data, however the benefit of # using max-age=0 instead of no-cache, is that we will # still support conditional requests, so we will still # minimize traffic sent in cases where the page hasn't # changed at all, we will just always incur the round # trip for the conditional GET now instead of only # once per 10 minutes. # For more information, please see pypa/pip#5670. "Cache-Control": "max-age=0", }, ) # 通过session发送请求去请求内容 resp.raise_for_status() # The check for archives above only works if the url ends with # something that looks like an archive. However that is not a # requirement of an url. Unless we issue a HEAD request on every # url we cannot know ahead of time for sure if something is HTML # or not. However we can check after we've downloaded it. _ensure_html_header(resp) return resp
至此请求的过程就分析完成,该流程的主要调用过程如下所示;
本文只是因为突然发现pip安装包的过程中源地址未换,所以简单地查看了一下pip的请求流程,并没有对pip具体的业务场景进行详细分析,大家如有兴趣可自行分析。由于本人才疏学浅,如有错误请批评指正。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。