engineering

Building Streaming-First Chat Architecture

Deep dive into the technical decisions behind Convobase's streaming-first approach to AI chat infrastructure.

C
Convobase Team
Jan 20, 20246 min read
#streaming#architecture#websockets#sse

Building Streaming-First Chat Architecture

When we started building Convobase, we made a fundamental decision: streaming would be a first-class citizen, not an afterthought. Here's why and how we built our architecture around this principle.

The Streaming Imperative

Modern AI models generate tokens sequentially. Users expect to see responses as they're generated, not wait for complete messages. This creates a UX expectation that traditional request-response APIs can't satisfy.

Traditional API Pattern

// Request-response: User waits for complete message
POST /api/chat
{
  "message": "Explain quantum computing",
  "conversation_id": "123"
}

// Response: 2-3 seconds later
{
  "reply": "Quantum computing is a revolutionary approach to computation...",
  "conversation_id": "123"
}

Problems:

  • Long wait times (2-10 seconds)
  • No progress indication
  • Poor user experience
  • Timeout issues with long responses

Streaming-First Pattern

// Server-Sent Events: Immediate token streaming
POST /api/chat/stream
{
  "message": "Explain quantum computing",
  "conversation_id": "123"
}

// Response: Immediate stream
data: {"type": "token", "content": "Quantum"}
data: {"type": "token", "content": " computing"}
data: {"type": "token", "content": " is"}
data: {"type": "token", "content": " a"}
data: {"type": "function_call", "name": "search_papers", "args": {...}}
data: {"type": "token", "content": " revolutionary"}
data: {"type": "done"}

Benefits:

  • Immediate visual feedback
  • Progress indication
  • Better perceived performance
  • Support for tool calls mid-stream

Transport Layer Decisions

We evaluated three approaches for streaming:

1. WebSockets

// WebSocket implementation
const socket = new WebSocket('wss://api.convobase.com/chat')

socket.onmessage = (event) => {
  const data = JSON.parse(event.data)
  handleStreamChunk(data)
}

socket.send(JSON.stringify({
  type: 'chat_message',
  message: 'Hello world'
}))

Pros:

  • Bidirectional communication
  • Low latency
  • Connection reuse

Cons:

  • Connection management complexity
  • Proxy/firewall issues
  • State synchronization challenges

2. Server-Sent Events (Our Choice)

// SSE implementation
const eventSource = new EventSource('/api/chat/stream', {
  method: 'POST',
  body: JSON.stringify({ message: 'Hello world' })
})

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  handleStreamChunk(data)
}

Pros:

  • Simple HTTP-based protocol
  • Automatic reconnection
  • Built-in browser support
  • Easy to cache and proxy

Cons:

  • Unidirectional (acceptable for our use case)
  • Limited browser connection pool

3. HTTP Streaming with Fetch

// Fetch streaming implementation
const response = await fetch('/api/chat/stream', {
  method: 'POST',
  body: JSON.stringify({ message: 'Hello world' })
})

const reader = response.body?.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  
  const chunk = new TextDecoder().decode(value)
  handleStreamChunk(chunk)
}

Why we chose SSE: Perfect balance of simplicity and functionality for our AI chat use case.

Backend Architecture

Our streaming backend is built on these principles:

1. Async-First Design

// Streaming response handler
export async function* streamChatResponse(
  message: string,
  context: ConversationContext
): AsyncGenerator<ChatChunk> {
  
  // Initialize model stream
  const modelStream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: context.messages,
    stream: true,
    tools: context.availableTools
  })
  
  // Stream tokens and function calls
  for await (const chunk of modelStream) {
    const delta = chunk.choices[0]?.delta
    
    if (delta.content) {
      yield {
        type: 'token',
        content: delta.content,
        metadata: { timestamp: Date.now() }
      }
    }
    
    if (delta.tool_calls) {
      yield {
        type: 'function_call',
        function: delta.tool_calls[0].function,
        metadata: { timestamp: Date.now() }
      }
    }
  }
  
  yield { type: 'done' }
}

2. Context-Aware Streaming

// Intelligent context management during streaming
class StreamingContextManager {
  async optimizeContext(
    conversation: Message[], 
    newMessage: string
  ): Promise<OptimizedContext> {
    
    // Calculate token usage
    const currentTokens = this.countTokens(conversation)
    const maxTokens = 8000 // Model context limit
    
    if (currentTokens + this.countTokens(newMessage) > maxTokens) {
      // Intelligently compress older messages
      conversation = await this.semanticCompression(conversation)
    }
    
    return {
      messages: conversation,
      availableTools: this.getRelevantTools(newMessage),
      systemPrompt: this.getContextualPrompt(conversation)
    }
  }
}

3. Error Handling in Streams

// Graceful error handling in streams
export async function handleStreamingErrors(
  streamGenerator: AsyncGenerator<ChatChunk>
): AsyncGenerator<ChatChunk> {
  
  try {
    for await (const chunk of streamGenerator) {
      yield chunk
    }
  } catch (error) {
    // Send error to client via stream
    yield {
      type: 'error',
      error: {
        code: 'STREAM_ERROR',
        message: 'Stream interrupted',
        retryable: true
      }
    }
    
    // Log for monitoring
    logger.error('Stream error', { error, context })
  }
}

Frontend Implementation

Our React hooks make streaming simple:

// useStreamingChat hook
export function useStreamingChat() {
  const [messages, setMessages] = useState<Message[]>([])
  const [isStreaming, setIsStreaming] = useState(false)
  const [currentStream, setCurrentStream] = useState('')
  
  const sendMessage = useCallback(async (content: string) => {
    setIsStreaming(true)
    setCurrentStream('')
    
    const eventSource = new EventSource('/api/chat/stream', {
      method: 'POST',
      body: JSON.stringify({ message: content })
    })
    
    eventSource.onmessage = (event) => {
      const chunk = JSON.parse(event.data)
      
      switch (chunk.type) {
        case 'token':
          setCurrentStream(prev => prev + chunk.content)
          break
          
        case 'function_call':
          // Handle tool calls
          handleFunctionCall(chunk.function)
          break
          
        case 'done':
          setMessages(prev => [...prev, { 
            role: 'assistant', 
            content: currentStream 
          }])
          setCurrentStream('')
          setIsStreaming(false)
          eventSource.close()
          break
      }
    }
  }, [currentStream])
  
  return { messages, isStreaming, currentStream, sendMessage }
}

Performance Optimizations

1. Token Batching

// Batch tokens for better performance
class TokenBatcher {
  private buffer: string = ''
  private lastFlush: number = Date.now()
  private readonly flushInterval: number = 50 // ms
  
  addToken(token: string, forceFlush = false) {
    this.buffer += token
    
    const shouldFlush = forceFlush || 
      this.buffer.length > 10 || 
      Date.now() - this.lastFlush > this.flushInterval
    
    if (shouldFlush) {
      this.flush()
    }
  }
  
  private flush() {
    if (this.buffer) {
      this.emit('batch', this.buffer)
      this.buffer = ''
      this.lastFlush = Date.now()
    }
  }
}

2. Connection Pooling

// Connection pool for backend streams
class StreamConnectionPool {
  private connections = new Map<string, StreamConnection>()
  
  async getConnection(conversationId: string): Promise<StreamConnection> {
    if (this.connections.has(conversationId)) {
      return this.connections.get(conversationId)!
    }
    
    const connection = new StreamConnection(conversationId)
    this.connections.set(conversationId, connection)
    
    // Clean up after inactivity
    setTimeout(() => {
      this.connections.delete(conversationId)
      connection.close()
    }, 5 * 60 * 1000) // 5 minutes
    
    return connection
  }
}

Monitoring and Observability

Streaming introduces unique monitoring challenges:

// Stream metrics collection
class StreamMetrics {
  trackStreamStart(conversationId: string) {
    metrics.increment('stream.started', {
      conversation_id: conversationId
    })
  }
  
  trackTokenLatency(latency: number) {
    metrics.histogram('stream.token_latency', latency)
  }
  
  trackStreamCompletion(
    conversationId: string, 
    totalTokens: number, 
    duration: number
  ) {
    metrics.increment('stream.completed')
    metrics.histogram('stream.duration', duration)
    metrics.histogram('stream.tokens', totalTokens)
    metrics.gauge('stream.tokens_per_second', totalTokens / duration)
  }
}

Lessons Learned

After 6 months of production streaming:

1. Start Simple

Begin with basic SSE, add complexity only when needed.

2. Handle Network Issues

Implement robust reconnection and resume logic.

3. Monitor Everything

Stream health is different from traditional API health.

4. Batch Thoughtfully

Balance perceived performance with actual performance.

5. Plan for Scale

Connection limits and memory usage matter at scale.


Streaming-first architecture transforms the AI chat experience. The initial complexity pays dividends in user satisfaction and opens up new interaction patterns impossible with traditional APIs.

Next week: "Context Management at Scale" - how we handle intelligent memory across millions of conversations.

C

Convobase Team

Building AI-native chat infrastructure for the next generation of applications. We're passionate about creating developer-friendly tools that make streaming conversations and intelligent context management effortless.