Mobito.io

Downloading Data from AWS

Downloading Data from AWS

For each use case, two examples are provided:

Using AWS CLI

Using Python’s Boto3 library

Installation

Before executing any commands, ensure you have installed your chosen tool or library. Use the links below for installation:

Downloading files

Using the AWS CLI

To authenticate and download data from S3, follow these steps:

Configure AWS CLI: Run the following command in your terminal and enter your access key, secret key, default region, and output format:

aws configure

Download Files: After configuring AWS CLI, use the following command to download all files from the specified S3 path:

aws s3 sync s3://example-data-bucket/folder-1/ /local/path/to/save/files

Replace “example-data-bucket/folder-1/” with the S3 path provided by Mobito in the **AWS S3 Bucket Access Details **document

Replace “/local/path/to/save/files” with the local directory where you want to save the downloaded files.

This command uses the aws s3 sync to synchronize the specified S3 bucket path with your local directory, ensuring all files are downloaded efficiently.

Using Boto3 for python

Before running the script, ensure Boto3 is installed. If not, install it using:

pip install boto3

Once installed, you can run the following script:

import boto3
import os
from botocore.exceptions import NoCredentialsError
import time
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_file(s3_client, bucket_name, key, local_file_path):
    """Download a single file."""
    try:
        local_dir = os.path.dirname(local_file_path)
        os.makedirs(local_dir, exist_ok=True)

        # print(f"Starting download: {key}")

        # Download file synchronously (but thread will handle concurrency)
        s3_client.download_file(bucket_name, key, local_file_path)

        # print(f"Completed download: {key}")
        return key

    except Exception as e:
        print(f"Error downloading {key}: {e}")
        return None


def download_files_from_s3(access_key, secret_key, bucket_name, s3_prefix, local_directory, max_concurrent):
    """Download files from S3 using ThreadPoolExecutor."""

    # Create S3 client
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    # Initialize results tracking for JSON output
    download_results = {
        "metadata": {
            "start_time": datetime.now().isoformat(),
            "s3_prefix": s3_prefix,
            "local_directory": local_directory,
            "max_concurrent_workers": None,  # Will be set when executor is created
        },
        "download_futures": [],
        "summary": {},
    }

    try:
        print(f"Starting to list objects with prefix: {s3_prefix}")

        # First, get all objects using paginator
        paginator = s3_client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=s3_prefix)

        all_objects = []
        total_files = 0

        # Collect all objects first (excluding folders)
        for page in page_iterator:
            page_objects = page.get("Contents", [])

            # Filter out folders/prefixes before counting
            file_objects = [obj for obj in page_objects if not obj["Key"].endswith("/")]
            file_count = len(file_objects)

            # Add only file objects to our list for concurrent processing
            all_objects.extend(file_objects)
            total_files += file_count

        print(f"Total objects to download: {total_files}")

        if total_files == 0:
            print("No files found to download.")
            return

        # Create ThreadPoolExecutor
        start_time = time.time()

        results = []
        with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            download_results["metadata"]["max_concurrent_workers"] = executor._max_workers
            print(f"Starting downloads with {executor._max_workers} max threads...")

            # Submit all download tasks and track future details
            future_to_key = {}
            key_to_future_info = {}  # O(1) lookup dictionary

            for i, obj in enumerate(all_objects):
                key = obj["Key"]
                local_file_path = os.path.join(local_directory, key)

                future = executor.submit(download_file, s3_client, bucket_name, key, local_file_path)
                future_to_key[future] = key

                # Track future details with file metadata
                future_info = {
                    "future_id": i + 1,
                    "key": key,
                    "local_file_path": local_file_path,
                    "size": obj["Size"],
                    "last_modified": obj.get("LastModified").isoformat() if "LastModified" in obj else None,
                    "etag": obj.get("ETag"),
                    "submitted_at": datetime.now().isoformat(),
                }
                download_results["download_futures"].append(future_info)
                key_to_future_info[key] = future_info  # Store for O(1) lookup

            # Collect results as they complete
            results = []
            for future in as_completed(future_to_key):
                key = future_to_key[future]
                completion_time = datetime.now().isoformat()

                try:
                    result = future.result()
                    results.append(result)
                    status = "success"
                    error_message = None
                except Exception as exc:
                    print(f"Download generated an exception for {key}: {exc}")
                    results.append(None)
                    status = "failed"
                    error_message = str(exc)

                # Update the future info with completion data (O(1) lookup)
                future_info = key_to_future_info[key]
                future_info.update(
                    {
                        "completed_at": completion_time,
                        "status": status,
                        "error_message": error_message,
                        "result": result if status == "success" else None,
                    }
                )

        end_time = time.time()
        duration = end_time - start_time

        # Count successful downloads
        successful_downloads = sum(1 for result in results if result is not None)
        failed_downloads = total_files - successful_downloads

        # Update summary
        download_results["summary"] = {
            "total_files": total_files,
            "successful_downloads": successful_downloads,
            "failed_downloads": failed_downloads,
            "duration_seconds": duration,
            "end_time": datetime.now().isoformat(),
        }

        print("\nDownload Summary:")
        print(f"Total files found: {total_files}")
        print(f"Successfully downloaded: {successful_downloads}")
        print(f"Failed downloads: {failed_downloads}")
        print(f"Total time: {duration:.2f} seconds")

        # Save results to JSON file
        output_file = f"download_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w") as f:
            json.dump(download_results, f, indent=2, default=str)

        print(f"\nDownload report saved to: {output_file}")

    except NoCredentialsError:
        print("Credentials not available or not valid.")
    except Exception as e:
        print(f"An error occurred: {e}")


def run_download():
    """Run S3 download function."""
    # Replace with your AWS credentials and other details
    aws_access_key = "<ACCESS-KEY-ID>"
    aws_secret_key = "<SECRET-KEY>"
    bucket_name = "example-data-bucket"
    s3_prefix = "folder-1"
    local_directory = "/local/path/to/save/files"

    # Max Threads
    # if None, it will default to min(32, os.cpu_count() + 4)
    max_concurrent = None

    print("Starting S3 download...")

    # Run the threaded function
    download_files_from_s3(
        aws_access_key,
        aws_secret_key,
        bucket_name,
        s3_prefix,
        local_directory,
        max_concurrent,
    )


if __name__ == "__main__":
    run_download()

Still have questions?

Let us know what you're trying to achieve and a Mobito expert will help you figure out the best data setup.

Contact us

Why guess, when mobility data can guide you?

Get in touch with our experts to discover how you can leverage vehicle data in your business