Assist_Design/docs/INTEGRATION-DATAFLOW.md

23 KiB

Integration & Data Flow Design

Customer Portal - External System Integration Architecture


Table of Contents

  1. Integration Overview
  2. Integration Patterns
  3. Salesforce Integration
  4. WHMCS Integration
  5. Freebit SIM Management
  6. Data Transformation
  7. Error Handling
  8. Caching Strategy

Integration Overview

The BFF acts as an integration layer between the portal and external systems, following clean architecture principles with clear separation between infrastructure and domain concerns.

Integration Architecture

┌────────────────────────────────────────────────────────────┐
│                     BFF Integration Layer                   │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐    │
│  │  Salesforce  │  │    WHMCS     │  │   Freebit    │    │
│  │ Integration  │  │ Integration  │  │ Integration  │    │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘    │
│         │                  │                  │             │
│  ┌──────▼──────────────────▼──────────────────▼───────┐   │
│  │          Domain Mappers                             │   │
│  │   (Transform raw data → domain types)               │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │     Application Services (Orchestrators)             │   │
│  │  (Business workflows, multi-system coordination)     │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Core Principle: "Map Once, Use Everywhere"

Domain mappers are the ONLY place that transforms raw provider data to domain types.

External API → Integration Service → Domain Mapper → Domain Type → Use Everywhere
                                      ↑
                                SINGLE transformation!

Integration Patterns

Integration Service Structure

Each external system integration follows a consistent pattern:

apps/bff/src/integrations/{provider}/
├── services/
│   ├── {provider}-connection.service.ts   # HTTP client, authentication
│   ├── {provider}-{entity}.service.ts     # Entity-specific operations
│   └── {provider}-orchestrator.service.ts # Coordinates multiple operations
├── utils/
│   └── {entity}-query-builder.ts          # Query construction (SOQL, etc.)
└── {provider}.module.ts                   # NestJS module

Integration Service Responsibilities

DO:

  1. Build queries (SOQL, API parameters)
  2. Execute API calls (HTTP, authentication)
  3. Use domain mappers to transform responses
  4. Return domain types
  5. Handle errors gracefully

DON'T: 6. Add additional mapping beyond domain mappers 7. Include business logic 8. Expose raw provider types 9. Create wrapper services around domain mappers

Example Integration Service

@Injectable()
export class SalesforceOrderService {
  constructor(private readonly sf: SalesforceConnection) {}

  async getOrderById(orderId: string): Promise<OrderDetails | null> {
    // 1. Build query (infrastructure concern)
    const fields = buildOrderSelectFields(["Account.Name"]).join(", ");
    const soql = `
      SELECT ${fields}
      FROM Order
      WHERE Id = '${orderId}'
      LIMIT 1
    `;

    // 2. Execute query
    const result = await this.sf.query(soql);
    const order = result.records?.[0];
    
    if (!order) return null;

    // 3. Use domain mapper (SINGLE transformation!)
    return OrderProviders.Salesforce.transformSalesforceOrderDetails(order, []);
  }
}

Salesforce Integration

Connection Architecture

┌──────────────────────────────────────────────────────────┐
│          Salesforce Integration Services                 │
│                                                           │
│  ┌─────────────────┐        ┌────────────────────────┐  │
│  │  REST API       │        │  Platform Events       │  │
│  │  (JSForce)      │        │  (Pub/Sub gRPC)        │  │
│  └────────┬────────┘        └──────────┬─────────────┘  │
│           │                            │                 │
│  ┌────────▼────────────────────────────▼─────────────┐  │
│  │        SalesforceConnection Service               │  │
│  │  • OAuth2 JWT authentication                       │  │
│  │  • Connection pooling                              │  │
│  │  • Retry logic                                     │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘

REST API Integration

Authentication: OAuth 2.0 JWT Bearer Flow

@Injectable()
export class SalesforceConnection {
  private connection: jsforce.Connection;

  async connect() {
    this.connection = new jsforce.Connection({
      instanceUrl: process.env.SF_INSTANCE_URL,
      accessToken: await this.getAccessToken()
    });
  }

  private async getAccessToken(): Promise<string> {
    // JWT Bearer Token Flow
    const jwt = this.createJWT({
      iss: process.env.SF_CLIENT_ID,
      sub: process.env.SF_USERNAME,
      aud: process.env.SF_LOGIN_URL,
      exp: Math.floor(Date.now() / 1000) + 300
    });

    const response = await fetch(`${process.env.SF_LOGIN_URL}/services/oauth2/token`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        grant_type: 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        assertion: jwt
      })
    });

    const { access_token } = await response.json();
    return access_token;
  }

  async query<T>(soql: string): Promise<QueryResult<T>> {
    return this.connection.query<T>(soql);
  }
}

Platform Events (Event-Driven)

Purpose: Receive order provisioning requests from Salesforce without inbound webhooks.

@Injectable()
export class PlatformEventsSubscriber implements OnModuleInit {
  private client: avro.PublishableClient;
  
  async onModuleInit() {
    await this.subscribe();
  }

  private async subscribe() {
    const client = await this.createPubSubClient();
    
    // Subscribe to OrderProvisionRequested__e
    const subscription = await client.subscribe({
      topicName: '/event/OrderProvisionRequested__e',
      numRequested: 100,
      replayPreset: grpc.ReplayPreset.LATEST
    });

    for await (const event of subscription) {
      await this.handleProvisioningEvent(event);
    }
  }

  private async handleProvisioningEvent(event: any) {
    const { orderId__c, idemKey__c } = event.payload;
    
    // Enqueue provisioning job
    await this.provisioningQueue.add('provision-order', {
      orderId: orderId__c,
      idempotencyKey: idemKey__c || orderId__c
    });
    
    this.logger.log({ orderId: orderId__c }, 'Provisioning job enqueued');
  }
}

Benefits:

  • No inbound webhooks (security)
  • Durable replay (reliability)
  • Real-time processing (performance)
  • Scalable (high volume)

Salesforce Query Builders

Query builders are infrastructure concerns and belong in the BFF integration layer.

// apps/bff/src/integrations/salesforce/utils/order-query-builder.ts

export function buildOrderSelectFields(additional: string[] = []): string[] {
  const fields = [
    'Id',
    'AccountId',
    'Status',
    'Type',
    'EffectiveDate',
    'OrderNumber',
    'TotalAmount',
    'CreatedDate',
    'Account.Name',
    'Account.Email__c'
  ];
  
  return UNIQUE([...fields, ...additional]);
}

export function buildOrderItemsQuery(orderId: string): string {
  return `
    SELECT Id, OrderId, Product2Id, Quantity, UnitPrice, TotalPrice,
           Product2.Name, Product2.StockKeepingUnit,
           Product2.Item_Class__c
    FROM OrderItem
    WHERE OrderId = '${orderId}'
  `;
}

Salesforce Data Entities

Orders:

  • Read orders: GET /ordersSELECT ... FROM Order
  • Create orders: POST /orderssobject('Order').create()
  • OrderItems created separately

Products (Catalog):

  • Read catalog: GET /catalogSELECT ... FROM Product2 WHERE Portal_Catalog__c = true
  • Includes related PricebookEntries

Accounts (Customers):

  • Customer profile linked to Account

WHMCS Integration

Connection Architecture

┌──────────────────────────────────────────────────────────┐
│             WHMCS Integration Services                    │
│                                                           │
│  ┌────────────────────┐       ┌────────────────────────┐ │
│  │   REST API         │       │   Webhooks (Inbound)   │ │
│  │   (API Key Auth)   │       │   (HMAC Validation)    │ │
│  └──────────┬─────────┘       └─────────┬──────────────┘ │
│             │                           │                 │
│  ┌──────────▼───────────────────────────▼──────────────┐ │
│  │          WhmcsConnection Service                     │ │
│  │  • API key authentication                            │ │
│  │  • Request signing                                   │ │
│  │  • Response validation                               │ │
│  └──────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘

REST API Integration

Authentication: API Key + Secret

@Injectable()
export class WhmcsConnection {
  private readonly apiUrl = process.env.WHMCS_API_URL;
  private readonly apiKey = process.env.WHMCS_API_KEY;
  private readonly apiSecret = process.env.WHMCS_API_SECRET;

  async call<T>(action: string, params: Record<string, any> = {}): Promise<T> {
    const response = await fetch(this.apiUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        action,
        identifier: this.apiKey,
        secret: this.apiSecret,
        responsetype: 'json',
        ...params
      })
    });

    const data = await response.json();
    
    if (data.result === 'error') {
      throw new WhmcsApiException(data.message);
    }

    return data;
  }
}

WHMCS API Operations

Invoices:

@Injectable()
export class WhmcsInvoiceService {
  constructor(
    private readonly whmcs: WhmcsConnection,
    private readonly currencyService: CurrencyService
  ) {}

  async getInvoices(clientId: number): Promise<Invoice[]> {
    const response = await this.whmcs.call('GetInvoices', { clientid: clientId });
    
    if (!response.invoices?.invoice) return [];
    
    const defaultCurrency = this.currencyService.getDefaultCurrency();
    
    return response.invoices.invoice.map(whmcsInvoice => 
      Providers.Whmcs.transformWhmcsInvoice(whmcsInvoice, {
        defaultCurrencyCode: defaultCurrency.code,
        defaultCurrencySymbol: defaultCurrency.prefix || defaultCurrency.suffix
      })
    );
  }
}

Orders (Provisioning):

@Injectable()
export class WhmcsOrderService {
  async addOrder(params: WhmcsAddOrderParams): Promise<{ orderId: number }> {
    return this.whmcs.call('AddOrder', {
      clientid: params.clientId,
      paymentmethod: params.paymentMethod,
      pid: params.productIds,
      billingcycle: params.billingCycles,
      qty: params.quantities,
      configoptions: params.configOptions,
      notes: params.notes
    });
  }

  async acceptOrder(orderId: number): Promise<WhmcsOrderResult> {
    return this.whmcs.call('AcceptOrder', { orderid: orderId });
  }
}

Subscriptions:

@Injectable()
export class WhmcsSubscriptionService {
  async getServices(clientId: number): Promise<Subscription[]> {
    const response = await this.whmcs.call('GetClientsProducts', { clientid: clientId });
    
    const defaultCurrency = this.currencyService.getDefaultCurrency();
    
    return response.products.product.map(whmcsProduct =>
      Providers.Whmcs.transformWhmcsSubscription(whmcsProduct, {
        defaultCurrencyCode: defaultCurrency.code,
        defaultCurrencySymbol: defaultCurrency.prefix || defaultCurrency.suffix
      })
    );
  }
}

WHMCS Webhooks (Inbound)

Purpose: Receive real-time updates from WHMCS (invoice paid, service activated, etc.)

@Controller('webhooks/whmcs')
export class WhmcsWebhookController {
  @Post()
  async handleWebhook(
    @Body() payload: any,
    @Headers('x-whmcs-signature') signature: string
  ) {
    // 1. Validate HMAC signature
    if (!this.validateSignature(payload, signature)) {
      throw new UnauthorizedException('Invalid webhook signature');
    }

    // 2. Process event
    const eventType = payload.event;
    
    switch (eventType) {
      case 'InvoicePaid':
        await this.handleInvoicePaid(payload);
        break;
      case 'ServiceActivated':
        await this.handleServiceActivated(payload);
        break;
      // ... other events
    }

    return { received: true };
  }

  private validateSignature(payload: any, signature: string): boolean {
    const secret = process.env.WHMCS_WEBHOOK_SECRET;
    const computedSignature = createHmac('sha256', secret)
      .update(JSON.stringify(payload))
      .digest('hex');
    
    return timingSafeEqual(
      Buffer.from(signature),
      Buffer.from(computedSignature)
    );
  }
}

Freebit SIM Management

Connection Architecture

@Injectable()
export class FreebitConnection {
  private readonly apiUrl = process.env.FREEBIT_API_URL;
  private readonly apiKey = process.env.FREEBIT_API_KEY;

  async request<T>(endpoint: string, params?: Record<string, any>): Promise<T> {
    const response = await fetch(`${this.apiUrl}${endpoint}`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(params)
    });

    return response.json();
  }
}

Freebit API Operations

SIM Details:

@Injectable()
export class FreebitSimService {
  async getSimDetails(iccid: string): Promise<SimDetails> {
    const response = await this.freebit.request('/api/sim/details', { iccid });
    
    // Transform with domain mapper
    return FreebitProvider.transformFreebitAccountDetails(response);
  }

  async getSimUsage(iccid: string, startDate: Date, endDate: Date): Promise<SimUsage> {
    const response = await this.freebit.request('/api/sim/usage', {
      iccid,
      start_date: startDate.toISOString(),
      end_date: endDate.toISOString()
    });
    
    return FreebitProvider.transformFreebitUsageData(response);
  }
}

Data Transformation

Transformation Flow

┌──────────────┐
│ External API │
│ Raw Response │
└──────┬───────┘
       │
       │ 1. Validate raw data
       │
┌──────▼────────────────────┐
│ Domain Mapper             │
│ • Parse raw schema        │
│ • Transform to domain     │
│ • Validate domain schema  │
└──────┬────────────────────┘
       │
       │ 2. Return domain type
       │
┌──────▼───────┐
│ Application  │
│ Uses domain  │
│ types only   │
└──────────────┘

Domain Mapper Pattern

// packages/domain/billing/providers/whmcs/mapper.ts

export function transformWhmcsInvoice(
  raw: unknown,
  context: { defaultCurrencyCode: string; defaultCurrencySymbol: string }
): Invoice {
  // 1. Validate raw data
  const whmcs = whmcsInvoiceRawSchema.parse(raw);
  
  // 2. Transform to domain model
  const result: Invoice = {
    id: parseInt(whmcs.invoiceid),
    userId: parseInt(whmcs.userid),
    status: mapWhmcsInvoiceStatus(whmcs.status),
    amount: {
      value: parseFloat(whmcs.total),
      currency: whmcs.currencycode || context.defaultCurrencyCode
    },
    dueDate: new Date(whmcs.duedate),
    invoiceNumber: whmcs.invoicenum,
    createdAt: new Date(whmcs.date),
    items: whmcs.items?.item?.map(transformWhmcsInvoiceItem) || []
  };
  
  // 3. Validate domain model
  return invoiceSchema.parse(result);
}

Context Injection Pattern

Some transformations need infrastructure context (like currency settings):

// ✅ Correct: Inject context explicitly
const defaultCurrency = this.currencyService.getDefaultCurrency();
const invoice = Providers.Whmcs.transformWhmcsInvoice(rawInvoice, {
  defaultCurrencyCode: defaultCurrency.code,
  defaultCurrencySymbol: defaultCurrency.prefix || defaultCurrency.suffix
});

Why this is clean:

  • Domain mapper is pure (deterministic for same inputs)
  • Infrastructure concern (currency) is injected from BFF
  • No service wrapper needed

Error Handling

Error Handling Strategy

  1. API Errors: Log detailed error, return user-friendly message
  2. Validation Errors: Return specific field errors
  3. Network Errors: Retry with exponential backoff
  4. Business Errors: Return domain-specific error codes
@Injectable()
export class SalesforceOrderService {
  async getOrderById(orderId: string): Promise<OrderDetails | null> {
    try {
      const result = await this.sf.query(soql);
      return OrderProviders.Salesforce.transformSalesforceOrderDetails(result.records[0], []);
    } catch (error) {
      this.logger.error('Failed to fetch order from Salesforce', { 
        orderId, 
        error: error.message 
      });
      
      // Don't expose internal details [[memory:6689308]]
      throw new NotFoundException('Order not found');
    }
  }
}

Retry Logic

async callWithRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  delay = 1000
): Promise<T> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries) throw error;
      
      this.logger.warn(`Retry attempt ${attempt}/${maxRetries}`, { error: error.message });
      await new Promise(resolve => setTimeout(resolve, delay * attempt));
    }
  }
}

Caching Strategy

Cache Configuration

// Invoice cache
CACHE_TTL_INVOICES = 60-120s
CACHE_KEY_PATTERN = 'user:{userId}:invoices:page:{page}'

// Subscription cache
CACHE_TTL_SUBSCRIPTIONS = 120s
CACHE_KEY_PATTERN = 'user:{userId}:subscriptions'

// Catalog cache
CACHE_TTL_CATALOG = 300-900s (5-15 min)
CACHE_KEY_PATTERN = 'catalog:{type}'

Cache Implementation

@Injectable()
export class CacheService {
  constructor(private readonly redis: Redis) {}

  async get<T>(key: string): Promise<T | null> {
    const cached = await this.redis.get(key);
    return cached ? JSON.parse(cached) : null;
  }

  async set(key: string, value: any, ttl: number): Promise<void> {
    await this.redis.setex(key, ttl, JSON.stringify(value));
  }

  async invalidate(pattern: string): Promise<void> {
    const keys = await this.redis.keys(pattern);
    if (keys.length > 0) {
      await this.redis.del(...keys);
    }
  }
}

Cache Invalidation

// Invalidate invoice cache when webhook received
async handleInvoicePaid(payload: any) {
  const userId = payload.userid;
  
  // Process event
  await this.processInvoicePayment(payload);
  
  // Invalidate cache
  await this.cacheService.invalidate(`user:${userId}:invoices:*`);
}

Best Practices

Integration Service Best Practices

DO:

  1. Keep integration services thin - just fetch and transform
  2. Use domain mappers directly - no wrappers
  3. Build queries in utils - separate query construction
  4. Handle errors gracefully - log and throw/return null
  5. Cache when appropriate - reduce API calls
  6. Inject context explicitly - currency, config, etc.
  7. Return domain types - never return raw API responses

DON'T:

  1. Create wrapper services - use domain mappers directly
  2. Add business logic - belongs in domain or orchestrators
  3. Transform twice - map once in domain
  4. Expose raw types - always return domain types
  5. Hard-code queries - use query builders
  6. Skip error handling - always log failures

Last Updated: October 2025
Status: Active - Production System