Asynchronous Communication
Introduction
Why Asynchronous Communication?
Aspect | Synchronous | Asynchronous
Message Queue Fundamentals
Core Concepts
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Any
import json
import uuid


@dataclass
class Message:
    """Base message structure."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: str = ""
    payload: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    # Timezone-aware UTC timestamp (datetime.utcnow is deprecated since Python 3.12).
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_json(self) -> str:
        # Serialize every field; the timestamp is written as an ISO 8601 string.
        return json.dumps({
            "id": self.id,
            "type": self.type,
            "payload": self.payload,
            "metadata": self.metadata,
            "timestamp": self.timestamp.isoformat(),
        })

    @classmethod
    def from_json(cls, data: str) -> "Message":
        # Inverse of to_json: parse the ISO 8601 string back into a datetime.
        parsed = json.loads(data)
        parsed["timestamp"] = datetime.fromisoformat(parsed["timestamp"])
        return cls(**parsed)


class MessageBroker(ABC):
    """Abstract message broker interface."""

    @abstractmethod
    async def publish(self, queue: str, message: Message) -> None: ...

    @abstractmethod
    async def subscribe(
        self,
        queue: str,
        handler: Callable[[Message], Any]
    ) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

RabbitMQ Implementation
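A RabbitMQ-backed broker would implement the publish/subscribe/close contract defined above. The same contract can be exercised without a running server through an in-memory stand-in, which may be useful in unit tests. The sketch below is an assumption for illustration, not the RabbitMQ implementation itself: InMemoryBroker, Handler, and demo are hypothetical names, the Message class is trimmed down so the block runs on its own, and delivery is simplified to calling every registered handler directly with no buffering, acknowledgements, or competing consumers.

```python
import asyncio
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Message:
    """Trimmed-down stand-in for the Message class defined earlier."""
    type: str = ""
    payload: dict = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


Handler = Callable[[Message], Awaitable[Any]]


class InMemoryBroker:
    """Hypothetical in-process broker exposing publish/subscribe/close."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)
        self._closed = False

    async def publish(self, queue: str, message: Message) -> None:
        if self._closed:
            raise RuntimeError("broker is closed")
        # Simplification: call every handler on this queue, in registration order.
        for handler in self._handlers[queue]:
            await handler(message)

    async def subscribe(self, queue: str, handler: Handler) -> None:
        self._handlers[queue].append(handler)

    async def close(self) -> None:
        self._closed = True
        self._handlers.clear()


async def demo() -> list[str]:
    broker = InMemoryBroker()
    seen: list[str] = []

    async def on_order(message: Message) -> None:
        seen.append(f"{message.type}:{message.payload['order_id']}")

    await broker.subscribe("orders", on_order)
    await broker.publish(
        "orders", Message(type="order.created", payload={"order_id": 42})
    )
    await broker.close()
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because application code depends only on the three methods, a stand-in like this can later be swapped for a real broker client without touching the handlers.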
Exchange Types in RabbitMQ
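The routing rules of the three most commonly used exchange types can be modelled in plain Python, which may make their differences easier to see. This is a sketch of the semantics only, not RabbitMQ client code: route and topic_matches are hypothetical helper names, each queue is assumed to have a single binding key, and the headers exchange is left out. In a topic binding, `*` stands for exactly one dot-separated word and `#` for zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Topic matching: '*' is exactly one word, '#' is zero or more words."""

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type: str, bindings: dict[str, str], routing_key: str) -> list[str]:
    """Return the queues that receive a message, given {queue: binding_key}."""
    if exchange_type == "fanout":
        # Fanout ignores the routing key: every bound queue gets a copy.
        return sorted(bindings)
    if exchange_type == "direct":
        # Direct requires an exact match between binding key and routing key.
        return sorted(q for q, key in bindings.items() if key == routing_key)
    if exchange_type == "topic":
        return sorted(
            q for q, key in bindings.items() if topic_matches(key, routing_key)
        )
    raise ValueError(f"unsupported exchange type: {exchange_type}")


if __name__ == "__main__":
    bindings = {"billing": "order.created", "audit": "order.#", "eu": "*.*.eu"}
    for kind in ("direct", "topic", "fanout"):
        print(kind, route(kind, bindings, "order.created"))
```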
Event-Driven Architecture
Domain Events
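A domain event is usually modelled as an immutable record of something that has already happened, named in the past tense. The sketch below shows one way to express that with a frozen dataclass. The OrderPlaced event, its fields, and the "order.placed" type string are hypothetical and chosen only for illustration.

```python
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical domain event: an immutable fact about the past."""
    order_id: str
    customer_id: str
    total_cents: int
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

    def to_payload(self) -> dict:
        """Flatten the event into a JSON-friendly dict for publishing."""
        data = asdict(self)
        data["occurred_at"] = self.occurred_at.isoformat()
        data["event_type"] = "order.placed"
        return data


if __name__ == "__main__":
    event = OrderPlaced(order_id="o-1", customer_id="c-9", total_cents=2599)
    print(event.to_payload())
```

Freezing the dataclass means an attempt to reassign a field raises an error, which fits the idea that consumers should never rewrite history.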
Event Publisher
Event Consumer
Task Queues with Celery
Celery Setup
Task Patterns
Eventual Consistency
Understanding Eventual Consistency
Handling Eventual Consistency
Idempotency
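Brokers that offer at-least-once delivery can hand the same message to a consumer more than once, so a handler should produce the same result however many times it sees a given message. A minimal sketch of one common approach, deduplicating on the message id, follows. IdempotentConsumer and its fields are hypothetical names, and the in-memory set is an assumption made for brevity: a real system would typically record processed ids in durable storage, in the same transaction as the side effect.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentConsumer:
    """Hypothetical consumer that applies each message id at most once."""
    processed_ids: set[str] = field(default_factory=set)
    balance: int = 0

    def handle(self, message_id: str, amount: int) -> bool:
        """Apply a credit once; return False for a duplicate delivery."""
        if message_id in self.processed_ids:
            return False
        self.balance += amount
        self.processed_ids.add(message_id)
        return True


if __name__ == "__main__":
    consumer = IdempotentConsumer()
    print(consumer.handle("msg-1", 100))  # first delivery
    print(consumer.handle("msg-1", 100))  # redelivery of the same message
    print(consumer.balance)
```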
Message Ordering and Partitioning
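Partitioned systems generally guarantee ordering only within a partition, so messages that must stay in order are given the same partition key. The sketch below illustrates the idea with a stable hash of the key. partition_for and assign are hypothetical helpers, and the SHA-256 based scheme is an assumption for illustration rather than the partitioner of any particular broker.

```python
import hashlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a hash that is stable across processes."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def assign(events: list[tuple[str, str]], num_partitions: int) -> dict[int, list[str]]:
    """Append each (key, event) pair to its partition, preserving arrival order."""
    partitions: dict[int, list[str]] = {i: [] for i in range(num_partitions)}
    for key, event in events:
        partitions[partition_for(key, num_partitions)].append(event)
    return partitions


if __name__ == "__main__":
    events = [
        ("order-1", "order-1:created"),
        ("order-2", "order-2:created"),
        ("order-1", "order-1:paid"),
        ("order-1", "order-1:shipped"),
    ]
    for index, items in assign(events, 4).items():
        print(index, items)
```

Python's built-in hash() is avoided here because string hashing is randomized per process by default, which would send the same key to different partitions on different producers.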
Practical Exercise
Exercise: Build an Event-Driven Order System
Key Takeaways
What's Next?