Strange buffering when using toReadableStream() #1315

Open
acavestro-at-halueio opened this issue Feb 10, 2025 · 4 comments
Labels
bug Something isn't working

Comments

@acavestro-at-halueio

Confirm this is a Node library issue and not an underlying OpenAI API issue

  • This is an issue with the Node library

Describe the bug

Hi people, I want to show you an interesting issue I'm having using openai-node in a Remix project.

At a high level, the server makes the request to the OpenAI API, and the returned stream is embedded in the response. The client, which made the request using fetch, parses the streamed response.

Speaking of code, the server part is this:

const stream = openai.beta.threads.runs
  .stream(threadId, {
    assistant_id: assistantId,
    include: ['step_details.tool_calls[*].file_search.results[*].content'],
  })
  .on('messageDelta', (message) => {
    console.log('messageDelta', JSON.stringify(message));
  })
  .toReadableStream();

return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    Thread: threadId,
  },
});
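(One side note: the Content-Type above says text/event-stream, but toReadableStream() emits newline-delimited JSON rather than SSE frames. If a real SSE body is wanted, each line would need a "data: " prefix and a blank-line terminator. A purely illustrative sketch of such a conversion step, operating on already-split lines:

```javascript
// Illustrative only: wraps each complete NDJSON line into an SSE frame
// ("data: <line>\n\n"). Assumes line splitting happens upstream; this is
// not something the SDK does for you.
function ndjsonToSSE() {
  return new TransformStream({
    transform(line, controller) {
      controller.enqueue(`data: ${line}\n\n`);
    },
  });
}
```

With NDJSON as-is, though, the content type is arguably misleading to intermediaries and dev tools, which may matter when debugging in the network inspector.)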

The client part is

const response = await fetch(localizedChatAPIURL, {
    method: 'POST',
    body: thePayload,
  });
const reader = response.body
          .pipeThrough(new TextDecoderStream())
          .pipeThrough(new TextLineDecoderStream())
          .pipeThrough<AssistantStreamEvent>(new JSONDecoderStream())
          .getReader();
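For context, TextLineDecoderStream and JSONDecoderStream are our own helpers, not SDK classes. Since toReadableStream() emits newline-delimited JSON, the line decoder has to carry partial lines over between network chunks. A minimal sketch of what such a line splitter needs to do (the name and shape below are illustrative, not our actual implementation):

```javascript
// Splits a stream of text chunks into complete lines, buffering any
// partial trailing line until the next chunk arrives. The HTTP layer is
// free to split or merge chunks, so a JSON line may arrive in pieces.
function lineSplitterStream() {
  let buffer = '';
  return new TransformStream({
    transform(chunk, controller) {
      buffer += chunk;
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep the partial trailing line for later
      for (const line of lines) {
        if (line.trim()) controller.enqueue(line);
      }
    },
    flush(controller) {
      if (buffer.trim()) controller.enqueue(buffer); // emit any leftover
    },
  });
}
```

If the real line decoder drops or mis-buffers that trailing fragment, you would see exactly this kind of duplicated/merged first message.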

The strange thing is that the messages appear to differ between the server and the client.

The output of that console.log on messageDelta, on the server side, is:

messageDelta {"content":[{"index":0,"type":"text","text":{"value":"E","annotations":[]}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"x"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":","}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" come"}}]}

While the network inspector on the browser side logs these messages:

{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Eddy Merckx, noto","annotations":[]}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"x"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":","}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" come"}}]}}}

The issue is that this inconsistency malforms the first sentences. The example is in Italian, sorry, but the beginning of that sentence should be "Eddy Merckx, noto come". Instead, the client prints "Eddy Merckx, notoddy Merckx, noto come".

I noticed that the first message on the client side is longer than the first message on the server side, so I suspect some buffering is happening somewhere.

Have you already encountered this issue? Do you have any feedback on how to handle this and avoid the malformed output?

Thank you

To Reproduce

No clear steps to reproduce at the moment. Try creating a thread run as a stream, piping it as an HTTP response, and consuming it from a browser client.

Code snippets

OS

macOS

Node version

20.13.1

Library version

4.83.0

@acavestro-at-halueio acavestro-at-halueio added the bug Something isn't working label Feb 10, 2025
@spaceemotion

spaceemotion commented Feb 12, 2025

@acavestro-at-halueio
Author

Hi @spaceemotion, I'm not sure, but it could be.
There's definitely a transformation somewhere; it's not just piping. Or some buffer in the middle isn't being cleared properly.

These messages

messageDelta {"content":[{"index":0,"type":"text","text":{"value":"E","annotations":[]}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"x"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":","}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}

arrive at the browser like this:

{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Eddy Merckx, noto","annotations":[]}}]}}}

but from the second message onwards, they are also sent as they originally were:

{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"x"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":","}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" come"}}]}}}

I'd say it's something in the middle, between the stream creation (openai.beta.threads.runs.stream()) and the toReadableStream() call.
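For reference, this is roughly what I'd expect toReadableStream() to do on that end; a sketch based on my understanding of the SDK's behavior, not its actual source: each event is JSON-encoded, newline-terminated, and enqueued as bytes.

```javascript
// Sketch (assumption, not the SDK source): serialize each event as one
// newline-terminated JSON line and enqueue it as a byte chunk.
function toReadableStreamSketch(events) {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      for (const event of events) {
        controller.enqueue(encoder.encode(JSON.stringify(event) + '\n'));
      }
      controller.close();
    },
  });
}
```

If that's accurate, each event leaves the SDK as its own chunk, and any merging would have to happen in the transport or in the framework wrapping the Response.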

@mbuptivo

Hi, I'm having a similar problem, but in a different context. Basically, this code exhibits the exact same problem:

const run = this.openai.beta.threads.runs.stream(this.openAiThread.id, {
  assistant_id: this.assistant.id,
});

for await (const event of this.assistantStream) {
  console.log(event);
}

The problem does not happen if the events are handled with ".on":

const run = this.openai.beta.threads.runs.stream(this.openAiThread.id, {
  assistant_id: this.assistant.id,
}).on('event', (event) => console.log(event));

Or when awaiting the ".create" method instead:

const run = await this.openai.beta.threads.runs.create(this.openAiThread.id, {
  assistant_id: this.assistant.id,
});

for await (const event of this.assistantStream) {
  console.log(event);
}

@Lucas-Levandoski

I tried to replicate this in the simplest way possible but didn't manage to. Here is my backend:

import OpenAI from "openai";
import { createServer } from 'http';

const openai = new OpenAI();

async function runRepro() {
  const assistant = await openai.beta.assistants.create({
    model: 'gpt-4o',
    instructions: 'You are a math answer generator; you receive an equation and simply return the result with no explanation',
  });

  const thread = await openai.beta.threads.create({
    messages: [
      { role: 'user', content: '"I need to solve the equation `3x + 11 = 14`."' },
    ],
  });

  const stream = openai.beta.threads.runs
    .stream(thread.id, {
      assistant_id: assistant.id,
    })

  return stream.toReadableStream();
}

const server = createServer(async (req, res) => {
  res.writeHead(200, {
    'Access-Control-Allow-Origin': '*',
    'Content-Type': 'text/event-stream',
    'Connection': 'keep-alive'
  })

  const stream = await runRepro();

  for await (const value of stream.values()) {
    console.log(value);

    res.write(value);
  }

  res.end();
})

server.listen(3000, () => console.log('server listening on port: 3000'));

and this is the HTML

<!DOCTYPE html>
<html>
<head>
  <meta charset='utf-8'>
  <title>Stream test</title>
  <meta name='viewport' content='width=device-width, initial-scale=1'>
</head>
<body>
  <div style="display: flex; gap: 10px; justify-content: center; align-items: center;">
    <h3>Status</h3><p id="status" style="background-color: aqua; padding: 10px 20px;">not started</p>
  </div>
  <div style="display: flex; flex-direction: column; gap: 2px; justify-content: center; align-items: center;">
    <h3 style="margin-bottom: 0;">Result</h3><p id="result" style="padding: 10px 20px; border: 1px solid green; border-radius: 5px; margin-top: 0;"></p>
  </div>
  <div style="display: flex; flex-direction: column; gap: 10px; justify-content: center; align-items: center;">
    <h3>Processing Content</h3><div id="content" style="max-height: 60vh; max-width: 80vw; flex-wrap: nowrap; overflow: auto; padding: 10px 20px; border: solid 1px; border-radius: 5px; display: flex; flex-direction: column; text-align: center;"></div>
  </div>
</body>
<script>
  const url = 'http://localhost:3000';

  async function main() {
    const content = document.querySelector('#content');
    const status = document.querySelector('#status');
    const resultP = document.querySelector('#result');

    const response = await fetch(url);
    status.textContent = 'Started';

    const reader = response.body
            .pipeThrough(new TextDecoderStream())

    for await(const value of reader.values()) {
      const json = JSON.parse(value);

      const ruler = document.createElement("hr");
      const event = document.createElement("h3");
      event.textContent = json.event;
      const contentP = document.createElement("p")
      contentP.textContent = value;

      if(json.event === 'thread.message.delta') {
        resultP.textContent += json.data.delta.content[0].text.value;
      }

      content.append (ruler, event, contentP, ruler)

      await new Promise((resolve) => setTimeout(resolve, 500));
    }

    status.textContent = 'Finished'
  }

  main();
</script>
</html>

If you copy and paste those, you will see it working just fine. Did I miss something?
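One caveat about the client above: calling JSON.parse(value) per decoded chunk assumes every read() yields exactly one complete JSON line. The streams machinery doesn't guarantee that alignment; if two events ever arrive coalesced into one chunk (hypothetical payloads below), the per-chunk parse throws:

```javascript
// Two NDJSON events coalesced into a single network chunk (hypothetical
// payloads, just to illustrate the failure mode):
const chunk = '{"event":"a"}\n{"event":"b"}\n';

let parsed = null;
try {
  parsed = JSON.parse(chunk); // throws: the chunk is two JSON documents
} catch {
  // fall through; parsed stays null
}

// Splitting on newlines first recovers both events:
const events = chunk
  .split('\n')
  .filter(Boolean)
  .map((line) => JSON.parse(line));
```

So the repro may simply be getting lucky with chunk boundaries; a line-buffering step between the TextDecoderStream and the JSON parsing would make it robust either way.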


4 participants