Skip to content

downloader

appimage_updater.core.downloader

Download manager for AppImage files.

Downloader(timeout=300, user_agent=None, max_concurrent=3)

Handles downloading AppImage files.

Source code in src/appimage_updater/core/downloader.py
def __init__(
    self,
    timeout: int = 300,
    user_agent: str | None = None,
    max_concurrent: int = 3,
) -> None:
    """Store the downloader's configuration on the instance.

    Args:
        timeout: Value kept as ``self.timeout`` (presumably seconds;
            confirm against the code that consumes it).
        user_agent: Value kept as ``self.user_agent``. Any falsy value
            (``None`` or an empty string) is replaced by
            ``AppImage-Updater/<__version__>``.
        max_concurrent: Value kept as ``self.max_concurrent``;
            ``download_updates`` uses it to size its semaphore.
    """
    # Fall back to the versioned default only when no usable agent was given.
    if user_agent:
        effective_agent = user_agent
    else:
        effective_agent = f"AppImage-Updater/{__version__}"

    self.timeout = timeout
    self.user_agent = effective_agent
    self.max_concurrent = max_concurrent

max_concurrent = max_concurrent (instance attribute)

timeout = timeout (instance attribute)

user_agent = user_agent or f'AppImage-Updater/{__version__}' (instance attribute)

download_updates(candidates, show_progress=True) async

Download multiple updates concurrently.

Source code in src/appimage_updater/core/downloader.py
async def download_updates(
    self,
    candidates: list[UpdateCandidate],
    show_progress: bool = True,
) -> list[DownloadResult]:
    """Download every candidate, scheduling all downloads as asyncio tasks.

    One task is created per candidate via ``_download_with_semaphore``; each
    task receives the same semaphore, sized by ``self.max_concurrent``.

    Args:
        candidates: Updates to download. An empty list returns ``[]``
            immediately without creating a semaphore or any task.
        show_progress: When true, a rich ``Progress`` display is opened for
            the duration of the downloads and handed to each task.

    Returns:
        The gathered task results, in the same order as ``candidates``.

    Raises:
        Whatever a download task raises; ``asyncio.gather`` is called with
        its default ``return_exceptions=False``, so the first exception
        propagates to the caller.
    """
    if not candidates:
        return []

    limiter = asyncio.Semaphore(self.max_concurrent)

    # Quiet path: no progress display is created or passed along.
    if not show_progress:
        jobs = [
            asyncio.create_task(self._download_with_semaphore(limiter, item))
            for item in candidates
        ]
        return await asyncio.gather(*jobs)

    # Layout of the progress line: description, bar, percentage, then
    # size / speed / time remaining separated by bullets.
    columns = (
        TextColumn("[bold blue]{task.description}", justify="right"),
        BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        DownloadColumn(),
        "•",
        TransferSpeedColumn(),
        "•",
        TimeRemainingColumn(),
    )

    # Await inside the ``with`` so the display stays open until all tasks finish.
    with Progress(*columns) as progress:
        jobs = [
            asyncio.create_task(self._download_with_semaphore(limiter, item, progress))
            for item in candidates
        ]
        return await asyncio.gather(*jobs)