
HTTP Streams


What
#

The ability to send HTTP responses as streams of data.

Why
#

For some time, pushing messages to web browsers has been done through Server-Sent Events. Although this feature is still available in all current browsers, it has the following limitations:

  • Only the ‘GET’ method is supported
  • No support for custom request headers (which makes it difficult to use authentication headers, for example)
  • Only cookies are supported for credentials (withCredentials)
  • Limited number of open connections per browser (6 per domain, unless HTTP/2 is used)

The Fetch API and the Streams API come to the rescue. Together they allow JavaScript to programmatically access streams of data over the network and process them however the developer needs.

The Server-Sent Events protocol can easily be emulated using the Fetch & Streams APIs.
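
As a minimal sketch of the idea (the /stream URL and the logging are placeholders, not part of the test application described below), reading a streaming response body chunk by chunk looks roughly like this:

async function readChunks() {
	const response = await fetch("/stream")
	const reader = response.body.getReader()
	const decoder = new TextDecoder()
	for (;;) {
		// read() resolves with the next chunk, or done=true when the stream ends
		const { done, value } = await reader.read()
		if (done) break
		console.log("chunk:", decoder.decode(value, { stream: true }))
	}
}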

Another protocol that supports sending real-time data to browsers is WebSocket. WebSockets allow bidirectional traffic between browser and server and have native support in most browsers. WebSockets are usually a good option for most scenarios.

Advantages of Fetch streaming over WebSockets:

  • Transport over HTTP instead of a custom protocol
  • No trouble with corporate firewalls doing packet inspection
  • Scales horizontally

Advantages of WebSockets over Fetch streaming:

  • Bidirectional communication

The question then is: does your application need bidirectional communication?

How
#

Use case: the application needs to receive status information (events) from the server about long-running tasks (up to 2 or 5 minutes long).

We use the Fetch API to emulate the Server-Sent Events protocol:

Test server in Go
#

The HTTP server publishes the following endpoints:

  • /: Serves static files (the JavaScript application)

  • /stream: This is the stream generator. It generates a number of separate events with increasing ids (an example of the event wire format follows this list). It accepts two parameters:

    • maxCount: the maximum number of events to generate
    • idleInterval: When set, the server stops sending events from id 30 to id 80. This allows us to test very long idle connections and connection drops from load balancers. The event generator function looks like this:
    func generateEvents(evChannel chan *InternalEvent, max_count int, idleInterval bool, cancel chan bool) {
      ticker := time.NewTicker(500 * time.Millisecond) // one event every 500 ms
      pingTicker := time.NewTicker(10 * time.Second)   // keep-alive pings
      count := 0
    
      go func() {
      	for {
      		select {
      		case <-cancel:
      			// The client requested cancellation: stop and close the channel
      			ticker.Stop()
      			pingTicker.Stop()
      			close(evChannel)
      			return
      		case <-pingTicker.C:
      			// Periodic keep-alive ping (the client filters these out)
      			event := &InternalEvent{
      				Type: "ping",
      			}
      			evChannel <- event
      		case <-ticker.C:
      			count += 1
      			event := getNewEvent(count)
      			if idleInterval {
      				// Simulate a long idle period: skip events with ids 30..80
      				if count < 30 || count > 80 {
      					evChannel <- event
      				}
      			} else {
      				evChannel <- event
      			}
      			if count > max_count {
      				ticker.Stop()
      				pingTicker.Stop()
      				close(evChannel)
      				return
      			}
      		}
      	}
      }()
    }
    
  • /cancel: Cancels the event generator

  • /restart: Restarts the HTTP server. It accepts one parameter, timeout, which indicates how long to wait before starting the HTTP server again after it has been stopped.
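
For illustration, each event is assumed to be written to the response in the usual Server-Sent Events wire format: field: value lines terminated by an empty line, which is exactly what the JavaScript class below splits and parses. One event on the wire would look roughly like this (the id and payload are made up):

id: 42
event: message
data: {"count": 42}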

Stream JavaScript class
#

The following is a StreamHandler implementation with these features:

  1. Cancellable stream (the ongoing stream can be closed)
  2. Retry on server error. For status streams, for example, it is possible to configure the stream to reconnect if the server is temporarily out of service.
class StreamHandler {
	#url
	#options
	#controller
	#retry // Timeout for retries after server aborts. If 0 no retries
	#retryOnError = false // internal

	constructor() {}// Request Init

	// callbacks
	onOpen = () => {}
	onError = (error) => {}
	onEvent = (event) => {}
	onClose = () => {}
	onAbort = (reason) => {}
  
async stream(url, options, retry=0) {
	this.#url = url
	this.#retry = retry
	try {
		this.#controller = new AbortController()
		const abortSignal = this.#controller.signal
		this.#options = {...options, signal: abortSignal}
		
		const response = await fetch(url, this.#options);
		const readableStream = response.body;
		const transformStream = this._createTransformStream(this);
		const writableStream = this._createWritableStream(this);
		
		await readableStream.pipeThrough(transformStream).pipeTo(writableStream, {signal: abortSignal});
	} catch (error) {
		if (error !== "clientClosed"){
			if (this.#retry) {
				this.#retryOnError = true
				setTimeout( async () => {
					await this.stream(this.#url, this.#options, this.#retry)
				}, this.#retry)
			} else {
				this.#retryOnError = false
				this._handleError(error, this)
			}
		} else {
			this.#retryOnError = false
			this.onClose()
		}
	}
}

close() {
	this.#controller.abort("clientClosed")
}

_createTransformStream(st) {
	// Decode incrementally and buffer partial data: a network chunk may end in
	// the middle of an event, so only complete events (terminated by "\n\n")
	// are parsed and enqueued; the trailing partial event stays in the buffer.
	const decoder = new TextDecoder()
	let buffer = ""
	return new TransformStream({
		transform(chunk, controller) {
			buffer += decoder.decode(chunk, {stream: true})
			const events = buffer.split("\n\n")
			buffer = events.pop()
			events.forEach( (e) => {
				if (e){
					const event = st._parseEventChunk(e)
					controller.enqueue(event)
				}
			})
		},
	});
}

_createWritableStream(st) {
	return new WritableStream({
		start(controller) {
			st.onOpen()
		},
		write(event) {
			if (event.type === "ping") {
				console.log("internal ping > ", event.data)
			} else {
				st.onEvent(event)
			}
		},
		close(controller) {
			if (st.#retry && st.#retryOnError) {
				setTimeout( async () => {
					await st.stream(st.#url, st.#options, st.#retry)
					}, st.#retry)
			} else {
				st.onClose()
			}
		},
		abort(reason) {
			st.onAbort(reason)
		},
	});
}

_handleError(error, st) {
	console.error("Error during stream processing:", error);
	st.onError(error)
}

_parseEventChunk(chunk) {
	if (!chunk || chunk.length === 0) {
		return null;
	}

	if (this.debug) {
		console.debug(chunk);
	}

	const e = {'id': null, 'retry': null, 'data': null, 'event': null};
	chunk.split(/\n|\r\n|\r/).forEach(function(line) {

		const index = line.indexOf(':');
		let field, value;
		if (index > 0) {
			// only first whitespace should be trimmed
			const skip = (line[index + 1] === ' ') ? 2 : 1;
			field = line.substring(0, index);
			value = line.substring(index + skip);
		} else if (index < 0) {
		// Interpret the entire line as the field name, and use the empty string as the field value
			field = line;
			value = '';
		} else {
		// A colon is the first character. This is a comment; ignore it.
			return;
		}
	
		if (!(field in e)) {
			return;
		}
	
		// consecutive 'data' is concatenated with newlines
	
		if (field === 'data' && e[field] !== null) {
			e['data'] += "\n" + value;
		} else {
			e[field] = value;
		}
	}.bind(this));

	if (e.id !== null) {
		this.lastEventId = e.id;
	}

	const event = new CustomEvent(e.event || 'message');
	event.id = e.id;
	event.data = e.data || '';
	event.lastEventId = this.lastEventId;
	return event;
	}

}
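
A minimal usage sketch (the URL, query parameters and callback bodies are only illustrative, following the test server described above):

const handler = new StreamHandler()
handler.onOpen = () => console.log("stream opened")
handler.onEvent = (event) => console.log("event", event.id, event.data)
handler.onError = (error) => console.error("stream failed", error)
handler.onClose = () => console.log("stream closed")

// Start streaming; retry every 5000 ms if the server errors out
handler.stream("/stream?maxCount=100", { method: "GET" }, 5000)

// Later, for example from a "Close" button:
// handler.close()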

Test client
#

From the client it is possible to control the server: cancel the event generator on the server, or restart the HTTP server to test disconnections. It is also possible to select the number of events the server should generate, whether idle intervals are required (to test connection drops from load balancers), and the retry timeout if retries are wanted.
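
The server-control actions can be plain fetch calls. As a sketch (the query-parameter form of timeout is an assumption; the endpoints are the ones listed above):

// Hypothetical helpers behind the test page's buttons
const cancelOnServer = () => fetch("/cancel")
const restartServer = (timeout) => fetch(`/restart?timeout=${timeout}`)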

Test with nginx
#

The following is the nginx configuration used to perform tests with streams:

location ^~ /test {
	rewrite /test/(.*) /$1 break;
	proxy_pass http://host.docker.internal:3000;
 
	proxy_set_header Host $host;
	proxy_set_header X-Real-IP $remote_addr;
	proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
	proxy_set_header X-Forwarded-Proto $scheme;

	# recommended for streams
	proxy_http_version 1.1;
	proxy_set_header Connection "";

	# sse / streams support
	#proxy_read_timeout default is 60 seconds
	proxy_read_timeout 30;
	chunked_transfer_encoding on;
	proxy_buffering off;
	#proxy_cache off;
}