Skip to content

Commit 553c419

Browse files
authored
Use async reader for parsing Apache Arrow responses (#2788) (#2790)
1 parent 95cd9b1 commit 553c419

File tree

3 files changed

+39
-24
lines changed

3 files changed

+39
-24
lines changed

docs/reference/client-helpers.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -475,15 +475,15 @@ Added in `v8.16.0`
475475

476476
ES|QL can return results in multiple binary formats, including [Apache Arrow](https://arrow.apache.org/)'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.
477477

478-
`toArrowReader` returns a [`RecordBatchStreamReader`](https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.md).
478+
`toArrowReader` returns an [`AsyncRecordBatchStreamReader`](https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216).
479479

480480
```ts
481481
const reader = await client.helpers
482482
.esql({ query: 'FROM sample_data' })
483483
.toArrowReader()
484484

485485
// print each record as JSON
486-
for (const recordBatch of reader) {
486+
for await (const recordBatch of reader) {
487487
for (const record of recordBatch) {
488488
console.log(record.toJSON())
489489
}

src/helpers.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import assert from 'node:assert'
1111
import * as timersPromises from 'node:timers/promises'
1212
import { Readable } from 'node:stream'
1313
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
14-
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
14+
import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'
1515
import Client from './client'
1616
import * as T from './api/types'
1717
import { Id } from './api/types'
@@ -135,7 +135,7 @@ export interface EsqlColumn {
135135
export interface EsqlHelper {
136136
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
137137
toArrowTable: () => Promise<Table<TypeMap>>
138-
toArrowReader: () => Promise<RecordBatchStreamReader>
138+
toArrowReader: () => Promise<AsyncRecordBatchStreamReader>
139139
}
140140

141141
export interface EsqlToRecords<TDocument> {
@@ -1000,7 +1000,7 @@ export default class Helpers {
10001000
return tableFromIPC(response)
10011001
},
10021002

1003-
async toArrowReader (): Promise<RecordBatchStreamReader> {
1003+
async toArrowReader (): Promise<AsyncRecordBatchStreamReader> {
10041004
if (metaHeader !== null) {
10051005
reqOptions.headers = reqOptions.headers ?? {}
10061006
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1009,9 +1009,9 @@ export default class Helpers {
10091009

10101010
params.format = 'arrow'
10111011

1012-
// @ts-expect-error the return type will be ArrayBuffer when the format is set to 'arrow'
1013-
const response: ArrayBuffer = await client.esql.query(params, reqOptions)
1014-
return RecordBatchStreamReader.from(response)
1012+
// @ts-expect-error response is a Readable when asStream is true
1013+
const response: Readable = await client.esql.query(params, reqOptions)
1014+
return await AsyncRecordBatchStreamReader.from(Readable.from(response))
10151015
}
10161016
}
10171017

test/unit/helpers/esql.test.ts

+31-16
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,28 @@ test('ES|QL helper', t => {
158158
t.end()
159159
})
160160

161-
test('toArrowReader', t => {
162-
t.test('Parses a binary response into an Arrow stream reader', async t => {
163-
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
161+
test('toArrowReader', async t => {
162+
const testRecords = [
163+
{ amount: 4.900000095367432, },
164+
{ amount: 8.199999809265137, },
165+
{ amount: 15.5, },
166+
{ amount: 9.899999618530273, },
167+
{ amount: 13.899999618530273, },
168+
]
169+
170+
// build reusable Arrow table
171+
const table = arrow.tableFromJSON(testRecords)
172+
const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
164173

174+
t.test('Parses a binary response into an Arrow stream reader', async t => {
165175
const MockConnection = connection.buildMockConnection({
166176
onRequest (_params) {
167177
return {
168-
body: Buffer.from(binaryContent, 'base64'),
178+
body: Buffer.from(rawData),
169179
statusCode: 200,
170180
headers: {
171-
'content-type': 'application/vnd.elasticsearch+arrow+stream'
181+
'content-type': 'application/vnd.elasticsearch+arrow+stream',
182+
'transfer-encoding': 'chunked'
172183
}
173184
}
174185
}
@@ -182,26 +193,28 @@ test('ES|QL helper', t => {
182193
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
183194
t.ok(result.isStream())
184195

185-
const recordBatch = result.next().value
186-
t.same(recordBatch.get(0)?.toJSON(), {
187-
amount: 4.900000095367432,
188-
date: 1729532586965,
189-
})
196+
let count = 0
197+
for await (const recordBatch of result) {
198+
for (const record of recordBatch) {
199+
t.same(record.toJSON(), testRecords[count])
200+
count++
201+
}
202+
}
203+
190204
t.end()
191205
})
192206

193207
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
194-
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
195-
196208
const MockConnection = connection.buildMockConnection({
197209
onRequest (params) {
198210
const header = params.headers?.['x-elastic-client-meta'] ?? ''
199211
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
200212
return {
201-
body: Buffer.from(binaryContent, 'base64'),
213+
body: Buffer.from(rawData),
202214
statusCode: 200,
203215
headers: {
204-
'content-type': 'application/vnd.elasticsearch+arrow+stream'
216+
'content-type': 'application/vnd.elasticsearch+arrow+stream',
217+
'transfer-encoding': 'chunked'
205218
}
206219
}
207220
}
@@ -240,10 +253,12 @@ test('ES|QL helper', t => {
240253
new arrow.RecordBatch(schema, batch3.data),
241254
])
242255

256+
const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
257+
243258
const MockConnection = connection.buildMockConnection({
244259
onRequest (_params) {
245260
return {
246-
body: Buffer.from(arrow.tableToIPC(table, "stream")),
261+
body: Buffer.from(rawData),
247262
statusCode: 200,
248263
headers: {
249264
'content-type': 'application/vnd.elasticsearch+arrow+stream'
@@ -261,7 +276,7 @@ test('ES|QL helper', t => {
261276
t.ok(result.isStream())
262277

263278
let counter = 0
264-
for (const batch of result) {
279+
for await (const batch of result) {
265280
for (const row of batch) {
266281
counter++
267282
const { id, val } = row.toJSON()

0 commit comments

Comments
 (0)