Authentication and Security¶
Authentication and security are foundational pillars of software engineering, ensuring that systems protect sensitive data, verify user identities, and mitigate risks from adversaries. This comprehensive guide covers the theoretical foundations, practical implementations, common vulnerabilities, and best practices for building secure systems.
Table of Contents¶
- Security Fundamentals
- Authentication: Verifying Identity
- Authorization: Controlling Access
- Cryptography Essentials
- Web Application Security
- API Security
- Common Vulnerabilities (OWASP Top 10)
- Secure Development Lifecycle
- Infrastructure and Network Security
- Compliance and Standards
- Security Operations
- Advanced Topics
Security Fundamentals¶
The CIA Triad¶
The CIA triad forms the cornerstone of information security, providing a framework for evaluating and implementing security measures:
┌─────────────────┐
│ Confidentiality │
│ (Privacy) │
└────────┬────────┘
│
┌──────────────┴──────────────┐
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Integrity │◄─────────►│ Availability │
│ (Accuracy) │ │ (Access) │
└─────────────────┘ └─────────────────┘
Confidentiality: Ensuring that information is accessible only to those authorized to access it.
- Encryption (at rest and in transit)
- Access controls
- Data classification
- Need-to-know principles
Integrity: Maintaining the accuracy and completeness of data over its lifecycle.
- Hashing and checksums
- Digital signatures
- Version control
- Input validation
- Audit trails
Availability: Ensuring that authorized users have reliable access to information and resources.
- Redundancy and failover
- Load balancing
- DDoS protection
- Disaster recovery
- Backup strategies
Extended Security Properties¶
Beyond the CIA triad, modern security frameworks consider additional properties:
| Property | Description | Implementation |
|---|---|---|
| Authentication | Verifying identity | Passwords, MFA, biometrics |
| Authorization | Controlling access rights | RBAC, ABAC, ACLs |
| Non-repudiation | Preventing denial of actions | Digital signatures, audit logs |
| Accountability | Tracing actions to entities | Logging, monitoring |
| Privacy | Protecting personal information | Data minimization, anonymization |
Core Security Principles¶
Principle of Least Privilege¶
Users and processes should have only the minimum permissions necessary to perform their tasks.
# Bad: Overly permissive
def process_order(user, order):
# User has admin access to everything
db.execute_as_admin(f"INSERT INTO orders ...")
# Good: Minimal permissions
def process_order(user, order):
# User can only insert into orders table
if not user.has_permission("orders:create"):
raise PermissionDenied()
db.execute_as_user(user, "INSERT INTO orders ...")
Defense in Depth¶
Multiple layers of security controls throughout the system, so that if one layer fails, others continue to provide protection.
┌──────────────────────────────────────────────────────────────┐
│ Physical Security │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Network Security │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ Host Security │ │ │
│ │ │ ┌────────────────────────────────────────────┐ │ │ │
│ │ │ │ Application Security │ │ │ │
│ │ │ │ ┌──────────────────────────────────────┐ │ │ │ │
│ │ │ │ │ Data Security │ │ │ │ │
│ │ │ │ │ ┌────────────────────────────────┐ │ │ │ │ │
│ │ │ │ │ │ Sensitive Data │ │ │ │ │ │
│ │ │ │ │ └────────────────────────────────┘ │ │ │ │ │
│ │ │ │ └──────────────────────────────────────┘ │ │ │ │
│ │ │ └────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
Fail-Safe Defaults¶
Systems should deny access by default, requiring explicit permission grants.
# Bad: Fail-open
def check_access(user, resource):
try:
return permission_service.check(user, resource)
except ServiceUnavailable:
return True # Allow access if service is down
# Good: Fail-safe
def check_access(user, resource):
try:
return permission_service.check(user, resource)
except ServiceUnavailable:
log.error("Permission service unavailable")
return False # Deny access if service is down
Separation of Duties¶
Critical tasks should require multiple individuals or systems, preventing any single point of compromise.
Code Change Flow:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│Developer │───►│ Reviewer │───►│ CI/CD │───►│ Deploy │
│ (Write) │ │(Approve) │ │ (Test) │ │(Release) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │
└───────────────┴───────────────┴───────────────┘
No single person can push to production
Security Through Obscurity (Anti-Pattern)¶
Security should never rely solely on keeping design or implementation secret. Assume attackers will learn your system's details.
# Bad: Security through obscurity
def is_admin(user):
# "Secret" admin check - anyone who discovers this is admin
return user.id == 12345 or user.email.endswith("@secret-admin.com")
# Good: Proper authorization
def is_admin(user):
return "admin" in user.roles and user.mfa_verified
Threat Modeling¶
Threat modeling is a structured approach to identifying and prioritizing potential threats.
STRIDE Model¶
| Threat | Property Violated | Example | Mitigation |
|---|---|---|---|
| Spoofing | Authentication | Impersonating another user | Strong authentication, MFA |
| Tampering | Integrity | Modifying data in transit | Digital signatures, HTTPS |
| Repudiation | Non-repudiation | Denying an action | Audit logs, digital signatures |
| Information Disclosure | Confidentiality | Data breach | Encryption, access controls |
| Denial of Service | Availability | DDoS attack | Rate limiting, redundancy |
| Elevation of Privilege | Authorization | Gaining admin access | Least privilege, input validation |
Threat Modeling Process¶
1. Identify Assets
└── What are we protecting? (data, services, reputation)
2. Create Architecture Overview
└── Data flow diagrams, trust boundaries
3. Decompose Application
└── Entry points, exit points, data stores
4. Identify Threats (STRIDE)
└── For each component, consider each threat category
5. Document Threats
└── Threat ID, description, impact, likelihood
6. Rate Threats (DREAD or CVSS)
└── Prioritize based on risk
7. Plan Mitigations
└── Controls to reduce or eliminate threats
DREAD Risk Rating¶
| Factor | Description | Scale |
|---|---|---|
| Damage Potential | How much damage if exploited? | 0-10 |
| Reproducibility | How easy to reproduce? | 0-10 |
| Exploitability | How easy to exploit? | 0-10 |
| Affected Users | How many users affected? | 0-10 |
| Discoverability | How easy to discover? | 0-10 |
Risk Score = (D + R + E + A + D) / 5
Authentication: Verifying Identity¶
Authentication (AuthN) establishes "who you are" by verifying claimed identity. It is the first step in access control—without reliable authentication, authorization is meaningless.
Authentication Factors¶
Authentication factors are categories of evidence used to verify identity:
┌─────────────────────────────────────────────────────────────────┐
│ Authentication Factors │
├─────────────────┬─────────────────┬─────────────────────────────┤
│ Knowledge │ Possession │ Inherence │
│ (Something you │ (Something you │ (Something you are) │
│ know) │ have) │ │
├─────────────────┼─────────────────┼─────────────────────────────┤
│ • Password │ • Phone/SMS │ • Fingerprint │
│ • PIN │ • Hardware key │ • Face recognition │
│ • Security Q&A │ • Smart card │ • Iris scan │
│ • Pattern │ • TOTP app │ • Voice recognition │
│ │ • Email link │ • Behavioral biometrics │
└─────────────────┴─────────────────┴─────────────────────────────┘
Additional Factors:
┌─────────────────┬─────────────────┐
│ Location │ Time │
│ (Somewhere you │ (When you │
│ are) │ access) │
├─────────────────┼─────────────────┤
│ • IP geolocation│ • Time-based │
│ • GPS │ restrictions │
│ • Network │ • Access windows│
└─────────────────┴─────────────────┘
Password Security¶
Password Storage: Never Store Plaintext¶
Passwords must never be stored in plaintext. Instead, use a one-way cryptographic hash with proper security properties.
Evolution of Password Hashing:
| Algorithm | Status | Properties |
|---|---|---|
| MD5 | ❌ Broken | Fast, no salt, collision-prone |
| SHA-1 | ❌ Broken | Fast, collision-prone |
| SHA-256 | ⚠️ Not Ideal | Too fast for passwords |
| bcrypt | ✅ Good | Adaptive, built-in salt |
| scrypt | ✅ Good | Memory-hard, adaptive |
| Argon2 | ✅ Best | Winner of PHC, memory-hard |
Password Hashing Deep Dive¶
Salting: A random value added to each password before hashing, preventing rainbow table attacks and ensuring identical passwords produce different hashes.
Without Salt:
password123 → SHA256 → ef92b778bafe771e... (same for all users)
With Salt:
password123 + "x7Kj9mN2" → SHA256 → 8a4f2c1e... (unique per user)
password123 + "pQ3rY8sT" → SHA256 → 2b7d9e3f... (different hash!)
Work Factor: Modern password hashing algorithms include a configurable work factor that makes computation intentionally slow, thwarting brute-force attacks.
# Pseudocode: bcrypt password hashing
function hash_password(password: str, work_factor: int = 12) -> str:
salt = generate_random_bytes(16) # 128-bit salt
# bcrypt iterates 2^work_factor times
# work_factor=12 means 4,096 iterations
hash = bcrypt_hash(password, salt, work_factor)
# Result includes algorithm, work factor, salt, and hash
# Format: $2b$12$salt_22_chars_base64hash_31_chars_base64
return f"$2b${work_factor}${base64(salt)}{base64(hash)}"
function verify_password(password: str, stored_hash: str) -> bool:
# Extract parameters from stored hash
algorithm, work_factor, salt_and_hash = parse(stored_hash)
salt = salt_and_hash[:22] # First 22 chars are salt
# Recompute hash with same parameters
computed = bcrypt_hash(password, salt, work_factor)
# Constant-time comparison to prevent timing attacks
return constant_time_compare(computed, stored_hash)
Argon2 (Recommended):
import argon2
# Argon2 configuration
hasher = argon2.PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # Memory usage in KB (64MB)
parallelism=4, # Number of parallel threads
hash_len=32, # Output hash length
salt_len=16 # Salt length
)
# Hash a password
hash = hasher.hash("user_password")
# Result: $argon2id$v=19$m=65536,t=3,p=4$salt$hash
# Verify a password
try:
hasher.verify(hash, "user_password")
# Password is correct
# Check if rehashing is needed (parameters changed)
if hasher.check_needs_rehash(hash):
new_hash = hasher.hash("user_password")
# Update stored hash
except argon2.exceptions.VerifyMismatchError:
# Password is incorrect
pass
Password Policies¶
| Requirement | Recommendation | Reasoning |
|---|---|---|
| Minimum length | 12+ characters | Longer passwords are exponentially harder to crack |
| Complexity rules | Discouraged | Users create predictable patterns (P@ssw0rd!) |
| Maximum length | 128+ characters | Allow passphrases |
| Composition | Allow all Unicode | Don't restrict character sets |
| Expiration | Only on breach | Forced rotation leads to weaker passwords |
| History | Prevent reuse | Check against last 10-24 passwords |
| Breach checking | Required | Check against known compromised passwords |
# Pseudocode: Modern password validation
function validate_password(password: str, user_context: dict) -> ValidationResult:
errors = []
# Length check (primary defense)
if len(password) < 12:
errors.append("Password must be at least 12 characters")
# Check against breached passwords (Have I Been Pwned API)
if is_password_breached(password):
errors.append("This password has appeared in a data breach")
# Context-specific checks
if contains_user_info(password, user_context):
errors.append("Password cannot contain your name or email")
# Common patterns check
if is_common_pattern(password): # "qwerty", "123456", etc.
errors.append("Password is too common")
# Entropy estimation (zxcvbn library)
strength = estimate_strength(password)
if strength.score < 3:
errors.append(f"Password is too weak: {strength.feedback}")
return ValidationResult(valid=len(errors) == 0, errors=errors)
Multi-Factor Authentication (MFA)¶
MFA requires two or more authentication factors, dramatically reducing the risk of account compromise.
MFA Methods Comparison¶
| Method | Security | Usability | Cost | Phishing Resistant |
|---|---|---|---|---|
| SMS OTP | Low | High | Low | ❌ No |
| Email OTP | Low | High | Low | ❌ No |
| TOTP (Authenticator) | Medium | Medium | Low | ❌ No |
| Push Notification | Medium | High | Medium | ⚠️ Partial |
| Hardware Key (FIDO2) | High | Medium | High | ✅ Yes |
| Biometric | High | High | Medium | ✅ Yes |
Time-Based One-Time Password (TOTP)¶
TOTP generates codes that change every 30 seconds based on a shared secret and current time.
┌────────────────────────────────────────────────────────────┐
│ TOTP Generation │
│ │
│ Shared Secret: JBSWY3DPEHPK3PXP (base32) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Counter = floor(Unix Time / 30) │ │
│ │ Counter = floor(1704067200 / 30) │ │
│ │ Counter = 56802240 │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ HMAC-SHA1(secret, counter) │ │
│ │ = 1f8698690e02ca16618550ef7f19da8e..│ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ Dynamic Truncation → 6 digits │ │
│ │ = 847192 │ │
│ └─────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
# Pseudocode: TOTP implementation (RFC 6238)
import hmac
import hashlib
import struct
import time
import base64
def generate_totp(secret: bytes, time_step: int = 30, digits: int = 6) -> str:
# Calculate time counter
counter = int(time.time()) // time_step
# Convert counter to 8-byte big-endian
counter_bytes = struct.pack(">Q", counter)
# Generate HMAC-SHA1
hmac_hash = hmac.new(secret, counter_bytes, hashlib.sha1).digest()
# Dynamic truncation
offset = hmac_hash[-1] & 0x0F
truncated = struct.unpack(">I", hmac_hash[offset:offset + 4])[0]
truncated &= 0x7FFFFFFF # Clear sign bit
# Generate digits
otp = truncated % (10 ** digits)
return str(otp).zfill(digits)
def verify_totp(secret: bytes, code: str, window: int = 1) -> bool:
"""Verify TOTP with time window tolerance"""
for offset in range(-window, window + 1):
expected = generate_totp(secret, time_offset=offset * 30)
if constant_time_compare(expected, code):
return True
return False
FIDO2/WebAuthn (Passwordless)¶
FIDO2 is the modern standard for phishing-resistant authentication using public-key cryptography.
┌──────────────────────────────────────────────────────────────────-┐
│ WebAuthn Registration Flow │
│ │
│ User Browser Server Authenticator │
│ │ │ │ │ │
│ │──(1) Register─┼────────────────►│ │ │
│ │ │ │ │ │
│ │ │◄──(2) Challenge─┤ │ │
│ │ │ + options │ │ │
│ │ │ │ │ │
│ │ │─(3) Create──────┼─────────────────►│ │
│ │ │ credential │ │ │
│ │ │ │ │ │
│ │◄──(4) User────┤ │ │ │
│ │ verification│ │ (biometric/ │ │
│ │ (touch/bio) │ │ PIN/touch) │ │
│ │ │ │ │ │
│ │ │◄─(5) Credential─┼──────────────────┤ │
│ │ │ (public key, │ │ │
│ │ │ attestation) │ │ │
│ │ │ │ │ │
│ │ │──(6) Store─────►│ │ │
│ │ │ public key │ │ │
│ │ │ │ │ │
└────────────────────────────────────────────────────────────────-──┘
┌─────────────────────────────────────────────────────────────────-─┐
│ WebAuthn Authentication Flow │
│ │
│ User Browser Server Authenticator │
│ │ │ │ │ │
│ │──(1) Login────┼────────────────►│ │ │
│ │ │ │ │ │
│ │ │◄──(2) Challenge─┤ │ │
│ │ │ (random) │ │ │
│ │ │ │ │ │
│ │ │─(3) Get─────────┼─────────────────►│ │
│ │ │ assertion │ │ │
│ │ │ │ │ │
│ │◄──(4) User────┤ │ Sign challenge │ │
│ │ gesture │ │ with private key│ │
│ │ │ │ │ │
│ │ │◄─(5) Signed─────┼──────────────────┤ │
│ │ │ assertion │ │ │
│ │ │ │ │ │
│ │ │──(6) Verify─────►│ │ │
│ │ │ signature │ │ │
│ │ │ with pubkey │ │ │
└──────────────────────────────────────────────────────────────────┘
Why FIDO2 is Phishing-Resistant:
- Origin binding: Credentials are bound to specific domains
- Challenge-response: Unique challenge prevents replay attacks
- No shared secrets: Private key never leaves the authenticator
- User verification: Requires physical presence/biometric
Token-Based Authentication¶
Session-Based Authentication¶
Traditional server-side session management:
┌─────────────────────────────────────────────────────────────────┐
│ Session-Based Authentication │
│ │
│ Client Server │
│ │ │ │
│ │──(1) POST /login ──────────────────────►│ │
│ │ {username, password} │ │
│ │ │ │
│ │ ┌────────────┴────────────┐ │
│ │ │ Verify credentials │ │
│ │ │ Create session in store │ │
│ │ │ session_id → user_data │ │
│ │ └────────────┬────────────┘ │
│ │ │ │
│ │◄──(2) Set-Cookie: session_id=abc123 ────┤ │
│ │ │ │
│ │──(3) GET /api/data ────────────────────►│ │
│ │ Cookie: session_id=abc123 │ │
│ │ ┌────────────┴────────────┐ │
│ │ │ Lookup session in store │ │
│ │ │ abc123 → user_data │ │
│ │ └────────────┬────────────┘ │
│ │◄──(4) Response with user data ──────────┤ │
└─────────────────────────────────────────────────────────────────┘
# Pseudocode: Secure session management
class SessionManager:
def __init__(self, store: SessionStore, secret_key: bytes):
self.store = store # Redis, database, or memory
self.secret_key = secret_key
def create_session(self, user_id: str, metadata: dict) -> str:
# Generate cryptographically secure session ID
session_id = secrets.token_urlsafe(32) # 256 bits
session_data = {
"user_id": user_id,
"created_at": time.time(),
"last_activity": time.time(),
"ip_address": metadata.get("ip"),
"user_agent": metadata.get("user_agent"),
"mfa_verified": metadata.get("mfa_verified", False)
}
# Store with expiration
self.store.set(
key=f"session:{session_id}",
value=session_data,
ttl=3600 # 1 hour
)
return session_id
def validate_session(self, session_id: str, request: Request) -> Session:
session_data = self.store.get(f"session:{session_id}")
if not session_data:
raise SessionNotFound()
# Check for session fixation/hijacking
if session_data["ip_address"] != request.ip:
log.warning(f"IP mismatch for session {session_id}")
# Could invalidate or require re-auth
# Update last activity
session_data["last_activity"] = time.time()
self.store.set(f"session:{session_id}", session_data)
return Session(session_data)
def destroy_session(self, session_id: str):
self.store.delete(f"session:{session_id}")
JSON Web Tokens (JWT)¶
JWTs are self-contained tokens that carry claims and can be verified without server-side state.
JWT Structure:
┌─────────────────────────────────────────────────────────────────┐
│ JSON Web Token │
│ │
│ Header Payload Signature │
│ (Algorithm) (Claims) (Verification) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌─────────┐ ┌──────────┐ │
│ │{ │ │{ │ │HMACSHA256│ │
│ │ "alg": │ │ "sub": │ │( │ │
│ │ "HS256"│ │ "123", │ │ base64( │ │
│ │ "typ": │ │ "name": │ │ header │ │
│ │ "JWT" │ │ "Alice",│ │ ) + "." │ │
│ │} │ │ "iat": │ │ + base64│ │
│ └────────┘ │ 16040...,│ │ (payload│ │
│ │ │ "exp": │ │ ), │ │
│ │ │ 16041...│ │ secret │ │
│ │ │} │ │) │ │
│ │ └─────────┘ └──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ eyJhbGci... . eyJzdWIi... . SflKxwRJ... │
│ │
│ Complete Token: │
│ eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjMiLC... │
└─────────────────────────────────────────────────────────────────┘
Standard JWT Claims:
| Claim | Name | Description |
|---|---|---|
iss |
Issuer | Who issued the token |
sub |
Subject | Who the token is about (user ID) |
aud |
Audience | Intended recipient |
exp |
Expiration | When the token expires (Unix timestamp) |
nbf |
Not Before | Token not valid before this time |
iat |
Issued At | When the token was issued |
jti |
JWT ID | Unique identifier for the token |
# Pseudocode: JWT implementation
import json
import hmac
import hashlib
import base64
import time
class JWT:
@staticmethod
def encode(payload: dict, secret: str, algorithm: str = "HS256") -> str:
header = {"alg": algorithm, "typ": "JWT"}
# Add standard claims if not present
if "iat" not in payload:
payload["iat"] = int(time.time())
if "exp" not in payload:
payload["exp"] = int(time.time()) + 3600 # 1 hour default
# Base64url encode header and payload
header_b64 = base64url_encode(json.dumps(header))
payload_b64 = base64url_encode(json.dumps(payload))
# Create signature
message = f"{header_b64}.{payload_b64}"
if algorithm == "HS256":
signature = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).digest()
else:
raise UnsupportedAlgorithm(algorithm)
signature_b64 = base64url_encode(signature)
return f"{header_b64}.{payload_b64}.{signature_b64}"
@staticmethod
def decode(token: str, secret: str, verify: bool = True) -> dict:
parts = token.split(".")
if len(parts) != 3:
raise InvalidToken("Token must have 3 parts")
header_b64, payload_b64, signature_b64 = parts
if verify:
# Verify signature
message = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).digest()
actual_sig = base64url_decode(signature_b64)
if not hmac.compare_digest(expected_sig, actual_sig):
raise InvalidSignature()
payload = json.loads(base64url_decode(payload_b64))
# Verify expiration
if "exp" in payload and payload["exp"] < time.time():
raise TokenExpired()
# Verify not-before
if "nbf" in payload and payload["nbf"] > time.time():
raise TokenNotYetValid()
return payload
JWT Security Considerations:
| Vulnerability | Description | Mitigation |
|---|---|---|
| Algorithm confusion | Attacker changes alg to "none" | Whitelist allowed algorithms |
| Secret weakness | Weak HMAC secrets can be brute-forced | Use 256+ bit secrets |
| Token theft | XSS can steal tokens from localStorage | Use HttpOnly cookies |
| No revocation | JWTs valid until expiration | Short expiry + refresh tokens |
| Sensitive data | Claims are base64, not encrypted | Don't put secrets in payload |
Access Tokens and Refresh Tokens¶
┌────────────────────────────────────────────────────────────────┐
│ Access + Refresh Token Pattern │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Access Token │ │
│ │ • Short-lived (5-15 minutes) │ │
│ │ • Used for API requests │ │
│ │ • Stateless (JWT) │ │
│ │ • Stored in memory │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Refresh Token │ │
│ │ • Long-lived (days/weeks) │ │
│ │ • Used only to get new access tokens │ │
│ │ • Stored server-side (can be revoked) │ │
│ │ • Stored in HttpOnly cookie │ │
│ └─────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
Token Refresh Flow:
┌────────┐ ┌────────┐
│ Client │ │ Server │
└───┬────┘ └───┬────┘
│ │
│──(1) Request with expired access ────►│
│ token │
│◄──(2) 401 Unauthorized ───────────────┤
│ │
│──(3) POST /refresh ──────────────────►│
│ Cookie: refresh_token=xyz │
│ ┌─────────────┴─────────────┐
│ │ Validate refresh token │
│ │ Check not revoked │
│ │ Generate new access token │
│ │ Optionally rotate refresh │
│ └─────────────┬─────────────┘
│◄──(4) New access token ───────────────┤
│ │
│──(5) Retry original request ─────────►│
OAuth 2.0 and OpenID Connect¶
OAuth 2.0 Overview¶
OAuth 2.0 is an authorization framework that enables third-party applications to obtain limited access to a service.
OAuth 2.0 Roles:
| Role | Description | Example |
|---|---|---|
| Resource Owner | User who owns the data | End user |
| Client | Application requesting access | Your web app |
| Authorization Server | Issues tokens | Auth0, Okta, Google |
| Resource Server | API hosting protected resources | Your API |
Authorization Code Flow (Most Secure)¶
┌─────────────────────────────────────────────────────────────────────┐
│ OAuth 2.0 Authorization Code Flow with PKCE │
│ │
│ User Client App Auth Server Resource Server │
│ │ │ │ │ │
│ │──(1) Click ──►│ │ │ │
│ │ "Login" │ │ │ │
│ │ │ │ │ │
│ │ │ Generate: │ │ │
│ │ │ code_verifier │ │ │
│ │ │ code_challenge │ │ │
│ │ │ │ │ │
│ │◄─(2) Redirect to Auth Server──┤ │ │
│ │ /authorize? │ │ │
│ │ client_id=xxx │ │ │
│ │ redirect_uri=xxx │ │ │
│ │ response_type=code │ │ │
│ │ scope=openid profile │ │ │
│ │ state=random123 │ │ │
│ │ code_challenge=xxx │ │ │
│ │ code_challenge_method=S256 │ │ │
│ │ │ │ │ │
│ │─────────────(3) Login & Consent ─────────────────►│ │
│ │ │ │ │
│ │◄────────────(4) Redirect with code ──────────────┤ │
│ │ /callback?code=abc&state=random123 │ │
│ │ │ │ │ │
│ │ │──(5) Exchange code for tokens ───►│ │
│ │ │ POST /token │ │
│ │ │ code=abc │ │
│ │ │ code_verifier=original │ │
│ │ │ client_id=xxx │ │
│ │ │ redirect_uri=xxx │ │
│ │ │ │ │ │
│ │ │◄─(6) Tokens────┤ │ │
│ │ │ access_token │ │
│ │ │ refresh_token │ │
│ │ │ id_token (OIDC) │ │
│ │ │ │ │ │
│ │ │──(7) API Request ────────────────────────────────►│
│ │ │ Authorization: Bearer <token> │ │
│ │ │ │ │ │
│ │ │◄─(8) Protected Resource ──────────────────────────┤
└─────────────────────────────────────────────────────────────────────┘
PKCE (Proof Key for Code Exchange):
PKCE prevents authorization code interception attacks, especially important for public clients (mobile apps, SPAs).
# Pseudocode: PKCE implementation
import secrets
import hashlib
import base64
def generate_pkce():
# Generate random code verifier (43-128 characters)
code_verifier = secrets.token_urlsafe(32) # 43 chars
# Create code challenge (S256 method)
# challenge = BASE64URL(SHA256(verifier))
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
return {
"code_verifier": code_verifier,
"code_challenge": code_challenge,
"code_challenge_method": "S256"
}
# Client stores code_verifier securely
# Client sends code_challenge to /authorize
# Client sends code_verifier to /token
# Server verifies: SHA256(code_verifier) == code_challenge
OAuth 2.0 Grant Types¶
| Grant Type | Use Case | Security |
|---|---|---|
| Authorization Code + PKCE | Web apps, mobile apps, SPAs | Highest |
| Client Credentials | Server-to-server (no user) | High |
| Device Code | Smart TVs, CLI tools | Medium |
| Refresh Token | Renewing access tokens | High |
| ~~Implicit~~ | ~~SPAs (deprecated)~~ | ❌ Deprecated |
| ~~Password~~ | ~~Trusted first-party apps~~ | ❌ Deprecated |
OpenID Connect (OIDC)¶
OIDC is an identity layer built on OAuth 2.0, adding authentication capabilities.
Key Additions:
- ID Token: JWT containing user identity claims
- UserInfo Endpoint: API to fetch user profile
- Standard Scopes:
openid,profile,email,address,phone - Standard Claims:
sub,name,email,picture, etc.
// Example ID Token payload
{
"iss": "https://auth.example.com",
"sub": "user123",
"aud": "client_app_id",
"exp": 1704067200,
"iat": 1704063600,
"nonce": "abc123",
"name": "Alice Johnson",
"email": "alice@example.com",
"email_verified": true,
"picture": "https://example.com/alice.jpg"
}
Single Sign-On (SSO)¶
SSO allows users to authenticate once and access multiple applications.
SAML 2.0¶
Security Assertion Markup Language - XML-based protocol for enterprise SSO.
┌────────────────────────────────────────────────────────────────┐
│ SAML 2.0 Flow │
│ │
│ User Service Provider (SP) Identity Provider (IdP) │
│ │ │ │ │
│ │──(1) Access ──────►│ │ │
│ │ protected │ │ │
│ │ resource │ │ │
│ │ │ │ │
│ │◄──(2) Redirect ────┤ │ │
│ │ with SAML │ │ │
│ │ AuthnRequest │ │ │
│ │ │ │ │
│ │─────────────(3) SAML Request ────────────────►│ │
│ │ │ │ │
│ │◄────────────(4) Login page ──────────────────┤ │
│ │ │ │ │
│ │─────────────(5) Credentials ─────────────────►│ │
│ │ │ │ │
│ │◄────────────(6) SAML Response ───────────────┤ │
│ │ │ (signed assertion) │ │
│ │ │ │ │
│ │──(7) POST SAML ───►│ │ │
│ │ Response to SP │ │ │
│ │ │ │ │
│ │ ┌────┴─────────────────┐ │ │
│ │ │ Validate signature │ │ │
│ │ │ Extract attributes │ │ │
│ │ │ Create session │ │ │
│ │ └────┬─────────────────┘ │ │
│ │ │ │ │
│ │◄──(8) Access ──────┤ │ │
│ │ granted │ │ │
└────────────────────────────────────────────────────────────────┘
SSO Protocols Comparison¶
| Feature | SAML 2.0 | OAuth 2.0 | OIDC |
|---|---|---|---|
| Primary Use | Enterprise SSO | Authorization | Authentication + AuthZ |
| Token Format | XML | Opaque/JWT | JWT (ID Token) |
| Transport | HTTP POST/Redirect | HTTP | HTTP |
| Complexity | High | Medium | Medium |
| Mobile Support | Poor | Good | Good |
| Modern Apps | Legacy | Yes | Yes |
Authorization: Controlling Access¶
Authorization (AuthZ) determines what authenticated users can do—"what you can access" after proving "who you are."
Role-Based Access Control (RBAC)¶
RBAC assigns permissions to roles, and roles to users.
┌────────────────────────────────────────────────────────────────┐
│ RBAC Model │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Users │────────►│ Roles │────────►│ Permissions │ │
│ └─────────┘ has └─────────┘ has └─────────────────┘ │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Alice │────────►│ Admin │────────►│ users:create │ │
│ │ Bob │ │ │ │ users:read │ │
│ │ Charlie │ │ │ │ users:update │ │
│ └─────────┘ └─────────┘ │ users:delete │ │
│ │ reports:read │ │
│ ┌─────────┐ ┌─────────┐ │ reports:create │ │
│ │ Diana │────────►│ Editor │────────►│ posts:create │ │
│ │ Eve │ │ │ │ posts:read │ │
│ └─────────┘ └─────────┘ │ posts:update │ │
│ └─────────────────┘ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │
│ │ Frank │────────►│ Viewer │────────►│ posts:read │ │
│ └─────────┘ └─────────┘ │ reports:read │ │
│ └─────────────────┘ │
└────────────────────────────────────────────────────────────────┘
# Pseudocode: RBAC implementation
class Permission:
def __init__(self, resource: str, action: str):
self.resource = resource
self.action = action
def __str__(self):
return f"{self.resource}:{self.action}"
class Role:
def __init__(self, name: str, permissions: list[Permission]):
self.name = name
self.permissions = set(str(p) for p in permissions)
def has_permission(self, permission: str) -> bool:
return permission in self.permissions
class User:
def __init__(self, id: str, roles: list[Role]):
self.id = id
self.roles = roles
def has_permission(self, resource: str, action: str) -> bool:
permission = f"{resource}:{action}"
return any(role.has_permission(permission) for role in self.roles)
def has_role(self, role_name: str) -> bool:
return any(role.name == role_name for role in self.roles)
# Decorator for permission checking
def require_permission(resource: str, action: str):
def decorator(func):
def wrapper(request, *args, **kwargs):
if not request.user.has_permission(resource, action):
raise PermissionDenied(f"Missing permission: {resource}:{action}")
return func(request, *args, **kwargs)
return wrapper
return decorator
# Usage
@require_permission("posts", "create")
def create_post(request, title, content):
# User must have posts:create permission
return Post.create(author=request.user, title=title, content=content)
Attribute-Based Access Control (ABAC)¶
ABAC makes decisions based on attributes of the user, resource, action, and environment.
┌────────────────────────────────────────────────────────────────┐
│ ABAC Decision Flow │
│ │
│ ┌─────────────────┐ │
│ │ Access Request │ │
│ │ │ │
│ │ Subject: Alice │ │
│ │ Action: read │ │
│ │ Resource: doc123│ │
│ │ Environment: │ │
│ │ time=14:30 │ │
│ │ ip=10.0.0.5 │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Policy Decision Point │ │
│ │ │ │
│ │ Policy 1: Department Access │ │
│ │ IF subject.department == resource.department │ │
│ │ AND action IN ["read", "write"] │ │
│ │ THEN permit │ │
│ │ │ │
│ │ Policy 2: Time Restriction │ │
│ │ IF environment.time NOT IN business_hours │ │
│ │ AND subject.role != "admin" │ │
│ │ THEN deny │ │
│ │ │ │
│ │ Policy 3: Classification Level │ │
│ │ IF resource.classification > subject.clearance │ │
│ │ THEN deny │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Decision: │ │
│ │ PERMIT / DENY │ │
│ └─────────────────┘ │
└────────────────────────────────────────────────────────────────┘
# Pseudocode: ABAC implementation
from dataclasses import dataclass
from typing import Any
from enum import Enum
class Decision(Enum):
PERMIT = "permit"
DENY = "deny"
NOT_APPLICABLE = "not_applicable"
@dataclass
class AccessRequest:
subject: dict # User attributes
action: str
resource: dict # Resource attributes
environment: dict # Context attributes
class Policy:
def __init__(self, name: str, condition: callable, effect: Decision):
self.name = name
self.condition = condition
self.effect = effect
def evaluate(self, request: AccessRequest) -> Decision:
if self.condition(request):
return self.effect
return Decision.NOT_APPLICABLE
class PolicyDecisionPoint:
def __init__(self, policies: list[Policy], combining_algorithm: str = "deny_unless_permit"):
self.policies = policies
self.combining_algorithm = combining_algorithm
def evaluate(self, request: AccessRequest) -> Decision:
decisions = [policy.evaluate(request) for policy in self.policies]
if self.combining_algorithm == "deny_unless_permit":
# Deny unless at least one policy permits and none deny
if Decision.DENY in decisions:
return Decision.DENY
if Decision.PERMIT in decisions:
return Decision.PERMIT
return Decision.DENY
elif self.combining_algorithm == "permit_unless_deny":
# Permit unless at least one policy denies
if Decision.DENY in decisions:
return Decision.DENY
return Decision.PERMIT
# Example policies
policies = [
Policy(
name="department_access",
condition=lambda r: (
r.subject.get("department") == r.resource.get("department") and
r.action in ["read", "write"]
),
effect=Decision.PERMIT
),
Policy(
name="classification_check",
condition=lambda r: (
r.resource.get("classification", 0) > r.subject.get("clearance", 0)
),
effect=Decision.DENY
),
Policy(
name="business_hours",
condition=lambda r: (
not (9 <= r.environment.get("hour", 12) <= 17) and
r.subject.get("role") != "admin"
),
effect=Decision.DENY
)
]
pdp = PolicyDecisionPoint(policies)
# Example request
request = AccessRequest(
subject={"id": "alice", "department": "engineering", "clearance": 2, "role": "developer"},
action="read",
resource={"id": "doc123", "department": "engineering", "classification": 1},
environment={"hour": 14, "ip": "10.0.0.5"}
)
decision = pdp.evaluate(request) # PERMIT
RBAC vs ABAC Comparison¶
| Aspect | RBAC | ABAC |
|---|---|---|
| Complexity | Simple | Complex |
| Flexibility | Limited | High |
| Scalability | Role explosion | Scales well |
| Audit | Easy | Complex |
| Performance | Fast (role lookup) | Slower (policy eval) |
| Use Case | Static hierarchies | Dynamic, contextual |
| Example | "Admins can delete users" | "Managers can approve expenses < $1000 for their department during business hours" |
Permission Models¶
Capability-Based Security¶
Unforgeable tokens that grant specific rights:
# Pseudocode: Capability tokens
class Capability:
def __init__(self, resource_id: str, permissions: set, constraints: dict = None):
self.resource_id = resource_id
self.permissions = permissions
self.constraints = constraints or {}
self.token = self._generate_token()
def _generate_token(self) -> str:
data = {
"resource": self.resource_id,
"permissions": list(self.permissions),
"constraints": self.constraints,
"issued_at": time.time()
}
# Sign with server secret
return jwt.encode(data, SECRET_KEY, algorithm="HS256")
@classmethod
def verify(cls, token: str) -> "Capability":
try:
data = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return cls(
resource_id=data["resource"],
permissions=set(data["permissions"]),
constraints=data["constraints"]
)
except jwt.InvalidTokenError:
raise InvalidCapability()
# Usage: Share a read-only link to a document
cap = Capability(
resource_id="doc123",
permissions={"read"},
constraints={"expires": time.time() + 86400} # 24 hours
)
share_url = f"https://app.com/shared/{cap.token}"
OAuth 2.0 Scopes¶
Scopes define the access boundaries for OAuth tokens:
# Common scope patterns
SCOPES = {
# Resource-based
"users:read": "Read user profiles",
"users:write": "Create and update users",
"posts:read": "Read posts",
"posts:write": "Create and update posts",
# Action-based
"read": "Read access to all resources",
"write": "Write access to all resources",
"admin": "Administrative access",
# OpenID Connect standard scopes
"openid": "Access to user identity",
"profile": "Access to user profile",
"email": "Access to email address",
"offline_access": "Request refresh token"
}
# Scope validation
def require_scopes(*required_scopes):
def decorator(func):
def wrapper(request, *args, **kwargs):
token_scopes = set(request.token.get("scope", "").split())
required = set(required_scopes)
if not required.issubset(token_scopes):
missing = required - token_scopes
raise InsufficientScope(f"Missing scopes: {missing}")
return func(request, *args, **kwargs)
return wrapper
return decorator
@require_scopes("users:read", "posts:read")
def get_user_posts(request, user_id):
# Requires both users:read and posts:read scopes
pass
Cryptography Essentials¶
Cryptography provides the mathematical foundation for security: confidentiality, integrity, authentication, and non-repudiation.
Symmetric Encryption¶
Uses a single shared key for both encryption and decryption.
┌────────────────────────────────────────────────────────────────┐
│ Symmetric Encryption │
│ │
│ Plaintext Shared Key Ciphertext │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │
│ │ "Hello │ ──────► │ Encrypt │ ──────► │ 7f3a2b9c... │ │
│ │ World" │ │ (AES) │ │ │ │
│ └─────────┘ └─────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 7f3a2b9c... │ ──► │ Decrypt │ ──────► │ "Hello │ │
│ │ │ │ (AES) │ │ World" │ │
│ └─────────────┘ └─────────┘ └─────────┘ │
│ ▲ │
│ │ │
│ Same Key │
└────────────────────────────────────────────────────────────────┘
Common Algorithms:
| Algorithm | Key Size | Block Size | Status | Use Case |
|---|---|---|---|---|
| AES-256-GCM | 256 bits | 128 bits | ✅ Recommended | General purpose |
| AES-256-CBC | 256 bits | 128 bits | ✅ Good | Legacy systems |
| ChaCha20-Poly1305 | 256 bits | Stream | ✅ Recommended | Mobile, TLS |
| 3DES | 168 bits | 64 bits | ⚠️ Deprecated | Legacy only |
| DES | 56 bits | 64 bits | ❌ Broken | Never use |
AES-GCM (Authenticated Encryption):
# Pseudocode: AES-GCM encryption
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
def encrypt_aes_gcm(plaintext: bytes, key: bytes, associated_data: bytes = b"") -> bytes:
"""
AES-GCM provides both confidentiality and integrity (authenticated encryption).
"""
# Generate random 96-bit nonce (NEVER reuse with same key!)
nonce = os.urandom(12)
# Create cipher and encrypt
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, associated_data)
# Return nonce + ciphertext (nonce needed for decryption)
return nonce + ciphertext
def decrypt_aes_gcm(ciphertext: bytes, key: bytes, associated_data: bytes = b"") -> bytes:
"""
Decryption also verifies authenticity - fails if tampered.
"""
# Extract nonce and ciphertext
nonce = ciphertext[:12]
actual_ciphertext = ciphertext[12:]
# Decrypt and verify
aesgcm = AESGCM(key)
try:
plaintext = aesgcm.decrypt(nonce, actual_ciphertext, associated_data)
return plaintext
except InvalidTag:
raise TamperedCiphertext("Data has been modified!")
# Usage
key = os.urandom(32) # 256-bit key
message = b"Sensitive data"
aad = b"user_id:123" # Authenticated but not encrypted
encrypted = encrypt_aes_gcm(message, key, aad)
decrypted = decrypt_aes_gcm(encrypted, key, aad)
Asymmetric Encryption¶
Uses a key pair: public key for encryption, private key for decryption.
┌────────────────────────────────────────────────────────────────┐
│ Asymmetric Encryption │
│ │
│ Alice (Sender) Bob (Receiver) │
│ │
│ Bob's Public Key │
│ (shared openly) │
│ │ │
│ ┌─────────┐ ▼ ┌─────────────┐ │
│ │"Secret" │ ──────► [Encrypt] ──► │ a7f3b2c9... │ │
│ └─────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ Bob's Private Key │
│ (kept secret) │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────┐ │
│ │ a7f3b2c9... │──►│"Secret" │ │
│ └─────────────┘ └─────────┘ │
│ [Decrypt] │
└────────────────────────────────────────────────────────────────┘
Common Algorithms:
| Algorithm | Key Size | Security | Use Case |
|---|---|---|---|
| RSA-2048 | 2048 bits | ✅ Acceptable | Legacy, key exchange |
| RSA-4096 | 4096 bits | ✅ Good | Long-term security |
| ECDSA P-256 | 256 bits | ✅ Recommended | Digital signatures |
| Ed25519 | 256 bits | ✅ Recommended | Signatures, modern |
| X25519 | 256 bits | ✅ Recommended | Key exchange |
RSA Encryption:
# Pseudocode: RSA key generation and encryption
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
def generate_rsa_keypair(key_size: int = 2048):
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size
)
public_key = private_key.public_key()
return private_key, public_key
def rsa_encrypt(plaintext: bytes, public_key) -> bytes:
ciphertext = public_key.encrypt(
plaintext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return ciphertext
def rsa_decrypt(ciphertext: bytes, private_key) -> bytes:
plaintext = private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return plaintext
Cryptographic Hashing¶
One-way functions that produce a fixed-size digest from arbitrary input.
┌────────────────────────────────────────────────────────────────┐
│ Cryptographic Hash Function │
│ │
│ Properties: │
│ 1. Deterministic: Same input → same output │
│ 2. One-way: Cannot reverse hash to get input │
│ 3. Collision-resistant: Hard to find two inputs with same hash│
│ 4. Avalanche effect: Small input change → completely diff hash│
│ │
│ Input (any size) Output (fixed size) │
│ │ │ │
│ ▼ ▼ │
│ "Hello" ──► SHA-256 ──► 185f8db32271fe... │
│ "Hello!" ──► SHA-256 ──► 33b765b8e4e098... (totally │
│ "hello" ──► SHA-256 ──► 2cf24dba5fb0a3... different)│
│ │
│ Large file ──► SHA-256 ──► 256-bit hash (32 bytes) │
│ (any size) │
└────────────────────────────────────────────────────────────────┘
Hash Algorithm Status:
| Algorithm | Output Size | Status | Use Case |
|---|---|---|---|
| MD5 | 128 bits | ❌ Broken | Never use for security |
| SHA-1 | 160 bits | ❌ Broken | Legacy verification only |
| SHA-256 | 256 bits | ✅ Secure | General purpose |
| SHA-384 | 384 bits | ✅ Secure | Higher security margin |
| SHA-512 | 512 bits | ✅ Secure | 64-bit optimized |
| SHA-3-256 | 256 bits | ✅ Secure | Alternative to SHA-2 |
| BLAKE2b | 512 bits | ✅ Secure | Fast, modern |
| BLAKE3 | 256 bits | ✅ Secure | Fastest, parallelizable |
Digital Signatures¶
Provides authentication and non-repudiation using asymmetric cryptography.
┌────────────────────────────────────────────────────────────────┐
│ Digital Signature │
│ │
│ Signing (Alice) │
│ ┌──────────────┐ │
│ │ Message │──────────────────────┐ │
│ │ "Contract" │ │ │
│ └──────────────┘ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌───────────┐ │
│ │ Hash │ │ Sign │ │
│ │ SHA-256 │──► digest ──────► │ (RSA or │──► Signature │
│ └─────────┘ │ ECDSA) │ │
│ └───────────┘ │
│ ▲ │
│ │ │
│ Alice's Private Key │
│ │
│ Verification (Bob) │
│ ┌──────────────┐ ┌───────────┐ │
│ │ Message │───►│ Hash │──► digest ──┐ │
│ │ "Contract" │ │ SHA-256 │ │ │
│ └──────────────┘ └───────────┘ │ │
│ ▼ │
│ ┌──────────────┐ ┌───────────┐ ┌──────────┐ │
│ │ Signature │───►│ Verify │──────│ Match? │ │
│ └──────────────┘ │ (RSA or │ │ Valid! │ │
│ │ ECDSA) │ └──────────┘ │
│ └───────────┘ │
│ ▲ │
│ │ │
│ Alice's Public Key │
└────────────────────────────────────────────────────────────────┘
# Pseudocode: Digital signatures with Ed25519
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.exceptions import InvalidSignature
def generate_signing_keypair():
private_key = Ed25519PrivateKey.generate()
public_key = private_key.public_key()
return private_key, public_key
def sign_message(message: bytes, private_key) -> bytes:
signature = private_key.sign(message)
return signature
def verify_signature(message: bytes, signature: bytes, public_key) -> bool:
try:
public_key.verify(signature, message)
return True
except InvalidSignature:
return False
# Usage
private_key, public_key = generate_signing_keypair()
message = b"I agree to the terms and conditions"
signature = sign_message(message, private_key)
# Anyone with the public key can verify
is_valid = verify_signature(message, signature, public_key) # True
# Tampered message fails verification
is_valid = verify_signature(b"Modified message", signature, public_key) # False
Key Management¶
Proper key management is critical—the strongest encryption is useless if keys are compromised.
Key Derivation Functions (KDFs)¶
Derive cryptographic keys from passwords or other key material:
# Pseudocode: Key derivation
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
def derive_key_from_password(password: str, salt: bytes, iterations: int = 600000) -> bytes:
"""
PBKDF2 - Password-Based Key Derivation Function
Used to derive encryption keys from user passwords
"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 256-bit key
salt=salt,
iterations=iterations # OWASP recommends 600,000 for SHA-256
)
return kdf.derive(password.encode())
def derive_keys_from_master(master_key: bytes, info: bytes, num_keys: int = 2) -> list[bytes]:
"""
HKDF - HMAC-based Key Derivation Function
Used to derive multiple keys from a master key
"""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32 * num_keys,
salt=None, # Optional salt
info=info # Context-specific info
)
derived = hkdf.derive(master_key)
return [derived[i*32:(i+1)*32] for i in range(num_keys)]
# Example: Derive encryption and MAC keys from password
salt = os.urandom(16)
master_key = derive_key_from_password("user_password", salt)
encryption_key, mac_key = derive_keys_from_master(master_key, b"encryption-keys")
Secrets Management¶
┌────────────────────────────────────────────────────────────────┐
│ Secrets Management Architecture │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Secrets Manager │ │
│ │ (Vault, AWS Secrets Manager) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Encryption │ │ Access │ │ Audit │ │ │
│ │ │ at Rest │ │ Control │ │ Logging │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Rotation │ │ Versioning │ │ Dynamic │ │ │
│ │ │ Policies │ │ │ │ Secrets │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ App │ │ App │ │ App │ │
│ │ Instance │ │ Instance │ │ Instance │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Never store secrets in: │
│ ❌ Source code │
│ ❌ Environment variables (in plain text) │
│ ❌ Config files in repos │
│ ❌ Container images │
└────────────────────────────────────────────────────────────────┘
TLS/SSL¶
Transport Layer Security provides secure communication over networks.
TLS Best Practices (Nginx Configuration):
# Nginx TLS configuration
server {
listen 443 ssl http2;
# Certificates
ssl_certificate /path/to/fullchain.pem;
ssl_certificate_key /path/to/privkey.pem;
# Protocols - TLS 1.2 and 1.3 only
ssl_protocols TLSv1.2 TLSv1.3;
# Cipher suites (TLS 1.2)
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
# Prefer server ciphers
ssl_prefer_server_ciphers on;
# HSTS (force HTTPS)
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# OCSP Stapling
ssl_stapling on;
ssl_stapling_verify on;
# Session configuration
ssl_session_timeout 1d;
ssl_session_cache shared:SSL:50m;
ssl_session_tickets off;
}
Web Application Security¶
OWASP Top 10 (2021)¶
The Open Web Application Security Project maintains a list of the most critical security risks.
A01: Broken Access Control¶
Restrictions on what authenticated users can do are not properly enforced.
Examples:
# Vulnerable: No authorization check
@app.route("/api/users/<user_id>/profile")
def get_user_profile(user_id):
return User.query.get(user_id).to_dict()
# Attacker can access any user's profile by changing user_id
# Fixed: Proper authorization
@app.route("/api/users/<user_id>/profile")
@login_required
def get_user_profile(user_id):
if current_user.id != user_id and not current_user.is_admin:
abort(403)
return User.query.get(user_id).to_dict()
IDOR (Insecure Direct Object Reference):
# Vulnerable: Predictable IDs
@app.route("/api/orders/<int:order_id>")
def get_order(order_id):
return Order.query.get(order_id).to_dict()
# Fixed: Verify ownership
@app.route("/api/orders/<int:order_id>")
@login_required
def get_order(order_id):
order = Order.query.get_or_404(order_id)
if order.user_id != current_user.id:
abort(403)
return order.to_dict()
A02: Cryptographic Failures¶
Failures related to cryptography that expose sensitive data.
Common Issues:
| Issue | Example | Mitigation |
|---|---|---|
| Weak algorithms | MD5, SHA1, DES | Use AES-256, SHA-256+ |
| Missing encryption | HTTP instead of HTTPS | Enforce TLS everywhere |
| Weak keys | Short passwords as keys | Use proper KDFs |
| Hardcoded secrets | API keys in code | Use secrets manager |
| Missing at-rest encryption | Plaintext database | Encrypt sensitive columns |
A03: Injection¶
Untrusted data is sent to an interpreter as part of a command or query.
SQL Injection:
# Vulnerable
def get_user(username):
query = f"SELECT * FROM users WHERE username = '{username}'"
return db.execute(query)
# Input: ' OR '1'='1' --
# Resulting query: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# Fixed: Parameterized queries
def get_user(username):
query = "SELECT * FROM users WHERE username = :username"
return db.execute(query, {"username": username})
# Or with ORM
def get_user(username):
return User.query.filter_by(username=username).first()
Command Injection:
# Vulnerable
def ping_host(host):
return os.system(f"ping -c 1 {host}")
# Input: "google.com; rm -rf /"
# Fixed: Use subprocess with list arguments (no shell)
import subprocess
def ping_host(host):
# Validate input
if not re.match(r'^[a-zA-Z0-9.-]+$', host):
raise ValueError("Invalid hostname")
result = subprocess.run(
["ping", "-c", "1", host],
capture_output=True,
text=True
)
return result.stdout
A04: Insecure Design¶
Missing or ineffective security controls in the design phase.
Example - Business Logic Flaws:
# Vulnerable: No rate limiting on sensitive operations
@app.route("/api/password-reset", methods=["POST"])
def password_reset():
email = request.json["email"]
token = generate_reset_token(email)
send_reset_email(email, token)
return {"status": "sent"}
# Attacker can enumerate users and spam reset emails
# Fixed: Rate limiting + no user enumeration
from flask_limiter import Limiter
limiter = Limiter(app)
@app.route("/api/password-reset", methods=["POST"])
@limiter.limit("3 per minute") # Rate limit
def password_reset():
email = request.json["email"]
# Always return same response (no enumeration)
if user := User.query.filter_by(email=email).first():
token = generate_reset_token(email)
send_reset_email(email, token)
# Same response whether user exists or not
return {"status": "If an account exists, a reset email was sent"}
A05: Security Misconfiguration¶
Missing security hardening, default credentials, unnecessary features enabled.
Checklist:
- [ ] Remove default accounts/passwords
- [ ] Disable directory listing
- [ ] Remove unused features/frameworks
- [ ] Configure proper error handling (no stack traces)
- [ ] Set security headers
- [ ] Keep software updated
- [ ] Review cloud permissions (least privilege)
A06: Vulnerable and Outdated Components¶
Using components with known vulnerabilities.
# Check for vulnerabilities in dependencies
# Python
pip-audit
safety check
# JavaScript
npm audit
yarn audit
# General
snyk test
A07: Identification and Authentication Failures¶
Weaknesses in authentication mechanisms.
Checklist:
- [ ] Strong password policy
- [ ] MFA support
- [ ] Rate limiting on login
- [ ] Secure session management
- [ ] No credential exposure in URLs
- [ ] Secure password recovery
- [ ] Session timeout
A08: Software and Data Integrity Failures¶
Code and infrastructure that doesn't protect against integrity violations.
# Vulnerable: Insecure deserialization
import pickle
@app.route("/api/import", methods=["POST"])
def import_data():
data = pickle.loads(request.data) # DANGEROUS!
return process(data)
# Attacker can execute arbitrary code via crafted pickle payload
# Fixed: Use safe serialization
import json
@app.route("/api/import", methods=["POST"])
def import_data():
data = json.loads(request.data) # Safe
# Validate data structure
validate_schema(data, expected_schema)
return process(data)
A09: Security Logging and Monitoring Failures¶
Insufficient logging and monitoring.
# Good logging practices
import logging
import structlog
logger = structlog.get_logger()
@app.route("/api/login", methods=["POST"])
def login():
username = request.json.get("username")
user = authenticate(username, request.json.get("password"))
if user:
logger.info(
"login_success",
user_id=user.id,
username=username,
ip=request.remote_addr,
user_agent=request.user_agent.string
)
return create_session(user)
else:
logger.warning(
"login_failure",
username=username,
ip=request.remote_addr,
user_agent=request.user_agent.string
)
# Don't reveal whether username exists
return {"error": "Invalid credentials"}, 401
A10: Server-Side Request Forgery (SSRF)¶
Web application fetches a remote resource without validating the user-supplied URL.
# Vulnerable
@app.route("/api/fetch-url")
def fetch_url():
url = request.args.get("url")
response = requests.get(url)
return response.text
# Attacker can access internal services:
# /api/fetch-url?url=http://169.254.169.254/latest/meta-data/
# (AWS metadata service)
# Fixed: URL validation and allowlist
from urllib.parse import urlparse
import ipaddress
ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}
def is_safe_url(url: str) -> bool:
try:
parsed = urlparse(url)
# Only allow HTTPS
if parsed.scheme != "https":
return False
# Check against allowlist
if parsed.hostname not in ALLOWED_HOSTS:
return False
# Ensure not internal IP
try:
ip = ipaddress.ip_address(parsed.hostname)
if ip.is_private or ip.is_loopback:
return False
except ValueError:
pass # hostname, not IP
return True
except:
return False
@app.route("/api/fetch-url")
def fetch_url():
url = request.args.get("url")
if not is_safe_url(url):
abort(400, "Invalid URL")
response = requests.get(url, timeout=5)
return response.text
Security Headers¶
HTTP security headers provide defense-in-depth for web applications.
# Flask middleware for security headers
@app.after_request
def add_security_headers(response):
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Enable XSS filter (legacy browsers)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://cdn.example.com; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self' https://fonts.gstatic.com; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'"
)
# Force HTTPS
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
# Control Referer header
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy (formerly Feature-Policy)
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
return response
Cross-Site Request Forgery (CSRF)¶
CSRF tricks users into performing unintended actions on sites where they're authenticated.
┌────────────────────────────────────────────────────────────────┐
│ CSRF Attack Flow │
│ │
│ 1. User logs into bank.com (gets session cookie) │
│ │
│ 2. User visits malicious.com │
│ │
│ 3. Malicious page contains: │
│ <form action="https://bank.com/transfer" method="POST"> │
│ <input name="to" value="attacker"> │
│ <input name="amount" value="10000"> │
│ </form> │
│ <script>document.forms[0].submit();</script> │
│ │
│ 4. Browser automatically includes bank.com cookies │
│ │
│ 5. Bank processes transfer (appears legitimate) │
└────────────────────────────────────────────────────────────────┘
CSRF Protection:
# Flask-WTF CSRF protection
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# In templates
<form method="post">
{{ csrf_token() }}
...
</form>
# For APIs, use SameSite cookies
@app.route("/api/login", methods=["POST"])
def login():
# ... authentication ...
response = make_response({"status": "logged_in"})
response.set_cookie(
"session",
session_token,
httponly=True,
secure=True,
samesite="Strict" # or "Lax" for less strict
)
return response
SameSite Cookie Attribute:
| Value | Behavior |
|---|---|
Strict |
Cookie only sent for same-site requests |
Lax |
Cookie sent for same-site + top-level navigations |
None |
Cookie always sent (requires Secure flag) |
Cross-Site Scripting (XSS)¶
XSS allows attackers to inject malicious scripts into web pages viewed by other users.
┌────────────────────────────────────────────────────────────────┐
│ XSS Attack Types │
│ │
│ Stored XSS: │
│ 1. Attacker posts comment: <script>steal(document.cookie)</script>
│ 2. Comment saved to database │
│ 3. Other users view page, script executes │
│ │
│ Reflected XSS: │
│ 1. Attacker crafts URL: site.com/search?q=<script>...</script>│
│ 2. Victim clicks link │
│ 3. Server reflects query in response, script executes │
│ │
│ DOM-based XSS: │
│ 1. Attacker crafts URL with payload in fragment │
│ 2. Client-side JS reads fragment and inserts into DOM │
│ 3. Script executes without server involvement │
└────────────────────────────────────────────────────────────────┘
XSS Prevention:
# 1. Output encoding (context-aware)
from markupsafe import escape
# HTML context
user_input = "<script>alert('xss')</script>"
safe_output = escape(user_input)
# Result: <script>alert('xss')</script>
# 2. Use templating engine with auto-escaping
# Jinja2 auto-escapes by default
<p>{{ user_input }}</p> <!-- Automatically escaped -->
# To render raw HTML (dangerous, validate first!):
<p>{{ trusted_html|safe }}</p>
# 3. Content Security Policy
# Disable inline scripts
Content-Security-Policy: script-src 'self'
# 4. For JavaScript contexts
import json
# Safe way to pass data to JavaScript
<script>
var userData = {{ user_data|tojson }};
</script>
API Security¶
API Authentication Methods¶
| Method | Security | Use Case |
|---|---|---|
| API Keys | Low | Server-to-server, public APIs |
| Basic Auth | Low | Simple internal APIs |
| Bearer Tokens (JWT) | Medium | Stateless APIs |
| OAuth 2.0 | High | Third-party access |
| mTLS | High | Microservices, B2B |
Rate Limiting¶
Protect APIs from abuse and denial of service.
# Pseudocode: Token bucket rate limiter
class TokenBucketRateLimiter:
def __init__(self, rate: float, capacity: int, redis_client):
self.rate = rate # tokens per second
self.capacity = capacity # max tokens
self.redis = redis_client
def is_allowed(self, key: str) -> bool:
now = time.time()
# Get current state
bucket = self.redis.hgetall(f"ratelimit:{key}")
if not bucket:
# New bucket
tokens = self.capacity - 1
self.redis.hset(f"ratelimit:{key}", mapping={
"tokens": tokens,
"last_update": now
})
return True
# Refill tokens based on time elapsed
last_update = float(bucket["last_update"])
elapsed = now - last_update
tokens = float(bucket["tokens"])
tokens = min(self.capacity, tokens + elapsed * self.rate)
if tokens >= 1:
tokens -= 1
self.redis.hset(f"ratelimit:{key}", mapping={
"tokens": tokens,
"last_update": now
})
return True
return False
# Usage with Flask
limiter = TokenBucketRateLimiter(rate=10, capacity=100, redis_client=redis)
@app.before_request
def check_rate_limit():
key = f"{request.remote_addr}:{request.endpoint}"
if not limiter.is_allowed(key):
abort(429, "Too many requests")
Rate Limiting Headers:
HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 45
X-RateLimit-Reset: 1704067200
HTTP/1.1 429 Too Many Requests
Retry-After: 60
Input Validation¶
# Schema validation with Pydantic
from pydantic import BaseModel, validator, EmailStr, constr
from typing import Optional
import re
class CreateUserRequest(BaseModel):
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
email: EmailStr
password: constr(min_length=12)
age: Optional[int] = None
@validator('password')
def password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain digit')
return v
@validator('age')
def age_range(cls, v):
if v is not None and (v < 0 or v > 150):
raise ValueError('Invalid age')
return v
@app.route("/api/users", methods=["POST"])
def create_user():
try:
data = CreateUserRequest(**request.json)
except ValidationError as e:
return {"errors": e.errors()}, 400
# Data is now validated and type-safe
user = User.create(
username=data.username,
email=data.email,
password_hash=hash_password(data.password)
)
return user.to_dict(), 201
API Gateway Security¶
┌────────────────────────────────────────────────────────────────┐
│ API Gateway Security │
│ │
│ External │
│ Clients ┌───────────────────────────────────────┐ │
│ │ │ API Gateway │ │
│ │ │ │ │
│ │ │ ┌─────────────────────────────────┐ │ │
│ ├──────►│ │ Authentication │ │ │
│ │ │ │ • Validate API keys/tokens │ │ │
│ │ │ │ • JWT verification │ │ │
│ │ │ └─────────────────────────────────┘ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────┐ │ │
│ │ │ │ Rate Limiting │ │ │
│ │ │ │ • Per-client limits │ │ │
│ │ │ │ • Quota management │ │ │
│ │ │ └─────────────────────────────────┘ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────┐ │ │
│ │ │ │ Input Validation │ │ │
│ │ │ │ • Schema validation │ │ │
│ │ │ │ • SQL/XSS filtering │ │ │
│ │ │ └─────────────────────────────────┘ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────┐ │ │
│ │ │ │ Logging & Monitoring │ │ │
│ │ │ │ • Request logging │ │ │
│ │ │ │ • Anomaly detection │ │ │
│ │ │ └─────────────────────────────────┘ │ │
│ │ └───────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌──────────────────────────────────────┐ │
│ │ │ Backend Services │ │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ │Service │ │Service │ │Service │ │ │
│ │ │ │ A │ │ B │ │ C │ │ │
│ │ │ └────────┘ └────────┘ └────────┘ │ │
│ │ └──────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
Secure Development Lifecycle¶
Security in SDLC¶
┌────────────────────────────────────────────────────────────────┐
│ Secure Software Development Lifecycle │
│ │
│ Requirements Design Implementation Verification │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────┐ ┌──────────┐ │
│ │Security │ │Threat │ │Secure │ │Security │ │
│ │Require- │ │Modeling │ │Coding │ │Testing │ │
│ │ments │ │ │ │ │ │ │ │
│ └─────────┘ └─────────┘ └─────────────┘ └──────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ • Compliance • STRIDE • Code reviews • SAST │
│ • Privacy • Attack • Static analysis • DAST │
│ • Risk │ trees • Dependency • Pen testing │
│ assessment • Security │ scanning • Fuzzing │
│ │ patterns • Secure libs │
│ │
│ Deployment │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Operations │ │
│ │ • Monitoring │ │
│ │ • Incident Resp │ │
│ │ • Patching │ │
│ └─────────────────┘ │
└────────────────────────────────────────────────────────────────┘
Static Application Security Testing (SAST)¶
Analyzes source code for vulnerabilities without executing it.
# Python
bandit -r ./src # Security linter
semgrep --config auto ./ # Pattern-based analysis
# JavaScript
npm audit # Dependency vulnerabilities
eslint --plugin security # Security-focused linting
# Multi-language
sonarqube # Comprehensive analysis
Dynamic Application Security Testing (DAST)¶
Tests running applications for vulnerabilities.
# OWASP ZAP
zap-cli quick-scan --self-contained https://target.com
# Nikto
nikto -h https://target.com
# SQLMap (authorized testing only!)
sqlmap -u "https://target.com/page?id=1" --batch
Dependency Scanning¶
# GitHub Dependabot configuration
# .github/dependabot.yml
version: 2
updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
- package-ecosystem: "npm"
directory: "/"
schedule:
interval: "weekly"
# Manual scanning
# Python
pip-audit
safety check
# JavaScript
npm audit --production
# Snyk (multi-language)
snyk test
snyk monitor
Infrastructure and Network Security¶
Zero Trust Architecture¶
"Never trust, always verify" - assume breach and verify every request.
┌────────────────────────────────────────────────────────────────┐
│ Zero Trust Principles │
│ │
│ Traditional (Castle & Moat) Zero Trust │
│ ┌────────────────────────┐ ┌────────────────────────┐ │
│ │ ┌──────────────┐ │ │ │ │
│ │ │ Trusted │ │ │ ┌────┐ ┌────┐ │ │
│ │ │ Internal │ │ │ │ 🔒 │────│ 🔒 │ │ │
│ │ │ Network │ │ │ └────┘ └────┘ │ │
│ │ └──────────────┘ │ │ │ │ │ │
│ │ │ │ │ │ ┌────┐ │ │ │
│ │ ┌──────────────┐ │ │ └───│ 🔒 │─┘ │ │
│ │ │ Firewall │ │ │ └────┘ │ │
│ │ │ (Perimeter) │ │ │ │ │ │
│ │ └──────────────┘ │ │ Every connection is │ │
│ │ │ │ │ authenticated and │ │
│ │ [Untrusted] │ │ authorized │ │
│ └────────────────────────┘ └────────────────────────┘ │
│ │
│ Key Principles: │
│ 1. Verify explicitly (user, device, location) │
│ 2. Use least privilege access │
│ 3. Assume breach │
└────────────────────────────────────────────────────────────────┘
Container Security¶
# Secure Dockerfile practices
# Use specific versions, not 'latest'
FROM python:3.11-slim-bookworm
# Run as non-root user
RUN useradd -m -s /bin/bash appuser
# Set secure permissions
WORKDIR /app
COPY --chown=appuser:appuser requirements.txt .
# Install dependencies with no cache
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Don't run as PID 1 (use tini or similar)
ENTRYPOINT ["tini", "--"]
CMD ["python", "app.py"]
# Security scanning
# trivy image myapp:latest
# docker scan myapp:latest
Secrets Management¶
# Using HashiCorp Vault
import hvac
class SecretsManager:
def __init__(self, vault_addr: str, token: str):
self.client = hvac.Client(url=vault_addr, token=token)
def get_secret(self, path: str) -> dict:
"""Retrieve secret from Vault"""
response = self.client.secrets.kv.v2.read_secret_version(path=path)
return response["data"]["data"]
def get_database_credentials(self, role: str) -> dict:
"""Get dynamic database credentials"""
response = self.client.secrets.database.generate_credentials(name=role)
return {
"username": response["data"]["username"],
"password": response["data"]["password"],
"lease_id": response["lease_id"],
"lease_duration": response["lease_duration"]
}
# Usage
secrets = SecretsManager(
vault_addr=os.environ["VAULT_ADDR"],
token=os.environ["VAULT_TOKEN"] # Or use AppRole auth
)
db_config = secrets.get_secret("database/config")
db_creds = secrets.get_database_credentials("readonly")
Compliance and Standards¶
Common Compliance Frameworks¶
| Standard | Focus | Key Requirements |
|---|---|---|
| GDPR | Personal data (EU) | Consent, data minimization, right to erasure |
| PCI DSS | Payment cards | Encryption, access control, logging |
| HIPAA | Healthcare (US) | PHI protection, audit controls |
| SOC 2 | Service orgs | Security, availability, confidentiality |
| ISO 27001 | InfoSec management | Risk management, controls |
GDPR Key Requirements¶
# GDPR compliance considerations
class UserDataHandler:
def get_user_data(self, user_id: str) -> dict:
"""Data portability - provide user data in portable format"""
user = User.query.get(user_id)
return {
"personal_info": user.to_dict(),
"orders": [o.to_dict() for o in user.orders],
"preferences": user.preferences,
"export_date": datetime.utcnow().isoformat()
}
def delete_user_data(self, user_id: str) -> bool:
"""Right to erasure - delete user data"""
user = User.query.get(user_id)
# Anonymize instead of delete if needed for legal reasons
user.email = f"deleted_{user_id}@deleted.invalid"
user.name = "Deleted User"
user.personal_data = None
# Delete related data
Order.query.filter_by(user_id=user_id).delete()
db.session.commit()
# Log for audit
audit_log.info("user_data_deleted", user_id=user_id)
return True
def record_consent(self, user_id: str, purpose: str, granted: bool):
"""Explicit consent recording"""
consent = Consent(
user_id=user_id,
purpose=purpose,
granted=granted,
timestamp=datetime.utcnow(),
ip_address=request.remote_addr
)
db.session.add(consent)
db.session.commit()
Security Operations¶
Security Logging¶
# Structured security logging
import structlog
from enum import Enum
class SecurityEventType(Enum):
LOGIN_SUCCESS = "login_success"
LOGIN_FAILURE = "login_failure"
LOGOUT = "logout"
PASSWORD_CHANGE = "password_change"
MFA_ENABLED = "mfa_enabled"
MFA_DISABLED = "mfa_disabled"
PERMISSION_DENIED = "permission_denied"
SUSPICIOUS_ACTIVITY = "suspicious_activity"
DATA_ACCESS = "data_access"
DATA_EXPORT = "data_export"
class SecurityLogger:
def __init__(self):
self.logger = structlog.get_logger("security")
def log_event(
self,
event_type: SecurityEventType,
user_id: str = None,
details: dict = None,
severity: str = "info"
):
event = {
"event_type": event_type.value,
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"ip_address": request.remote_addr if request else None,
"user_agent": request.user_agent.string if request else None,
"details": details or {}
}
log_method = getattr(self.logger, severity)
log_method("security_event", **event)
# Alert on high-severity events
if severity in ("warning", "error", "critical"):
self.send_alert(event)
security_log = SecurityLogger()
# Usage
security_log.log_event(
SecurityEventType.LOGIN_FAILURE,
user_id="user123",
details={"reason": "invalid_password", "attempt": 3},
severity="warning"
)
Incident Response¶
┌────────────────────────────────────────────────────────────────┐
│ Incident Response Process │
│ │
│ 1. Preparation │
│ • Incident response plan │
│ • Team roles and responsibilities │
│ • Communication channels │
│ • Tools and access ready │
│ │
│ 2. Detection & Analysis │
│ • Monitor alerts │
│ • Triage and classify │
│ • Determine scope and impact │
│ • Document evidence │
│ │
│ 3. Containment │
│ • Short-term: Isolate affected systems │
│ • Long-term: Apply patches, change credentials │
│ • Preserve evidence for forensics │
│ │
│ 4. Eradication │
│ • Remove malware/backdoors │
│ • Patch vulnerabilities │
│ • Verify clean state │
│ │
│ 5. Recovery │
│ • Restore from clean backups │
│ • Rebuild compromised systems │
│ • Verify normal operations │
│ • Monitor for re-infection │
│ │
│ 6. Post-Incident │
│ • Lessons learned meeting │
│ • Update response plan │
│ • Improve defenses │
│ • Report to stakeholders/regulators │
└────────────────────────────────────────────────────────────────┘
Advanced Topics¶
Zero-Knowledge Proofs¶
Prove knowledge of a secret without revealing the secret itself.
┌────────────────────────────────────────────────────────────────┐
│ Zero-Knowledge Proof Example │
│ (Simplified Password Proof) │
│ │
│ Goal: Prove you know password without sending password │
│ │
│ Setup: │
│ • Server stores: hash(password) │
│ • Server generates: random challenge │
│ │
│ Protocol: │
│ 1. Server → Client: random nonce │
│ 2. Client computes: proof = hash(password + nonce) │
│ 3. Client → Server: proof │
│ 4. Server verifies: hash(stored_hash + nonce) == proof │
│ │
│ Properties: │
│ • Completeness: Valid proof always accepted │
│ • Soundness: Invalid proof rejected with high probability │
│ • Zero-knowledge: Verifier learns nothing beyond validity │
└────────────────────────────────────────────────────────────────┘
Applications:
- Privacy-preserving authentication
- Blockchain transactions (zk-SNARKs, zk-STARKs)
- Anonymous credentials
- Secure voting
Post-Quantum Cryptography¶
Preparing for quantum computers that could break current cryptography.
| Current Algorithm | Quantum Threat | Post-Quantum Alternative |
|---|---|---|
| RSA | Broken by Shor's algorithm | Lattice-based (CRYSTALS-Kyber) |
| ECDSA | Broken by Shor's algorithm | CRYSTALS-Dilithium |
| AES-256 | Weakened (Grover's) | AES-256 (still secure) |
| SHA-256 | Weakened (Grover's) | SHA-256 (still secure) |
NIST PQC Standards (2024):
- CRYSTALS-Kyber: Key encapsulation (replacing RSA/ECDH)
- CRYSTALS-Dilithium: Digital signatures
- SPHINCS+: Hash-based signatures
- FALCON: Lattice-based signatures
Hardware Security¶
| Technology | Description | Use Case |
|---|---|---|
| HSM | Hardware Security Module | Key storage, signing |
| TPM | Trusted Platform Module | Secure boot, attestation |
| SGX | Intel Software Guard Extensions | Secure enclaves |
| TrustZone | ARM security extensions | Mobile secure world |
| Secure Element | Dedicated security chip | Payment, authentication |
Tools and Frameworks¶
Authentication & Authorization¶
| Tool | Type | Description |
|---|---|---|
| Auth0 | SaaS | Identity platform |
| Okta | SaaS | Enterprise identity |
| Keycloak | Open source | Identity and access management |
| Authlib | Library | Python OAuth/OIDC library |
| Passport.js | Library | Node.js authentication |
| Spring Security | Framework | Java security framework |
Security Testing¶
| Tool | Type | Description |
|---|---|---|
| OWASP ZAP | DAST | Web app scanner |
| Burp Suite | DAST | Security testing platform |
| Bandit | SAST | Python security linter |
| Semgrep | SAST | Multi-language analysis |
| SonarQube | SAST | Code quality & security |
| Trivy | Container | Vulnerability scanner |
Cryptography¶
| Tool | Language | Description |
|---|---|---|
| OpenSSL | C | TLS, crypto toolkit |
| libsodium | C | Modern crypto library |
| cryptography | Python | Crypto primitives |
| Bouncy Castle | Java | Crypto provider |
| Web Crypto API | JavaScript | Browser crypto |
Secrets Management¶
| Tool | Type | Description |
|---|---|---|
| HashiCorp Vault | Self-hosted/Cloud | Secrets management |
| AWS Secrets Manager | Cloud | AWS secrets |
| Azure Key Vault | Cloud | Azure secrets |
| 1Password | SaaS | Team password manager |
| SOPS | CLI | Encrypted secrets in Git |
Best Practices Summary¶
Authentication¶
- [ ] Use strong password hashing (Argon2, bcrypt)
- [ ] Implement MFA for sensitive accounts
- [ ] Use secure session management
- [ ] Implement account lockout
- [ ] Use secure password reset flows
- [ ] Consider passwordless options (WebAuthn)
Authorization¶
- [ ] Apply principle of least privilege
- [ ] Use RBAC or ABAC appropriately
- [ ] Validate authorization on every request
- [ ] Implement proper access control checks
- [ ] Log access to sensitive resources
Data Protection¶
- [ ] Encrypt data at rest and in transit
- [ ] Use TLS 1.2+ for all connections
- [ ] Protect sensitive data (PII, credentials)
- [ ] Implement proper key management
- [ ] Apply data minimization
Application Security¶
- [ ] Validate and sanitize all input
- [ ] Use parameterized queries
- [ ] Implement proper error handling
- [ ] Set security headers
- [ ] Keep dependencies updated
- [ ] Perform regular security testing
Infrastructure¶
- [ ] Implement network segmentation
- [ ] Use least privilege for services
- [ ] Enable comprehensive logging
- [ ] Monitor for anomalies
- [ ] Have incident response plan
- [ ] Regular security assessments
Further Reading¶
Books¶
- "The Web Application Hacker's Handbook" - Dafydd Stuttard
- "Cryptography Engineering" - Ferguson, Schneier, Kohno
- "Threat Modeling: Designing for Security" - Adam Shostack
- "Security Engineering" - Ross Anderson
- "Identity and Data Security for Web Development" - Jonathan LeBlanc
Resources¶
- OWASP - Open Web Application Security Project
- NIST Cybersecurity - Standards and guidelines
- CWE - Common Weakness Enumeration
- CVE - Common Vulnerabilities and Exposures
- Have I Been Pwned - Breach database
Certifications¶
- CISSP - Certified Information Systems Security Professional
- CEH - Certified Ethical Hacker
- OSCP - Offensive Security Certified Professional
- Security+ - CompTIA Security+