Cloud Data Platforms
Introduction
AWS Data Services
Amazon S3 - Object Storage
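The S3Manager class below wraps the boto3 calls most data pipelines need against a single bucket: file upload and download, DataFrame round-trips in Parquet, CSV, or JSON, prefix listing, and batched deletes.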
# s3_operations.py
import boto3
from botocore.exceptions import ClientError
import pandas as pd
from io import StringIO, BytesIO
from typing import List, Optional
from pathlib import Path
class S3Manager:
    """Manage S3 operations for data engineering"""

    def __init__(self, bucket_name: str, region: str = 'us-east-1'):
        """
        Initialize S3 manager.

        Args:
            bucket_name: S3 bucket name
            region: AWS region
        """
        self.bucket_name = bucket_name
        self.s3_client = boto3.client('s3', region_name=region)
        self.s3_resource = boto3.resource('s3', region_name=region)
        self.bucket = self.s3_resource.Bucket(bucket_name)

    def upload_file(self, local_path: str, s3_key: str) -> None:
        """
        Upload file to S3.

        Args:
            local_path: Local file path
            s3_key: S3 object key
        """
        try:
            self.s3_client.upload_file(local_path, self.bucket_name, s3_key)
            print(f"Uploaded {local_path} to s3://{self.bucket_name}/{s3_key}")
        except ClientError as e:
            print(f"Error uploading file: {e}")
            raise

    def download_file(self, s3_key: str, local_path: str) -> None:
        """
        Download file from S3.

        Args:
            s3_key: S3 object key
            local_path: Local destination path
        """
        try:
            self.s3_client.download_file(self.bucket_name, s3_key, local_path)
            print(f"Downloaded s3://{self.bucket_name}/{s3_key} to {local_path}")
        except ClientError as e:
            print(f"Error downloading file: {e}")
            raise
    def upload_dataframe(
        self,
        df: pd.DataFrame,
        s3_key: str,
        file_format: str = 'parquet'
    ) -> None:
        """
        Upload DataFrame to S3.

        Args:
            df: DataFrame to upload
            s3_key: S3 object key
            file_format: 'parquet', 'csv', or 'json'
        """
        if file_format == 'parquet':
            buffer = BytesIO()
            df.to_parquet(buffer, index=False)
            body = buffer.getvalue()
        elif file_format == 'csv':
            # to_csv returns a str when no path is given; encode it before upload
            body = df.to_csv(index=False).encode('utf-8')
        elif file_format == 'json':
            body = df.to_json(orient='records', lines=True).encode('utf-8')
        else:
            raise ValueError(f"Unsupported format: {file_format}")
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=s3_key,
            Body=body
        )
        print(f"Uploaded DataFrame to s3://{self.bucket_name}/{s3_key}")
    def read_dataframe(
        self,
        s3_key: str,
        file_format: str = 'parquet'
    ) -> pd.DataFrame:
        """
        Read DataFrame from S3.

        Args:
            s3_key: S3 object key
            file_format: 'parquet', 'csv', or 'json'

        Returns:
            DataFrame
        """
        obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=s3_key)
        if file_format == 'parquet':
            return pd.read_parquet(BytesIO(obj['Body'].read()))
        elif file_format == 'csv':
            return pd.read_csv(BytesIO(obj['Body'].read()))
        elif file_format == 'json':
            return pd.read_json(BytesIO(obj['Body'].read()), lines=True)
        else:
            raise ValueError(f"Unsupported format: {file_format}")

    def list_objects(self, prefix: str = '') -> List[str]:
        """
        List objects in S3 with given prefix.

        Args:
            prefix: S3 key prefix

        Returns:
            List of object keys
        """
        objects = []
        paginator = self.s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
            if 'Contents' in page:
                objects.extend([obj['Key'] for obj in page['Contents']])
        return objects

    def delete_objects(self, s3_keys: List[str]) -> None:
        """
        Delete multiple objects from S3.

        Args:
            s3_keys: List of S3 object keys to delete
        """
        if not s3_keys:
            return
        objects = [{'Key': key} for key in s3_keys]
        # Delete in batches of 1000 (S3 limit)
        for i in range(0, len(objects), 1000):
            batch = objects[i:i+1000]
            self.s3_client.delete_objects(
                Bucket=self.bucket_name,
                Delete={'Objects': batch}
            )
        print(f"Deleted {len(s3_keys)} objects from S3")
# Example usage
if __name__ == "__main__":
    s3 = S3Manager('my-data-lake-bucket')

    # Upload CSV file
    s3.upload_file('data/users.csv', 'raw/users/2024-01-15/users.csv')

    # Read DataFrame from S3
    df = s3.read_dataframe('processed/users/users_cleaned.parquet')

    # List all parquet files in processed zone
    parquet_files = [
        f for f in s3.list_objects('processed/')
        if f.endswith('.parquet')
    ]
print(f"Found {len(parquet_files)} parquet files")AWS Glue - ETL Service
Amazon Redshift - Data Warehouse
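Redshift speaks the PostgreSQL protocol, and bulk loads typically use the COPY command to read files straight from S3. A minimal sketch with psycopg2 is below; the cluster endpoint, credentials, IAM role ARN, and table name are placeholders.
# redshift_copy_sketch.py -- illustrative COPY-from-S3 load (placeholder endpoint, credentials, role)
import psycopg2

conn = psycopg2.connect(
    host='my-cluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439,
    dbname='analytics',
    user='etl_user',
    password='***'
)

copy_sql = """
    COPY analytics.users
    FROM 's3://my-data-lake-bucket/processed/users/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
    FORMAT AS PARQUET;
"""

# The connection context manager commits on success and rolls back on error
with conn, conn.cursor() as cur:
    cur.execute(copy_sql)

conn.close()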
Snowflake
Snowflake with Python
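The snowflake-connector-python package provides a DB-API connection plus pandas helpers for moving DataFrames in and out of tables. A minimal sketch, assuming placeholder account, credential, and table names (the target table for write_pandas must already exist):
# snowflake_sketch.py -- illustrative connection and DataFrame round-trip (placeholder account/credentials)
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

conn = snowflake.connector.connect(
    account='my_account',
    user='etl_user',
    password='***',
    warehouse='COMPUTE_WH',
    database='ANALYTICS',
    schema='PUBLIC'
)

# Query results can be fetched directly into a DataFrame
cur = conn.cursor()
cur.execute("SELECT user_id, email FROM users WHERE created_at >= CURRENT_DATE - 7")
recent_users = cur.fetch_pandas_all()
print(f"Fetched {len(recent_users)} rows")

# Bulk-insert a DataFrame into an existing table
df = pd.DataFrame({'USER_ID': [1, 2], 'EMAIL': ['a@example.com', 'b@example.com']})
success, *_ = write_pandas(conn, df, 'USERS_STAGING')

cur.close()
conn.close()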
Databricks
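In a Databricks notebook or job the spark session is provided for you, and tables are usually stored in Delta format. A minimal sketch of a read/clean/write step, with hypothetical table names:
# databricks_sketch.py -- illustrative Delta read/transform/write (relies on the Databricks-provided `spark`)
from pyspark.sql import functions as F

# Read a table registered in the metastore / Unity Catalog
users = spark.table('raw.users')

# Basic cleanup: drop rows without a primary key, deduplicate, stamp the load time
cleaned = (
    users
    .filter(F.col('user_id').isNotNull())
    .dropDuplicates(['user_id'])
    .withColumn('loaded_at', F.current_timestamp())
)

# Write back as a managed Delta table
cleaned.write.format('delta').mode('overwrite').saveAsTable('processed.users_cleaned')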
Azure Data Services
Azure Data Factory
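Data Factory pipelines are normally authored in the portal or as JSON definitions and then triggered and monitored programmatically. A minimal sketch with the azure-mgmt-datafactory SDK; the subscription ID, resource group, factory, pipeline name, and parameters are placeholders.
# adf_trigger_sketch.py -- illustrative pipeline trigger and status check (placeholder names)
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

subscription_id = '00000000-0000-0000-0000-000000000000'
resource_group = 'rg-data-platform'
factory_name = 'adf-data-platform'

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), subscription_id)

# Start a pipeline run, passing runtime parameters
run = adf_client.pipelines.create_run(
    resource_group,
    factory_name,
    'pl_ingest_users',
    parameters={'load_date': '2024-01-15'}
)

# Check the run's status
pipeline_run = adf_client.pipeline_runs.get(resource_group, factory_name, run.run_id)
print(pipeline_run.status)  # e.g. 'InProgress', 'Succeeded', 'Failed'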
Best Practices for Cloud Data Platforms
Key Takeaways