Skip to content

Instantly share code, notes, and snippets.

@ValentinMouret
Created April 5, 2024 08:18
Show Gist options
  • Select an option

  • Save ValentinMouret/03ea4f07bf2559543b883be29cdd3bc4 to your computer and use it in GitHub Desktop.

Select an option

Save ValentinMouret/03ea4f07bf2559543b883be29cdd3bc4 to your computer and use it in GitHub Desktop.
I/O typescript lib
/**
* Module for dealing with I/O.
*
* This avoids re-inventing the wheel all the time.
* We write functions once, test them once, compose and reuse them
* throughout the project.
*
* We provide helper functions for dealing with streams
* as they are super helpful but might come with non-straightforward
* boilerplate.
*/
import { Storage } from '@google-cloud/storage'
import { pipeline, Readable, Writable } from 'stream'
import { promisify } from 'util'
import { createGzip, createUnzip } from 'zlib'
import jsonlines from 'jsonlines'
import { create as xmlCreate } from 'xmlbuilder2'
/**
 * Wraps a string in a readable stream that emits the message and ends.
 *
 * @param message Text to expose as the stream's content.
 * @returns {Readable} A readable stream that conveys the message.
 *
 * @example
 * ```
 * const message = "Hello!"
 * const asReadable = stringToReadable(message)
 * const out = new Bucket().file("some/destination").createWriteStream()
 * copyStream(asReadable, out)
 * ```
 */
export function stringToReadable(message: string): Readable {
  const stream = new Readable()
  stream.push(message)
  stream.push(null) // end-of-stream marker
  return stream
}
/**
 * Wraps a buffer in a readable stream that emits it and ends.
 *
 * @param buffer Bytes to expose as the stream's content.
 * @returns A readable stream conveying the buffer.
 */
export function bufferToReadable(buffer: Buffer): Readable {
  const stream = new Readable()
  stream.push(buffer)
  stream.push(null) // end-of-stream marker
  return stream
}
/**
 * Serializes an object to JSON and wraps it in a readable stream.
 * Inverse transform of `streamToObject`.
 *
 * @param object Value to serialize; must be JSON-serializable.
 * @returns A readable stream conveying the JSON text.
 */
export function objectToStream(object: any): Readable {
  const serialized = JSON.stringify(object)
  return stringToReadable(serialized)
}
// Promise-returning variant of stream.pipeline.
const pipe = promisify(pipeline)

/**
 * Copies a readable stream to a writable stream, compressing it by default.
 *
 * @param input Stream to read from.
 * @param output Stream to write to.
 * @param compress Whether to compress the content with GZIP. True by default.
 */
export async function copyStream(input: Readable, output: Writable, compress: boolean = true) {
  if (!compress) {
    return await pipe(input, output)
  }
  return await pipe(input, createGzip(), output)
}
interface StreamToObjectOptions {
  decompress?: boolean
}

/**
 * Converts a stream to an object directly.
 * Assumes the content is a string representation of a JSON object.
 * Inverse transform of `objectToStream`.
 *
 * @param stream Stream that will spit out the string representation of
 * a JSON.
 * @param opts Set `decompress` to gunzip the stream before parsing.
 *
 * @example Simple way back from a readable stream:
 * ```
 * type MyMessage = {
 *   name: string,
 * }
 * const message: MyMessage = { name: "Elisabeth" }
 * const asStream = stringToReadable(JSON.stringify(message))
 * assert(message === streamToObject(asStream))
 * ```
 *
 * @example Reading a JSON from GCS:
 * ```
 * type MyMessage = {
 *   name: string,
 * }
 * const input = new Bucket().file("some/file.json.gz").createReadStream()
 * const message = streamToObject<MyMessage>(input)
 * assert(message === {name: "Elisabeth"})
 * ```
 */
export async function streamToObject<T>(
  stream: Readable,
  opts?: StreamToObjectOptions,
): Promise<T> {
  let input: Readable = stream
  if (opts?.decompress) {
    input = stream.pipe(createUnzip())
  }
  const text = await streamToString(input)
  return JSON.parse(text) as T
}
/**
 * Reads a stream to completion and returns its content as a string.
 *
 * Chunks are normalized to Buffers before concatenation, so this also
 * works for streams that emit strings (e.g. after `setEncoding`), which
 * previously crashed in `Buffer.concat`. The untyped `chunks` array also
 * failed under `noImplicitAny`.
 *
 * @param stream Stream to read from.
 * @returns The full content, decoded with the default (UTF-8) encoding.
 */
export async function streamToString(stream: Readable): Promise<string> {
  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    // Normalize: chunks are strings when an encoding is set on the stream.
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
  }
  return Buffer.concat(chunks).toString()
}
/**
 * Writes the message as the content of a file on GCS.
 *
 * @param message Content of the file to write as a string.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed.
 *
 * @example
 * ```
 * const message = JSON.stringify({hello: world})
 * await io.stringToBucket(message, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function stringToBucket(
  message: string,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  await streamToBucket(stringToReadable(message), bucket, destination, compressed)
}
/**
 * Writes the buffer as the content of a file on GCS.
 *
 * @param message Content of the file to write as raw bytes.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed.
 */
export async function bufferToBucket(
  message: Buffer,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  await streamToBucket(bufferToReadable(message), bucket, destination, compressed)
}
/**
 * Writes the object as the content of a file on GCS.
 *
 * @param object Object to write as the content of the file.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed.
 *
 * @example
 * ```
 * const message = { hello: world }
 * await io.objectToBucket(message, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function objectToBucket(
  object: any,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  await streamToBucket(objectToStream(object), bucket, destination, compressed)
}
/**
 * Writes what’s read from the stream to a file on GCS.
 *
 * @param input Readable stream with the content of the file to write.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed.
 *
 * @example
 * ```
 * const input = storage.file('some/file.json').createReadStream() // or axios.get(...)
 * await io.streamToBucket(input, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function streamToBucket(
  input: Readable,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  const file = new Storage().bucket(bucket).file(destination)
  await copyStream(input, file.createWriteStream(), compressed)
}
/**
 * Concatenates the contents of several streams, separated by newlines,
 * and writes the merged result to the output stream.
 *
 * @param output Stream the merged content is written to.
 * @param inputs Streams whose contents are merged, in order.
 * @param compressed Whether to gzip the merged content. False by default.
 */
export async function mergeJSONStreams(
  output: Writable,
  inputs: Readable[],
  compressed: boolean = false,
): Promise<void> {
  const contents = await Promise.all(inputs.map(streamToString))
  const merged = contents.join('\n')
  await copyStream(stringToReadable(merged), output, compressed)
}
/**
 * Parses a JSON-lines string into an array of objects.
 *
 * @param message JSON-lines content: one JSON document per line.
 * @returns The parsed objects, in input order.
 * @throws If a line is not valid JSON. (Previously a parse error was
 *         never surfaced and the returned promise stayed pending forever.)
 */
export async function parseJsonl<T>(message: string): Promise<T[]> {
  const parser = jsonlines.parse()
  const objects: T[] = []
  return await new Promise((resolve, reject) => {
    // Register all listeners before writing: events emitted during
    // write()/end() would otherwise be missed.
    parser.on('data', (data) => {
      objects.push(data)
    })
    parser.on('error', reject)
    parser.on('end', () => {
      resolve(objects)
    })
    parser.write(message)
    parser.end()
  })
}
/**
 * Serializes an array of objects to a JSON-lines string:
 * one JSON document per line, with a trailing newline.
 *
 * @param items Values to serialize; each must be JSON-serializable.
 */
export function toJsonl(items: any[]) {
  const lines = items.map((item) => JSON.stringify(item))
  return `${lines.join('\n')}\n`
}
/**
 * Parses an XML document string into its plain-object representation.
 *
 * @param data XML document as a string.
 * @returns The document converted to a nested plain object.
 */
export function xmlToObject(data: string): object {
  // Return type is lowercase `object`, not the boxed `Object` wrapper type.
  return xmlCreate(data).end({ format: 'object' })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment