# Part 5: WS-Security and Authentication

## Security in SOAP

REST APIs secure communication at the transport layer — use HTTPS and attach an API key or Bearer token to the HTTP headers. The security lives outside the message.

SOAP adds another layer: **WS-Security** (Web Services Security, published by OASIS in 2004) allows you to secure the *message itself* — independent of transport. You can sign a SOAP body so the recipient knows it has not been tampered with. You can encrypt part of the body so only the intended recipient can read it. The signed or encrypted payload is valid regardless of whether it travelled over HTTPS, JMS, or any other transport.

This distinction matters in regulated industries. A banking integration I worked on required both TLS *and* message signing. The bank's system verified the XML signature on every request because they needed a cryptographic proof of message integrity that could not be stripped by a proxy or load balancer.

This part covers the most common WS-Security patterns you will encounter and implement with Python.

## WS-Security Overview

WS-Security extends the SOAP Header with security-related information. The standard defines three core mechanisms:

1. **UsernameToken** — username and password (plaintext or hashed) in the header
2. **Timestamp** — a created/expires window to prevent replay attacks
3. **X.509 Certificate Tokens** — digital signature and/or encryption using public-key cryptography

These can be combined. A common enterprise configuration is: UsernameToken + Timestamp for authentication, plus TLS for transport confidentiality.

A WS-Security header looks like this:

```xml
<soap:Header>
    <wsse:Security
        xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
        xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
        soap:mustUnderstand="1">

        <!-- Timestamp to prevent replay attacks -->
        <wsu:Timestamp wsu:Id="TS-1">
            <wsu:Created>2026-03-14T10:00:00Z</wsu:Created>
            <wsu:Expires>2026-03-14T10:05:00Z</wsu:Expires>
        </wsu:Timestamp>

        <!-- Username and password credentials -->
        <wsse:UsernameToken wsu:Id="UT-1">
            <wsse:Username>api_user</wsse:Username>
            <wsse:Password
                Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest">
                <!-- Base64(SHA1(nonce + created + password)) -->
                3V5JQJR...
            </wsse:Password>
            <wsse:Nonce EncodingType="...#Base64Binary">abc123</wsse:Nonce>
            <wsu:Created>2026-03-14T10:00:00Z</wsu:Created>
        </wsse:UsernameToken>

    </wsse:Security>
</soap:Header>
```

## WS-Security with zeep

zeep's `wsse` module provides ready-to-use WS-Security plugins that attach to the transport layer.

### UsernameToken — Password Text

The simplest form: sends the username and password in plaintext. Only use this over TLS (HTTPS).

```python
# wsse_username_text.py
from zeep import Client
from zeep.wsse.username import UsernameToken

client = Client(
    wsdl="https://api.example.com/service?wsdl",
    wsse=UsernameToken(
        username="api_user",
        password="secret_password",
        use_digest=False,   # PasswordText — plaintext, requires TLS
    ),
)

result = client.service.GetProduct(ProductId="SKU-001")
```

### UsernameToken — Password Digest

A more secure option: sends `Base64(SHA1(nonce + created + password))` instead of the plaintext password. The server computes the same digest and compares. This adds some protection even without TLS, but cannot replace encryption.

```python
# wsse_username_digest.py
from zeep import Client
from zeep.wsse.username import UsernameToken

client = Client(
    wsdl="https://api.example.com/service?wsdl",
    wsse=UsernameToken(
        username="api_user",
        password="secret_password",
        use_digest=True,    # PasswordDigest — hashed
    ),
)

result = client.service.GetProduct(ProductId="SKU-001")
```

Internally, zeep generates a random nonce, a current timestamp, computes the digest, and inserts a `wsse:UsernameToken` into the Security header automatically.

### Adding a Timestamp

Timestamps declare a validity window for the message. The receiving service rejects requests outside that window, preventing replay attacks.

```python
# wsse_with_timestamp.py
from zeep import Client
from zeep.wsse.username import UsernameToken
from zeep.wsse.utils import WSSeTimestamp

# zeep does not have a standalone Timestamp plugin, but UsernameToken
# includes timestamp support with the timestamp_token keyword argument.
# For a standalone timestamp, combine plugins using the Signature class.

# UsernameToken alone also includes Created in the token, which acts
# as a form of timestamp protection. Pair it with short-lived tokens
# by setting a tight nonce window on the server side.

client = Client(
    wsdl="https://api.example.com/service?wsdl",
    wsse=UsernameToken(
        username="api_user",
        password="secret",
        use_digest=True,
        timestamp_token=True,   # Adds wsu:Timestamp to the Security header
    ),
)
```

### Message Signing with X.509 Certificates

Message signing is common when integrating with banking and government systems. The client signs the SOAP body (and optionally headers) with its private key. The server verifies the signature using the client's public certificate.

You need:

* A client private key (`.pem` or `.pfx`)
* A client certificate (`.pem`) — the public part, shared with the server
* The server's public certificate (for encrypting or verifying server responses)

```bash
# Generate a self-signed certificate for testing
openssl req -x509 -newkey rsa:2048 \
    -keyout client_private.pem \
    -out client_cert.pem \
    -days 365 \
    -nodes \
    -subj "/CN=SOAP Client/O=Example/C=US"
```

```python
# wsse_signature.py
from zeep import Client
from zeep.wsse.signature import Signature

client = Client(
    wsdl="https://api.example.com/service?wsdl",
    wsse=Signature(
        private_key_filename="client_private.pem",
        public_key_filename="client_cert.pem",
    ),
)

result = client.service.GetProduct(ProductId="SKU-001")
```

zeep will sign the SOAP body using the private key and embed the certificate reference in the Security header. The server uses the public certificate to verify the signature.

### Combined: Signature + UsernameToken

When a service requires both authentication credentials and a signed message:

```python
# wsse_combined.py
from zeep import Client
from zeep.wsse.compose import Compose
from zeep.wsse.username import UsernameToken
from zeep.wsse.signature import Signature

client = Client(
    wsdl="https://api.example.com/service?wsdl",
    wsse=Compose([
        UsernameToken(
            username="api_user",
            password="secret",
            use_digest=True,
            timestamp_token=True,
        ),
        Signature(
            private_key_filename="client_private.pem",
            public_key_filename="client_cert.pem",
        ),
    ]),
)
```

## Implementing WS-Security in a spyne Server

### Requiring UsernameToken on the Server

spyne does not validate WS-Security out of the box. You implement it as an event handler that runs before the method call:

```python
# soap_inventory/security.py
import hashlib
import base64
from datetime import datetime, timezone, timedelta

from lxml import etree
from spyne.error import Unauthorized


# WS-Security namespaces
WSSE_NS = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd"
WSU_NS = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
PASSWORD_TEXT_TYPE = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText"
PASSWORD_DIGEST_TYPE = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest"

# A simple credential store — in production, use a secrets manager or database
_VALID_CREDENTIALS = {
    "api_user": "secret_password",
}


def _extract_username_token(header_document) -> tuple[str, str, str, str]:
    """Extract username, password, nonce, and created from WS-Security header."""
    ns = {"wsse": WSSE_NS, "wsu": WSU_NS}

    username_elements = header_document.xpath("//wsse:Username", namespaces=ns)
    password_elements = header_document.xpath("//wsse:Password", namespaces=ns)
    nonce_elements = header_document.xpath("//wsse:Nonce", namespaces=ns)
    created_elements = header_document.xpath("//wsu:Created", namespaces=ns)

    if not username_elements or not password_elements:
        raise Unauthorized("Missing UsernameToken in Security header")

    username = username_elements[0].text
    password = password_elements[0].text
    password_type = password_elements[0].get("Type", PASSWORD_TEXT_TYPE)
    nonce = nonce_elements[0].text if nonce_elements else ""
    created = created_elements[0].text if created_elements else ""

    return username, password, password_type, nonce, created


def _verify_password_digest(stored_password: str, nonce_b64: str, created: str, digest_b64: str) -> bool:
    """Verify WS-Security PasswordDigest: Base64(SHA1(nonce + created + password))."""
    nonce_bytes = base64.b64decode(nonce_b64)
    password_bytes = stored_password.encode("utf-8")
    created_bytes = created.encode("utf-8")

    expected_sha1 = hashlib.sha1(nonce_bytes + created_bytes + password_bytes).digest()
    expected_digest = base64.b64encode(expected_sha1).decode()

    return expected_digest == digest_b64


def _verify_timestamp(created_str: str, max_age_seconds: int = 300) -> bool:
    """Verify the token timestamp is within the allowed age window."""
    if not created_str:
        return True   # No timestamp to verify

    try:
        created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
        now = datetime.now(timezone.utc)
        age = (now - created_dt).total_seconds()
        return 0 <= age <= max_age_seconds
    except ValueError:
        return False


def authenticate_request(ctx):
    """
    spyne event handler that validates WS-Security UsernameToken.
    Attach this to the Application event manager.
    """
    # Access the raw incoming XML document
    if ctx.in_document is None:
        raise Unauthorized("No SOAP document received")

    try:
        username, password, password_type, nonce, created = _extract_username_token(
            ctx.in_document
        )
    except Unauthorized:
        raise
    except Exception as exc:
        raise Unauthorized(f"Failed to parse security header: {exc}")

    # Verify timestamp to prevent replay attacks
    if not _verify_timestamp(created, max_age_seconds=300):
        raise Unauthorized("Token timestamp invalid or expired")

    # Verify credentials
    stored = _VALID_CREDENTIALS.get(username)
    if stored is None:
        raise Unauthorized("Unknown username")

    if password_type == PASSWORD_DIGEST_TYPE:
        if not _verify_password_digest(stored, nonce, created, password):
            raise Unauthorized("Invalid password digest")
    else:
        # PasswordText — only allow over TLS (enforce at infrastructure level)
        if password != stored:
            raise Unauthorized("Invalid password")
```

Register the handler in the application:

```python
# soap_inventory/app.py
from spyne import Application
from spyne.protocol.soap import Soap11
from spyne.server.wsgi import WsgiApplication

from .security import authenticate_request
from .service import InventoryService

application = Application(
    services=[InventoryService],
    tns="http://example.com/inventory",
    name="InventoryService",
    in_protocol=Soap11(validator="lxml"),
    out_protocol=Soap11(),
)

# Register security handler — runs before every method call
application.event_manager.add_listener("method_call", authenticate_request)

wsgi_app = WsgiApplication(application)
```

## Configuring TLS

For production, always run SOAP services behind TLS. With Flask, use `ssl_context`:

```python
# server_tls.py
import ssl
from soap_inventory.server import flask_app

if __name__ == "__main__":
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(
        certfile="server_cert.pem",
        keyfile="server_private.pem",
    )
    flask_app.run(
        host="0.0.0.0",
        port=8443,
        ssl_context=context,
    )
```

In production deployments, terminate TLS at a reverse proxy (nginx, AWS ALB, etc.) and let Flask run plain HTTP on localhost. This is the standard pattern.

## Storing Credentials Securely

Never hardcode credentials in source code. For SOAP client credentials, use environment variables or a secrets manager:

```python
# secure_credentials.py
import os
from zeep import Client
from zeep.wsse.username import UsernameToken

username = os.environ.get("SOAP_USERNAME")
password = os.environ.get("SOAP_PASSWORD")

if not username or not password:
    raise EnvironmentError("SOAP_USERNAME and SOAP_PASSWORD must be set")

client = Client(
    wsdl=os.environ["SOAP_WSDL_URL"],
    wsse=UsernameToken(username=username, password=password, use_digest=True),
)
```

For certificates in cloud environments, store PEM content as secrets and write to a temp file at startup:

```python
# cert_from_secret.py
import os
import tempfile
from zeep import Client
from zeep.wsse.signature import Signature

# Certificates stored as environment variables (base64-encoded PEM)
import base64

private_key_pem = base64.b64decode(os.environ["SOAP_PRIVATE_KEY_B64"])
client_cert_pem = base64.b64decode(os.environ["SOAP_CLIENT_CERT_B64"])

# Write to temp files (make sure temp directory is secure and not world-readable)
with tempfile.NamedTemporaryFile(delete=False, suffix=".pem", mode="wb") as kf:
    kf.write(private_key_pem)
    key_path = kf.name

with tempfile.NamedTemporaryFile(delete=False, suffix=".pem", mode="wb") as cf:
    cf.write(client_cert_pem)
    cert_path = cf.name

try:
    client = Client(
        wsdl=os.environ["SOAP_WSDL_URL"],
        wsse=Signature(
            private_key_filename=key_path,
            public_key_filename=cert_path,
        ),
    )
    result = client.service.GetProduct(ProductId="SKU-001")
finally:
    # Clean up temporary cert files
    os.unlink(key_path)
    os.unlink(cert_path)
```

## Security Summary

| Mechanism               | Provides                                        | Requires      | zeep support                          |
| ----------------------- | ----------------------------------------------- | ------------- | ------------------------------------- |
| PasswordText over HTTPS | Authentication + confidentiality                | TLS           | `UsernameToken(use_digest=False)`     |
| PasswordDigest          | Authentication + replay protection              | Nothing extra | `UsernameToken(use_digest=True)`      |
| Timestamp               | Replay attack prevention                        | Nothing extra | `UsernameToken(timestamp_token=True)` |
| X.509 Signature         | Message integrity + non-repudiation             | Certificates  | `Signature(...)`                      |
| X.509 Encryption        | Message confidentiality (transport-independent) | Certificates  | `zeep[xmlsec]` needed                 |

For the vast majority of enterprise SOAP integrations I have worked on: **TLS + UsernameToken with PasswordDigest** is sufficient and widely supported. Reserve X.509 signing for integrations where the requirement is explicitly specified in the service contract.

## What's Next

You can now:

* Add WS-Security headers to zeep clients (UsernameToken, Signature)
* Validate WS-Security tokens in spyne servers
* Protect against replay attacks with timestamps
* Manage certificates and credentials securely

In [Part 6](https://blog.htunnthuthu.com/getting-started/programming/soap-101/part-6-error-handling-soap-faults), we cover SOAP Faults — how to define and raise structured errors in spyne services and how to handle them gracefully in zeep clients.
