"""Implementation of the ``search`` command: query a package index over XML-RPC."""

import logging
import shutil
import sys
import textwrap
import xmlrpc.client
from collections import OrderedDict
from optparse import Values
from typing import TYPE_CHECKING, Dict, List, Optional

from pip._vendor.packaging.version import parse as parse_version

from pip._internal.cli.base_command import Command
from pip._internal.cli.req_command import SessionCommandMixin
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.metadata import get_default_environment
from pip._internal.models.index import PyPI
from pip._internal.network.xmlrpc import PipXmlrpcTransport
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import write_output

if TYPE_CHECKING:
    from typing import TypedDict

    class TransformedHit(TypedDict):
        # One entry per package: every version seen in the raw results is
        # collected under "versions".
        name: str
        summary: str
        versions: List[str]


logger = logging.getLogger(__name__)


class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
      %prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the ``-i/--index`` option and attach the option group."""
        self.cmd_opts.add_option(
            "-i",
            "--index",
            dest="index",
            metavar="URL",
            default=PyPI.pypi_url,
            help="Base URL of Python Package Index (default %default)",
        )

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """Run the search and print the results.

        Returns ``SUCCESS`` when the index returned at least one hit and
        ``NO_MATCHES_FOUND`` otherwise.

        Raises ``CommandError`` when no query argument was given.
        """
        if not args:
            raise CommandError("Missing required argument (search query).")
        query = args
        pypi_hits = self.search(query, options)
        hits = transform_hits(pypi_hits)

        # Only wrap summaries when writing to an interactive terminal.
        terminal_width = None
        if sys.stdout.isatty():
            terminal_width = shutil.get_terminal_size()[0]

        print_results(hits, terminal_width=terminal_width)
        if pypi_hits:
            return SUCCESS
        return NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """Query the index at ``options.index`` over XML-RPC.

        The query terms are matched against both name and summary, combined
        with "or".

        Raises ``CommandError`` when the server answers with an XML-RPC fault.
        """
        index_url = options.index

        session = self.get_default_session(options)

        transport = PipXmlrpcTransport(index_url, session)
        pypi = xmlrpc.client.ServerProxy(index_url, transport)
        try:
            hits = pypi.search({"name": query, "summary": query}, "or")
        except xmlrpc.client.Fault as fault:
            message = "XMLRPC request failed [code: {code}]\n{string}".format(
                code=fault.faultCode,
                string=fault.faultString,
            )
            # Chain explicitly so the original fault is kept as the cause.
            raise CommandError(message) from fault
        # Narrows the loosely-typed XML-RPC return value for type checkers.
        assert isinstance(hits, list)
        return hits


def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    The list from pypi is really a list of versions. We want a list of
    packages with the list of versions stored inline. This converts the
    list from pypi into one we can use.

    Packages keep the order in which they first appear in ``hits``; the
    summary kept for a package is the one attached to its highest version.
    """
    packages: Dict[str, "TransformedHit"] = OrderedDict()
    for hit in hits:
        name = hit["name"]
        summary = hit["summary"]
        version = hit["version"]

        if name not in packages:
            packages[name] = {
                "name": name,
                "summary": summary,
                "versions": [version],
            }
        else:
            packages[name]["versions"].append(version)

            # if this is the highest version so far, replace the summary
            if version == highest_version(packages[name]["versions"]):
                packages[name]["summary"] = summary

    return list(packages.values())


def print_dist_installation_info(name: str, latest: str) -> None:
    """Write INSTALLED/LATEST lines for ``name`` if it is installed.

    Nothing is written when the default environment has no distribution
    called ``name``.
    """
    env = get_default_environment()
    dist = env.get_distribution(name)
    if dist is None:
        return

    with indent_log():
        if dist.version == latest:
            write_output("INSTALLED: %s (latest)", dist.version)
            return

        write_output("INSTALLED: %s", dist.version)
        if parse_version(latest).pre:
            write_output(
                "LATEST:    %s (pre-release; install"
                " with `pip install --pre`)",
                latest,
            )
        else:
            write_output("LATEST:    %s", latest)


def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """Write one ``name (latest) - summary`` line per hit.

    ``name_column_width`` defaults to the widest "name + latest version"
    among ``hits`` plus padding. When ``terminal_width`` is given and leaves
    more than 10 columns for the summary, the summary is wrapped and its
    continuation lines are indented past the name column.
    """
    if not hits:
        return
    if name_column_width is None:
        name_column_width = (
            max(
                len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
                for hit in hits
            )
            + 4
        )

    for hit in hits:
        name = hit["name"]
        summary = hit["summary"] or ""
        latest = highest_version(hit.get("versions", ["-"]))
        if terminal_width is not None:
            target_width = terminal_width - name_column_width - 5
            if target_width > 10:
                # wrap and indent summary to fit terminal
                summary_lines = textwrap.wrap(summary, target_width)
                summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)

        name_latest = f"{name} ({latest})"
        line = f"{name_latest:{name_column_width}} - {summary}"
        try:
            write_output(line)
            print_dist_installation_info(name, latest)
        except UnicodeEncodeError:
            # Best effort: skip a hit whose text cannot be encoded for the
            # output stream rather than aborting the whole listing.
            pass


def highest_version(versions: List[str]) -> str:
    """Return the entry of ``versions`` that is greatest once parsed as a version."""
    return max(versions, key=parse_version)