# Streaming JSON Parsing Issue

In my SvelteKit app, I have an issue. I make a service request with Axios for some streaming JSON. As I get each response, I stream it to the frontend.

This mostly works, but sometimes the frontend gets a stream event where the data is two JSON objects in a single string, so JSON.parse fails.

Frontend code, including a hacky workaround:

```javascript
	const response = await sendMessageV2({
		chat_session_id: chatSessionId,
		input: requestInput,
		'reply-to-id': getLastEnvelopeGuid()
	});
	const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
	while (true) {
		const { value, done } = await reader.read();
		if (done) {
			hideProgress();
			break;
		}

		// TODO: D-177 Fix bug where we get multiple objects in a single string
		let values = value.split('}{');
		if (values.length > 1) {
			for (let i = 0; i < values.length; i += 2) {
				values[i] += '}';
				values[i + 1] = '{' + values[i + 1];
			}
		}
		values.forEach((value) => {
			try {
				let obj = JSON.parse(value);
				dispatchChatMessage(obj);
			} catch (err) {
				console.error(err);
				console.error('Unable to parse', value, 'as JSON');
			}
		});
		// END TODO: D-177 HACK
	}
};
```

Here is the +server.js file:

```javascript
import { randomUUID } from 'crypto';

import axios from 'axios';

// Import the default export from the CommonJS module
import pkg from 'stream-chain';
const { chain } = pkg;

import { fromInfer, toInfer } from '$lib/canonicalize';

import { INFERENCE_ENDPOINT } from '../../../config';
import { hashEmailToUUIDFormat } from '../../../../server/user_hash';

/** @type {import('./$types').RequestHandler} */
export async function POST({ request, locals }) {
const ac = new AbortController();

let messageGuid = randomUUID();
let payload = await request.json();

let session = await locals.getSession();

// TODO: pull out of session
let userId = hashEmailToUUIDFormat(session?.user.email);

let requestPayload = toInfer(payload);
requestPayload.user_id = userId;
requestPayload.message_id = messageGuid;

const encoder = new TextEncoder();

const readable = new ReadableStream({
	async start(controller) {
		console.log('inference/api/v2/infer/send_message sending request', requestPayload);
		axios
			.post(`${INFERENCE_ENDPOINT}/api/v2/infer/send_message`, requestPayload, {
				headers: {
					accept: 'application/json-seq',
					'Content-Type': 'application/json'
				},
				responseType: 'stream'
			})
			.then((response) => {
				const pipeline = chain([
					response.data, // the axios response stream
					(rawChatMessage) => {
						console.log('AOK gotrawChtMessage');
						let chatMessage = rawChatMessage.toString('utf8').replace(/[\x0A\x0D\x1E]/g, '');
						return JSON.parse(chatMessage);
					},
					function* (data) {
						console.log('AOK generator starting up')
						// fromInfer may expand 1 to many
						let dataTypes = fromInfer(data);
						if (dataTypes) {
							for (let datum of dataTypes) {
								console.log('AOK generator yielding')
								yield datum;
							}
						}
						console.log('AOK generator done')
					},
					(data) => {
						
						console.log('inference/api/v2/infer/send_message sent', JSON.stringify(data));
						return encoder.encode(JSON.stringify(data));
					}
					
				]);					

				pipeline.on('data', (data) => {
					
					try {
						controller.enqueue(data);
					} catch (error) {
						console.error('Error while processing infer send_message data', data);
						console.error('Error while processing infer send_message error=', error);
					}
				});

				pipeline.on('end', () => {
					try {
						console.log('inference/api/v2/infer/send_message steam ended');
						controller.close();
					} catch (err) {
						console.error('ON END - Unable to close controller: ', err);
					}
				});

				pipeline.on('error', (err) => {
					console.error('Error processing data: ', err);
					try {
						controller.close();
					} catch (_e) {}
				});
			})
			.catch((error) => {
				console.error('Request failed', error);
				try {
					controller.close();
				} catch (err) {
					console.error('IN CATCH - Unable to close controller: ', err);
				}
			});
	}
});

// Setup streaming fetch response
return new Response(readable, {
	headers: {
		'content-type': 'text/event-stream'
	}
});

}
```


I probably don't understand how streaming is working at one or more layers. What is wrong with my code?

It looks like you're encountering an issue with streaming JSON data where, occasionally, two JSON objects are concatenated into a single string, causing `JSON.parse` to fail. This is a common issue in streaming scenarios where chunk boundaries don't align with JSON object boundaries. Let's break down the potential issues and solutions:

### Frontend Issue

1. **Splitting JSON Strings**: Your current workaround splits the string on `'}{'` and then re-adds the braces. This approach is fragile and will fail if the JSON object itself contains a string with `'}{'`. 

   **Solution**: A more robust solution involves writing a custom parser that can handle split JSON objects. However, this can be complex and may not be necessary if the backend can be adjusted to ensure proper JSON object boundaries.
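   For illustration, a boundary scanner along the following lines is usually sufficient (a minimal sketch; `extractJsonObjects` is a hypothetical helper, and it assumes every record is a single JSON object). Because it tracks brace depth and skips braces inside string values, it is not fooled by a `'}{'` sequence appearing in the payload:

```javascript
// Minimal sketch of a boundary scanner for concatenated JSON objects.
// Assumes every record is a JSON object (starts with '{' and ends with '}').
// extractJsonObjects is a hypothetical helper, not part of any library.
function extractJsonObjects(buffer) {
	const objects = [];
	let depth = 0;
	let inString = false;
	let escaped = false;
	let start = -1;

	for (let i = 0; i < buffer.length; i++) {
		const ch = buffer[i];
		if (escaped) {
			escaped = false;                // previous char was a backslash inside a string
		} else if (ch === '\\' && inString) {
			escaped = true;                 // skip the escaped character on the next pass
		} else if (ch === '"') {
			inString = !inString;           // toggle string state
		} else if (!inString && ch === '{') {
			if (depth === 0) start = i;
			depth++;
		} else if (!inString && ch === '}') {
			depth--;
			if (depth === 0 && start !== -1) {
				objects.push(buffer.slice(start, i + 1));
				start = -1;
			}
		}
	}

	// Anything after the last complete object is an incomplete tail to keep buffering.
	const rest = start !== -1 ? buffer.slice(start) : '';
	return { objects, rest };
}
```

   On each read, you would append the new chunk to a running buffer, call `extractJsonObjects`, `JSON.parse` and dispatch each returned object, and keep `rest` as the new buffer.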

### Backend Issue

1. **Streaming Data Handling**: In your `+server.js` file, you're using `chain` from the `stream-chain` package to process the streaming data. The transformation functions in this chain seem to assume that each chunk of data received is a complete JSON object, which might not always be the case.

   **Solution**: You need to ensure that each chunk of data you're processing is a complete JSON object. One way to do this is to buffer chunks until you have a complete JSON object before parsing and processing it. You can detect the end of a JSON object by counting opening and closing braces, taking strings and escape characters into account, though this can get complex (see the buffering sketch after this list).

2. **Content-Type**: The server sets the `content-type` to `text/event-stream`. That type is reserved for Server-Sent Events (SSE), which is a different mechanism with its own framing. Since you're not sending SSE frames, use a content type that matches the actual payload, such as `application/x-ndjson` for newline-delimited JSON or `application/json-seq` for JSON text sequences.

3. **Error Handling**: Ensure that error handling is robust, especially in streaming scenarios where partial data and network issues can cause more complex problems.
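As a sketch of the backend buffering idea (assuming the upstream service delimits records with `\x1E` and/or newlines, as the `accept: application/json-seq` header and the regex in your pipeline suggest; `splitRecords` and `pending` are hypothetical names), the single-chunk parse step in `chain([...])` could be replaced with a generator that only parses complete records:

```javascript
// Buffer raw chunks and only JSON.parse complete records.
// Assumes upstream delimits records with \x1E (RS) and/or newlines (application/json-seq).
let pending = '';

function* splitRecords(rawChunk) {
	pending += rawChunk.toString('utf8');
	const records = pending.split(/[\x0A\x0D\x1E]+/);
	pending = records.pop();               // keep a possibly incomplete trailing record
	for (const record of records) {
		if (record.trim()) {
			yield JSON.parse(record);          // each yielded value is one complete object
		}
	}
}

// Used in place of the current one-chunk-one-object step:
// const pipeline = chain([response.data, splitRecords, /* ...rest of the pipeline */]);
```

The trailing fragment stays in `pending` until the rest of the record arrives in a later chunk.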

### Proposed Solution

A potential solution would be to modify the backend to send newline-delimited JSON (NDJSON), where each line is a valid JSON object. This format is more streaming-friendly and makes it easier to handle on the frontend:

1. **Backend**: Modify the backend to send NDJSON, with each JSON object followed by a newline character (see the backend sketch after this list).

2. **Frontend**: Modify the frontend to read line by line. For each line, parse it as JSON.
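On the backend, this can be as small as two changes to the pasted `+server.js`, shown here as fragments of that file: append a newline in the final step of the `chain([...])` pipeline, and advertise a content type that matches the framing (`application/x-ndjson` is a common convention, though not the only option):

```javascript
// Final step of the existing stream-chain pipeline: emit newline-delimited JSON.
(data) => {
	console.log('inference/api/v2/infer/send_message sent', JSON.stringify(data));
	return encoder.encode(JSON.stringify(data) + '\n');
}

// ...and the streaming response, no longer claiming to be Server-Sent Events:
return new Response(readable, {
	headers: {
		'content-type': 'application/x-ndjson'
	}
});
```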

Here's a simplified example of how you might handle NDJSON on the frontend:

```javascript
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';
while (true) {
    const { value, done } = await reader.read();
    if (done) {
        break;
    }

    // A read can end mid-line, so keep the trailing fragment until the rest arrives.
    buffer += value;
    const lines = buffer.split('\n');
    buffer = lines.pop();
    lines.forEach(line => {
        if (line) {
            try {
                const obj = JSON.parse(line);
                dispatchChatMessage(obj);
            } catch (err) {
                console.error('Unable to parse line as JSON', line, err);
            }
        }
    });
}
```

This approach relies on the backend sending properly formatted NDJSON; adjust the backend accordingly so that each JSON object is sent on its own line.

I logged the values, and I know for sure that the +server.js code is always calling `controller.enqueue(data)` with a JSON-encoded string containing only a single object.

Given that your +server.js backend is consistently enqueuing single-object JSON strings, the issue is probably arising in how those chunks are being read or concatenated on the frontend. In streaming scenarios, it's common for the reader to receive partial or coalesced data, especially when chunks are split across TCP packets or stream boundaries.

### Investigating the Frontend

1. **Chunk Concatenation Issue**: The `TextDecoderStream` and `reader.read()` combination you're using should, in theory, handle the stream correctly. However, depending on the implementation and timing, it might concatenate multiple chunks before they are read, especially if they arrive in quick succession. This can result in the appearance of multiple JSON objects being concatenated into a single string.

2. **Buffering and Parsing**: To handle this, you need a more robust way of buffering and parsing the incoming data. Instead of relying on the split hack, you can buffer the data and only parse it when you're sure you have complete JSON objects.

### Suggested Frontend Adjustment

Here's an adjusted approach for the frontend:

1. **Buffer Incoming Data**: Accumulate incoming data in a buffer.
2. **Parse Complete JSON Objects**: Parse the buffer for complete JSON objects. Since you're sure each enqueue call contains a single JSON object, you can look for the start (`{`) and end (`}`) of JSON objects.

Here's an example of how you might implement this:

```javascript
const response = await sendMessageV2({ /* your request parameters */ });
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

let buffer = '';
while (true) {
    const { value, done } = await reader.read();
    if (done) {
        break;
    }

    buffer += value;
    while (true) {
        const start = buffer.indexOf('{');
        const end = buffer.indexOf('}', start);
        if (start !== -1 && end !== -1) {
            const jsonStr = buffer.substring(start, end + 1);
            buffer = buffer.substring(end + 1);

            try {
                const obj = JSON.parse(jsonStr);
                dispatchChatMessage(obj);
            } catch (err) {
                console.error('Unable to parse JSON', jsonStr, err);
            }
        } else {
            break; // Wait for more data
        }
    }
}
```

This code accumulates data in buffer, and then tries to find complete JSON objects within that buffer. Once it finds a complete object, it parses it, handles it, and then removes it from the buffer. This way, even if multiple JSON objects are concatenated, as long as they are complete, they will be handled correctly.

Because the buffer carries partial data forward, objects that span multiple reads are handled as well. The real limitation is the simple `indexOf('}')` scan: it assumes each object is flat, so nested objects, or a `}` inside a string value, will end a record too early. If your payloads can contain those, track brace depth (skipping braces inside strings) instead, or use a streaming JSON library.

Which npm libraries accomplish the adjustments that you recommend?

To implement the adjustments I recommended for handling JSON streaming in your SvelteKit application, you can consider using npm libraries that are designed for handling streaming data and parsing JSON. Here's a list of libraries that could be useful:

1. **stream-json / stream-json-parser**: These libraries are specifically designed for streaming JSON parsing. They can handle large JSON files and streaming scenarios very efficiently.

   - `stream-json` provides a way to stream and parse JSON files, breaking them down into smaller, manageable pieces.
   - `stream-json-parser` is another option that offers similar functionality, providing a streaming JSON parser.

2. **through2**: A tiny wrapper around Node.js `streams.Transform` (Streams2/3) to avoid explicit subclassing noise. It can be used to create a transform stream that buffers and processes your JSON data as it's received.

3. **split2**: A simple utility that splits a stream of text into lines. If you decide to use newline-delimited JSON (NDJSON), this library is particularly useful (see the sketch after this list).

4. **concat-stream**: This module concatenates stream data (like Buffer objects, strings, or arrays) into a single Buffer or string. It can be useful if you need to collect chunks before processing.

5. **highland.js**: Highland manages synchronous and asynchronous code easily, handling errors and ending streams properly. It's very useful for managing stream flow control, particularly in complex streaming scenarios.
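For example, `split2` takes care of line buffering on the backend (a sketch assuming the upstream sends NDJSON; `yourEndpoint` and `yourData` are placeholders):

```javascript
import axios from 'axios';
import split2 from 'split2';

axios.post(yourEndpoint, yourData, { responseType: 'stream' })
  .then((response) => {
    response.data
      .pipe(split2())              // buffers partial lines, emits one complete line at a time
      .on('data', (line) => {
        if (!line.trim()) return;  // ignore blank lines between records
        try {
          const obj = JSON.parse(line);
          // obj is one complete record; re-serialize and enqueue it for the client,
          // e.g. controller.enqueue(encoder.encode(JSON.stringify(obj) + '\n'));
        } catch (err) {
          console.error('Unable to parse line as JSON', line, err);
        }
      })
      .on('error', (err) => console.error('Upstream stream error', err));
  });
```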

### Example Usage

If you opt to use stream-json, for instance, your backend could look something like this:

```javascript
// Using stream-json to parse a JSON stream
import { parser } from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray';
import axios from 'axios';

// ... rest of your imports and setup ...

axios.post(yourEndpoint, yourData, { responseType: 'stream' })
  .then(response => {
    response.data
      .pipe(parser())       // Parse the JSON stream
      .pipe(streamArray())  // Convert it into a stream of objects
      .on('data', ({ value }) => {
        // Handle each JSON object here
      })
      .on('end', () => {
        // Handle the end of the stream
      });
  });
```
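One caveat: `streamArray` expects the upstream payload to be a single top-level JSON array. If the upstream emits a sequence of standalone JSON objects instead, as in your case, stream-json's `jsonStreaming` parser option together with `StreamValues` is likely the better fit (a sketch; check the option and module names against the stream-json documentation for the version you use):

```javascript
// Sketch: parsing a stream of standalone JSON objects (not one top-level array).
import { parser } from 'stream-json';
import { streamValues } from 'stream-json/streamers/StreamValues';
import axios from 'axios';

axios.post(yourEndpoint, yourData, { responseType: 'stream' })
  .then((response) => {
    response.data
      .pipe(parser({ jsonStreaming: true }))  // accept multiple top-level JSON values
      .pipe(streamValues())                   // emit one event per complete value
      .on('data', ({ value }) => {
        // value is a fully parsed object; forward it downstream here
      })
      .on('end', () => {
        // upstream finished
      });
  });
```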

On the frontend, if you stick with the standard JavaScript APIs for processing the response stream, the suggested buffering and parsing logic should work well. However, if the JSON objects are large and might span across multiple read calls, consider implementing a more sophisticated buffering and parsing mechanism, possibly inspired by the logic these libraries use.

Remember to thoroughly test the implementation with various edge cases, such as rapid succession of small JSON objects, large JSON objects, and network latency scenarios.
