Source code for pip_rating.rating

import datetime
import json
import os
import logging
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, TypedDict, Optional, Union, List, Tuple, Dict

from platformdirs import user_cache_dir
from pip_rating import __version__
from pip_rating._compat import cache
from pip_rating.sources.audit import Vulnerability

from pip_rating.sources.sourcerank import SourceRankBreakdown
from pip_rating.utils import parse_iso_datetime


if TYPE_CHECKING:
    from pip_rating.packages import Package


RATING_CACHE_DIR = Path(user_cache_dir()) / "pip-rating" / "rating"
MAX_CACHE_AGE = datetime.timedelta(days=7)


[docs]class PypiPackage(TypedDict): latest_upload_iso_dt: Optional[str] first_upload_iso_dt: Optional[str]
[docs]class SourcecodePage(TypedDict): package_in_readme: Optional[bool]
[docs]class PackageRatingParams(TypedDict): sourcerank_breakdown: SourceRankBreakdown pypi_package: PypiPackage sourcecode_page: SourcecodePage
[docs]class PackageRatingCache(TypedDict): package_name: str updated_at: str schema_version: str params: PackageRatingParams
[docs]class ScoreBase: def __add__(self, other: "ScoreBase"): raise NotImplementedError def __int__(self) -> int: raise NotImplementedError def __repr__(self) -> str: raise NotImplementedError def __str__(self) -> str: return repr(self)
[docs]class ScoreValue(ScoreBase): def __init__(self, value: int): self.value = value def __add__(self, other: "ScoreBase") -> "ScoreBase": if isinstance(other, ScoreValue): return ScoreValue(int(self) + int(other)) elif isinstance(other, Max): return other + self def __int__(self) -> int: return self.value def __repr__(self) -> str: return f"{self.value}"
[docs]class Max(ScoreBase): def __init__(self, max_score: int, current_score: int = 0): self.max_score = max_score self.current_score = current_score def __add__(self, other: ScoreBase): if isinstance(other, ScoreValue): score = self.current_score + int(other) self.current_score = min(self.max_score, score) if isinstance(other, Max) and other.max_score < self.max_score: other.current_score += self.current_score return other if isinstance(other, Max) and other.max_score > self.max_score: self.current_score += other.current_score return self return self def __int__(self) -> int: return min(self.current_score, self.max_score) def __str__(self): return f"Max({self.max_score})" def __repr__(self) -> str: return f"<Max current: {self.current_score} max: {self.max_score}>"
[docs]class BreakdownBase: breakdown_key: str
[docs] def get_score(self, package_rating: "PackageRating") -> ScoreBase: raise NotImplementedError
[docs] def get_breakdown_value( self, package_rating: "PackageRating" ) -> Union[int, bool, str]: value = package_rating.params for subkey in self.breakdown_key.split("."): value = value[subkey] return value
[docs]class PackageBreakdown(BreakdownBase): def __init__(self, breakdown_key: str, score: Optional[Union[int, Max]] = None): self.breakdown_key = breakdown_key self._score = score
[docs] def get_score(self, package_rating: "PackageRating") -> ScoreValue: value = self.get_breakdown_value(package_rating) if value and self._score: return ScoreValue(self._score) if not value and self._score: return ScoreValue(0) # the default is 0 if isinstance(value, bool): raise ValueError("Cannot calculate score for boolean value") return ScoreValue(value)
[docs]class DateBreakdown(BreakdownBase): def __init__( self, breakdown_key: str, scores: Dict[datetime.timedelta, int], default: int ): self.breakdown_key = breakdown_key self.scores = scores self.default = default self.logger = logging.getLogger(__name__)
[docs] def get_score(self, package_rating: "PackageRating") -> ScoreValue: iso_dt = self.get_breakdown_value(package_rating) if not iso_dt: return ScoreValue(0) try: dt = parse_iso_datetime(iso_dt) except ValueError: self.logger.warning( "Invalid datetime received for package %s: %s", package_rating.package.name, iso_dt, ) return ScoreValue(0) dt_delta = datetime.datetime.now(datetime.timezone.utc) - dt for delta, score in self.scores.items(): if dt_delta < delta: return ScoreValue(score) return ScoreValue(self.default)
[docs]class NullBoolBreakdown(BreakdownBase): def __init__(self, breakdown_key: str, scores: Dict[bool, ScoreBase]): self.breakdown_key = breakdown_key self.scores = scores
[docs] def get_score(self, package_rating: "PackageRating") -> ScoreBase: value = self.get_breakdown_value(package_rating) return self.scores[value]
BREAKDOWN_SCORES = [ PackageBreakdown("sourcerank_breakdown.basic_info_present", 1), PackageBreakdown("sourcerank_breakdown.source_repository_present", 1), PackageBreakdown("sourcerank_breakdown.readme_present", 1), PackageBreakdown("sourcerank_breakdown.license_present", 1), PackageBreakdown("sourcerank_breakdown.has_multiple_versions", 3), PackageBreakdown("sourcerank_breakdown.dependent_projects"), PackageBreakdown("sourcerank_breakdown.dependent_repositories"), PackageBreakdown("sourcerank_breakdown.stars"), PackageBreakdown("sourcerank_breakdown.contributors"), DateBreakdown( "pypi_package.latest_upload_iso_dt", { datetime.timedelta(days=30 * 4): 4, datetime.timedelta(days=30 * 6): 3, datetime.timedelta(days=365): 2, datetime.timedelta(days=365 + (30 * 6)): 1, datetime.timedelta(days=365 * 3): 0, datetime.timedelta(days=365 * 4): -2, }, default=-4, ), DateBreakdown( "pypi_package.first_upload_iso_dt", { datetime.timedelta(days=15): Max(0), datetime.timedelta(days=30): -3, datetime.timedelta(days=60): -2, datetime.timedelta(days=90): -1, datetime.timedelta(days=180): 0, datetime.timedelta(days=360): 1, datetime.timedelta(days=360 * 2): 2, datetime.timedelta(days=360 * 4): 3, }, default=4, ), NullBoolBreakdown( "sourcecode_page.package_in_readme", {True: ScoreValue(1), False: Max(0), None: ScoreValue(0)}, ), ]
[docs]class PackageRatingJson(TypedDict): rating_score: int global_rating_score: int vulnerabilities: List[Vulnerability] params: PackageRatingParams
[docs]class PackageRating: def __init__( self, package: "Package", params: Optional[PackageRatingParams] = None ): self.package = package if not params and self.is_cache_expired: params = self.get_params_from_package() self.save_to_cache() elif not params: params = self.get_params_from_cache() self.params: PackageRatingParams = params @property def is_cache_expired(self) -> bool: return ( not self.cache_path.exists() or self.cache_path.stat().st_mtime < (datetime.datetime.now() - MAX_CACHE_AGE).timestamp() ) @property def cache_path(self) -> Path: return RATING_CACHE_DIR / f"{self.package.name}.json"
[docs] def get_from_cache(self) -> Optional[PackageRatingCache]: with open(self.cache_path) as file: data = json.load(file) if data["schema_version"] != __version__: return None return data
[docs] def save_to_cache(self) -> PackageRatingCache: cache = { "package_name": self.package.name, "updated_at": datetime.datetime.now().isoformat(), "schema_version": __version__, "params": self.get_params_from_package(), } os.makedirs(str(self.cache_path.parent), exist_ok=True) with open(str(self.cache_path), "w") as file: json.dump(cache, file) return cache
[docs] def get_params_from_cache(self) -> PackageRatingParams: cache = self.get_from_cache() if cache is None: cache = self.save_to_cache() return cache["params"]
[docs] def get_params_from_package(self) -> PackageRatingParams: return { "sourcerank_breakdown": self.package.sourcerank.breakdown, "pypi_package": { "latest_upload_iso_dt": self.package.pypi.latest_upload_iso_dt, "first_upload_iso_dt": self.package.pypi.first_upload_iso_dt, }, "sourcecode_page": { "package_in_readme": self.package.sourcecode_page.package_in_readme, }, }
@cached_property def breakdown_scores(self) -> List[Tuple[str, ScoreBase]]: return [ (breakdown.breakdown_key, breakdown.get_score(self)) for breakdown in BREAKDOWN_SCORES ] @cached_property def descendant_rating_scores(self) -> List[Tuple["Package", int]]: return [ (package, package.rating.get_rating_score(self.package)) for package in self.package.get_descendant_packages() ] @cached_property def rating_score(self): scores = dict(self.breakdown_scores).values() value = ScoreValue(0) for score in scores: value += score return int(value)
[docs] @cache def get_vulnerabilities( self, from_package: Optional["Package"] = None ) -> List["Vulnerability"]: node = None if from_package is not None: node = self.package.get_node_from_parent(from_package) elif from_package is None: node = self.package.first_node # get_audit requires a node, so we only call it if we have one and this is used # instead of the package's own rating score if node is not None: return self.package.get_audit(node).vulnerabilities return []
[docs] def get_rating_score(self, from_package: Optional["Package"] = None) -> int: self.package.dependencies.results.analizing_package( self.package.name, self.package.dependencies.total_size ) if len(self.get_vulnerabilities(from_package)): return 0 return self.rating_score
[docs] def get_global_rating_score(self, from_package: Optional["Optional"] = None) -> int: return min( [self.get_rating_score(from_package)] + list(dict(self.descendant_rating_scores).values()), default=0, )
[docs] def as_json(self, from_package: Optional["Package"] = None) -> PackageRatingJson: return { "rating_score": self.get_rating_score(from_package), "global_rating_score": self.get_global_rating_score(from_package), "vulnerabilities": self.get_vulnerabilities(from_package), "params": self.params, }