Quick Navigation
Mastering Python Integration with IBM Sterling OMS
Streamline your order management processes with powerful Python automation
Why Python for Sterling OMS Integration?
Rapid Development
Python's clean syntax and extensive libraries accelerate development timelines, getting your integrations to market faster.
Rich Ecosystem
From REST APIs to XML parsing, Python offers comprehensive tools for every integration scenario you'll encounter.
Data Processing Power
Pandas, NumPy, and other libraries make complex data transformations and analytics effortless.
Enterprise Ready
Mature frameworks like Django and Flask provide production-ready solutions for enterprise environments.
Real-time Capabilities
Asynchronous programming support enables real-time order processing and event-driven architectures.
Robust & Secure
Built-in security features and extensive testing frameworks ensure your integrations are reliable and secure.
Core Integration Approaches
Sterling OMS exposes powerful REST APIs that Python can consume effortlessly, providing the most modern and flexible integration approach.
import requests
import json
from datetime import datetime
class SterlingOMSClient:
def __init__(self, base_url, username, password):
self.base_url = base_url
self.session = requests.Session()
self.session.auth = (username, password)
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json'
})
def create_order(self, order_data):
"""Create a new order in Sterling OMS"""
endpoint = f"{self.base_url}/orders"
response = self.session.post(endpoint, json=order_data)
return response.json()
def get_order_status(self, order_id):
"""Retrieve order status"""
endpoint = f"{self.base_url}/orders/{order_id}"
response = self.session.get(endpoint)
return response.json()
def update_inventory(self, item_id, quantity):
"""Update inventory levels"""
endpoint = f"{self.base_url}/inventory/{item_id}"
data = {'quantity': quantity}
response = self.session.put(endpoint, json=data)
return response.json()
For legacy systems or complex business logic, XML integration provides robust support for traditional enterprise workflows.
import xml.etree.ElementTree as ET
import requests
from datetime import datetime
def create_order_xml(customer_id, items):
"""Generate Sterling OMS XML order"""
order = ET.Element("Order")
# Order header
order_header = ET.SubElement(order, "OrderHeader")
ET.SubElement(order_header, "CustomerID").text = customer_id
ET.SubElement(order_header, "OrderDate").text = datetime.now().isoformat()
# Order lines
order_lines = ET.SubElement(order, "OrderLines")
for item in items:
line = ET.SubElement(order_lines, "OrderLine")
ET.SubElement(line, "ItemID").text = item['id']
ET.SubElement(line, "Quantity").text = str(item['quantity'])
ET.SubElement(line, "UnitPrice").text = str(item['price'])
return ET.tostring(order, encoding='unicode')
def submit_xml_order(xml_data, endpoint_url):
"""Submit XML order to Sterling OMS"""
headers = {
'Content-Type': 'application/xml',
'Accept': 'application/xml'
}
response = requests.post(endpoint_url, data=xml_data, headers=headers)
return ET.fromstring(response.content)
For high-performance scenarios requiring minimal latency, direct database access provides optimal speed and control.
import psycopg2
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
@contextmanager
def sterling_db_connection():
"""Context manager for Sterling OMS database connections"""
conn = psycopg2.connect(
host="sterling-db-host",
database="sterling_oms",
user="db_user",
password="db_password"
)
try:
yield conn
finally:
conn.close()
def update_inventory(item_id, quantity):
"""Update inventory levels directly"""
with sterling_db_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("""
UPDATE inventory
SET available_quantity = available_quantity - %s,
last_updated = CURRENT_TIMESTAMP
WHERE item_id = %s
RETURNING available_quantity
""", (quantity, item_id))
result = cursor.fetchone()
conn.commit()
logger.info(f"Updated inventory for {item_id}: {result[0]} remaining")
return result[0]
except Exception as e:
conn.rollback()
logger.error(f"Failed to update inventory: {e}")
raise
Advanced Integration Patterns
Handle high-volume order processing with Python's async capabilities for maximum throughput and responsiveness.
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import time
class AsyncSterlingClient:
def __init__(self, base_url, credentials):
self.base_url = base_url
self.credentials = credentials
self.semaphore = asyncio.Semaphore(10) # Limit concurrent requests
async def process_order_batch(self, orders):
"""Process multiple orders concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [
self.create_order_async(session, order)
for order in orders
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def create_order_async(self, session, order_data):
"""Async order creation with rate limiting"""
async with self.semaphore:
try:
async with session.post(
f"{self.base_url}/orders",
json=order_data,
auth=self.credentials,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
return await response.json()
else:
return {'error': f'HTTP {response.status}'}
except asyncio.TimeoutError:
return {'error': 'Request timeout'}
except Exception as e:
return {'error': str(e)}
# Usage example
async def main():
client = AsyncSterlingClient("https://api.sterling.com", ("user", "pass"))
orders = [
{'customer_id': f'CUST{i:03d}', 'items': [{'id': 'ITEM001', 'qty': 1}]}
for i in range(100)
]
start_time = time.time()
results = await client.process_order_batch(orders)
duration = time.time() - start_time
successful_orders = [r for r in results if 'error' not in r]
print(f"Processed {len(successful_orders)}/{len(orders)} orders in {duration:.2f}s")
Production Best Practices
Configuration Management
Centralized configuration with environment variables and validation.
Error Handling
Comprehensive error handling with retry logic and circuit breakers.
Monitoring & Logging
Detailed logging and performance metrics for operational visibility.
Security
OAuth2, API keys, and secure credential management.
import os
from dataclasses import dataclass
from typing import Optional
import logging
@dataclass
class SterlingConfig:
base_url: str = os.getenv('STERLING_BASE_URL')
username: str = os.getenv('STERLING_USERNAME')
password: str = os.getenv('STERLING_PASSWORD')
timeout: int = int(os.getenv('STERLING_TIMEOUT', '30'))
retry_attempts: int = int(os.getenv('STERLING_RETRY_ATTEMPTS', '3'))
max_concurrent: int = int(os.getenv('STERLING_MAX_CONCURRENT', '10'))
def __post_init__(self):
if not all([self.base_url, self.username, self.password]):
raise ValueError("Missing required Sterling OMS configuration")
# Error handling and retry logic
import time
from functools import wraps
def retry_on_failure(max_attempts=3, backoff_factor=2):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
logging.error(f"Final attempt failed for {func.__name__}: {e}")
raise
wait_time = backoff_factor ** attempt
logging.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s")
time.sleep(wait_time)
return None
return wrapper
return decorator
# Performance monitoring
def monitor_performance(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logging.info(f"{func.__name__} completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logging.error(f"{func.__name__} failed after {duration:.2f}s: {e}")
raise
return wrapper
Testing Your Integration
Implement unit tests, integration tests, and performance tests to ensure your Sterling OMS integration is bulletproof.
import pytest
from unittest.mock import Mock, patch, MagicMock
from sterling_client import SterlingOMSClient
import requests
class TestSterlingOMSClient:
@pytest.fixture
def client(self):
return SterlingOMSClient("http://test-url", "user", "pass")
@pytest.fixture
def sample_order(self):
return {
'customer_id': 'CUST001',
'items': [
{'id': 'ITEM001', 'quantity': 2, 'price': '10.99'},
{'id': 'ITEM002', 'quantity': 1, 'price': '15.50'}
],
'shipping_address': {
'street': '123 Main St',
'city': 'Anytown',
'zip': '12345'
}
}
@patch('requests.Session.post')
def test_create_order_success(self, mock_post, client, sample_order):
# Mock successful response
mock_response = MagicMock()
mock_response.json.return_value = {
'order_id': '12345',
'status': 'created',
'total_amount': '36.48'
}
mock_response.status_code = 200
mock_post.return_value = mock_response
result = client.create_order(sample_order)
assert result['order_id'] == '12345'
assert result['status'] == 'created'
mock_post.assert_called_once()
@patch('requests.Session.post')
def test_create_order_failure(self, mock_post, client, sample_order):
# Mock failed response
mock_post.side_effect = requests.RequestException("Connection error")
with pytest.raises(requests.RequestException):
client.create_order(sample_order)
@patch('requests.Session.get')
def test_get_order_status(self, mock_get, client):
mock_response = MagicMock()
mock_response.json.return_value = {
'order_id': '12345',
'status': 'shipped',
'tracking_number': 'TRK123456'
}
mock_get.return_value = mock_response
result = client.get_order_status('12345')
assert result['status'] == 'shipped'
assert 'tracking_number' in result
# Integration tests
@pytest.mark.integration
class TestSterlingIntegration:
def test_full_order_lifecycle(self):
"""Test complete order lifecycle"""
# This would test against a staging environment
pass
def test_error_recovery(self):
"""Test error handling and recovery"""
pass
# Performance tests
@pytest.mark.performance
def test_concurrent_order_processing():
"""Test handling multiple concurrent orders"""
import asyncio
from async_sterling_client import AsyncSterlingClient
async def test_performance():
client = AsyncSterlingClient("http://staging-url", ("user", "pass"))
orders = [{'customer_id': f'CUST{i}'} for i in range(100)]
start_time = time.time()
results = await client.process_order_batch(orders)
duration = time.time() - start_time
assert duration < 10.0 # Should complete within 10 seconds
assert len(results) == 100
asyncio.run(test_performance())
Deployment & Operations
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app
# Create app directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
RUN chown -R appuser:appuser /app
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python health_check.py
# Expose port
EXPOSE 8000
# Run the application
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "app:app"]
apiVersion: apps/v1
kind: Deployment
metadata:
name: sterling-integration
labels:
app: sterling-integration
spec:
replicas: 3
selector:
matchLabels:
app: sterling-integration
template:
metadata:
labels:
app: sterling-integration
spec:
containers:
- name: sterling-integration
image: your-registry/sterling-integration:latest
ports:
- containerPort: 8000
env:
- name: STERLING_BASE_URL
valueFrom:
secretKeyRef:
name: sterling-secrets
key: base-url
- name: STERLING_USERNAME
valueFrom:
secretKeyRef:
name: sterling-secrets
key: username
- name: STERLING_PASSWORD
valueFrom:
secretKeyRef:
name: sterling-secrets
key: password
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: sterling-integration-service
spec:
selector:
app: sterling-integration
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
import logging
import time
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from functools import wraps
# Metrics
order_requests_total = Counter('sterling_order_requests_total', 'Total order requests', ['method', 'status'])
order_processing_duration = Histogram('sterling_order_processing_seconds', 'Order processing duration')
active_connections = Gauge('sterling_active_connections', 'Active Sterling connections')
queue_size = Gauge('sterling_queue_size', 'Current queue size')
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('sterling_integration.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def track_metrics(operation_name):
"""Decorator to track operation metrics"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
order_requests_total.labels(method=operation_name, status='success').inc()
return result
except Exception as e:
order_requests_total.labels(method=operation_name, status='error').inc()
logger.error(f"{operation_name} failed: {e}")
raise
finally:
duration = time.time() - start_time
order_processing_duration.observe(duration)
return wrapper
return decorator
# Health check endpoint
from flask import Flask, jsonify
monitoring_app = Flask(__name__)
@monitoring_app.route('/health')
def health_check():
return jsonify({'status': 'healthy', 'timestamp': time.time()})
@monitoring_app.route('/metrics')
def metrics():
# Return Prometheus metrics
pass
if __name__ == '__main__':
# Start Prometheus metrics server
start_http_server(8001)
# Start health check server
monitoring_app.run(host='0.0.0.0', port=8002)
Ready to Transform Your Order Management?
Start with a simple REST API integration and gradually incorporate more advanced patterns as your needs evolve. Python's versatility and Sterling OMS's robust APIs create endless possibilities.
Get Started Today- Start Simple: Begin with REST API integration before moving to advanced patterns
- Embrace Async: Use Python's async capabilities for high-performance integrations
- Monitor Everything: Implement comprehensive logging and metrics from day one
- Test Thoroughly: Unit tests, integration tests, and performance tests are essential
- Design for Scale: Plan for growth with proper architecture and deployment strategies
- Security First: Always use secure authentication and encrypt sensitive data
No comments:
Post a Comment