Created
April 5, 2024 08:18
-
-
Save ValentinMouret/03ea4f07bf2559543b883be29cdd3bc4 to your computer and use it in GitHub Desktop.
I/O typescript lib
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Module for dealing with I/O. | |
| * | |
| * This saves us from re-inventing the wheel all the time. | |
| * We write functions once, test them once, compose and reuse them | |
| * throughout the project. | |
| * | |
| * We provide helper functions for dealing with streams | |
| * as they are super helpful but might come with non-straightforward | |
| * boilerplate. | |
| */ | |
| import { Storage } from '@google-cloud/storage' | |
| import { pipeline, Readable, Writable } from 'stream' | |
| import { promisify } from 'util' | |
| import { createGzip, createUnzip } from 'zlib' | |
| import jsonlines from 'jsonlines' | |
| import { create as xmlCreate } from 'xmlbuilder2' | |
/**
 * Wraps a string in a readable stream.
 *
 * The whole message is queued up front and the stream is ended immediately,
 * so a consumer receives the content followed by end-of-stream.
 *
 * @param message Text the stream should emit.
 * @returns A readable stream that conveys `message` and then ends.
 *
 * @example
 * ```
 * const message = "Hello!"
 * const asReadable = stringToReadable(message)
 * const out = new Bucket().file("some/destination").createWriteStream()
 * await copyStream(asReadable, out)
 * ```
 */
export function stringToReadable(message: string): Readable {
  const stream = new Readable()
  // Queue the payload first, then `null`, which marks end-of-stream.
  for (const chunk of [message, null]) {
    stream.push(chunk)
  }
  return stream
}
/**
 * Wraps a buffer in a readable stream.
 *
 * The buffer is queued as the stream's only chunk and the stream is ended
 * right after it.
 *
 * @param buffer Bytes the stream should emit.
 * @returns A readable stream that conveys `buffer` and then ends.
 */
export function bufferToReadable(buffer: Buffer): Readable {
  const stream = new Readable()
  // Queue the payload first, then `null`, which marks end-of-stream.
  for (const chunk of [buffer, null]) {
    stream.push(chunk)
  }
  return stream
}
/**
 * Serializes a value to JSON and exposes the result as a readable stream.
 * Inverse transform of `streamToObject`.
 *
 * @param object Value to serialize with `JSON.stringify`.
 * @returns A readable stream conveying the JSON text.
 */
export function objectToStream(object: any): Readable {
  const serialized = JSON.stringify(object)
  return stringToReadable(serialized)
}
// Promise-returning flavour of `stream.pipeline`, so it can be awaited.
const pipe = promisify(pipeline)

/**
 * Pipes a readable stream into a writable stream, gzipping the content on
 * the way unless told otherwise.
 *
 * @param input Stream to read from.
 * @param output Stream to write to.
 * @param compress Whether to compress the content with GZIP. True by default.
 * @returns A promise that settles once the underlying pipeline has completed
 *   or failed.
 */
export async function copyStream(input: Readable, output: Writable, compress: boolean = true) {
  if (!compress) {
    // Straight copy, bytes are forwarded untouched.
    return await pipe(input, output)
  }
  return await pipe(input, createGzip(), output)
}
/**
 * Converts a stream to an object directly.
 * Assumes the content is a string representation of a JSON object.
 * Inverse transform of `objectToStream`.
 *
 * The parsed value is cast to `T` without any runtime validation.
 *
 * @param stream Stream that will spit out the string representation of
 * a JSON.
 * @param opts Optional settings; set `decompress` to unzip the content
 * before parsing it.
 * @returns The parsed JSON value, typed as `T`.
 * @throws If the stream (or the decompression) fails, or if the content is
 * not valid JSON.
 *
 * @example Simple way back from a readable stream:
 * ```
 * type MyMessage = {
 *   name: string,
 * }
 * const message: MyMessage = { name: "Elisabeth" }
 * const asStream = stringToReadable(JSON.stringify(message))
 * assert.deepEqual(await streamToObject<MyMessage>(asStream), message)
 * ```
 *
 * @example Reading a JSON from GCS:
 * ```
 * type MyMessage = {
 *   name: string,
 * }
 * const input = new Bucket().file("some/file.json.gz").createReadStream()
 * const message = await streamToObject<MyMessage>(input, { decompress: true })
 * assert.deepEqual(message, { name: "Elisabeth" })
 * ```
 */
export async function streamToObject<T>(
  stream: Readable,
  opts?: StreamToObjectOptions,
): Promise<T> {
  // `pipeline` (unlike `.pipe()`) destroys the unzip stream when the source
  // fails, so a source error rejects the read below instead of leaving it
  // pending forever.
  const input: Readable = opts?.decompress
    ? pipeline(stream, createUnzip(), () => {
        // Errors reach the caller through the returned stream.
      })
    : stream
  return JSON.parse(await streamToString(input)) as T
}

/** Options accepted by `streamToObject`. */
interface StreamToObjectOptions {
  /** When true, the stream content is unzipped before being parsed. */
  decompress?: boolean
}
/**
 * Reads a stream to its end and returns everything read from it as a string.
 *
 * Chunks are gathered as bytes and decoded (UTF-8) only once at the end, so
 * a multi-byte character split across two chunks is still decoded correctly.
 *
 * @param stream Stream to read from. It may yield Buffers, typed arrays or
 * strings (e.g. when an encoding was set on the stream).
 * @returns The full content of the stream; an empty string if the stream
 * yields nothing.
 * @throws Whatever error the stream fails with.
 */
export async function streamToString(stream: Readable): Promise<string> {
  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    // `Buffer.concat` only accepts binary chunks, so normalize strings
    // (yielded by streams with an encoding set or in object mode) first.
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
  }
  return Buffer.concat(chunks).toString()
}
/**
 * Uploads a string as the content of a file on GCS.
 *
 * @param message Content of the file to write as a string.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed. True by default.
 *
 * @example
 * ```
 * const message = JSON.stringify({ hello: 'world' })
 * await io.stringToBucket(message, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function stringToBucket(
  message: string,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  // Turn the string into a stream and delegate the actual upload.
  await streamToBucket(stringToReadable(message), bucket, destination, compressed)
}
/**
 * Uploads a buffer as the content of a file on GCS.
 *
 * @param message Content of the file to write as a buffer.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed. True by default.
 */
export async function bufferToBucket(
  message: Buffer,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  // Turn the buffer into a stream and delegate the actual upload.
  await streamToBucket(bufferToReadable(message), bucket, destination, compressed)
}
/**
 * Uploads the JSON representation of a value as the content of a file on GCS.
 *
 * @param object Object to write as the content of the file.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed. True by default.
 *
 * @example
 * ```
 * const message = { hello: 'world' }
 * await io.objectToBucket(message, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function objectToBucket(
  object: any,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  // Serialize to a JSON stream and delegate the actual upload.
  await streamToBucket(objectToStream(object), bucket, destination, compressed)
}
/**
 * Uploads whatever is read from a stream to a file on GCS.
 *
 * A new `Storage` client is created on every call.
 *
 * @param input Readable stream with the content of the file to write.
 * @param bucket Bucket where to write.
 * @param destination Location of the file on the bucket.
 * @param compressed Whether the content should be compressed. True by default.
 *
 * @example
 * ```
 * const input = storage.file('some/file.json').createReadStream() // or axios.get(...)
 * await io.streamToBucket(input, 'messages', 'hello/world.json.gz')
 * ```
 */
export async function streamToBucket(
  input: Readable,
  bucket: string,
  destination: string,
  compressed: boolean = true,
) {
  const target = new Storage().bucket(bucket).file(destination)
  const sink = target.createWriteStream()
  await copyStream(input, sink, compressed)
}
/**
 * Merges the content of several streams into a single output stream.
 *
 * Every input is read fully into memory as a string; the contents are then
 * joined with a newline, in the order of `inputs`, and written to `output`.
 *
 * @param output Stream to write the merged content to.
 * @param inputs Streams whose contents should be merged.
 * @param compressed Whether the merged content should be compressed.
 * False by default.
 */
export async function mergeJSONStreams(
  output: Writable,
  inputs: Readable[],
  compressed: boolean = false,
): Promise<void> {
  const contents = await Promise.all(inputs.map((source) => streamToString(source)))
  const merged = stringToReadable(contents.join('\n'))
  await copyStream(merged, output, compressed)
}
/**
 * Parses a JSON Lines document into the list of values it contains.
 *
 * The values are typed as `T` without any runtime validation.
 *
 * @param message JSON Lines content, one JSON value per line.
 * @returns The parsed values, in the order the parser emits them.
 * @throws Rejects with the parser's error if it emits one (presumably on a
 * line that is not valid JSON — confirm against the `jsonlines` docs).
 */
export async function parseJsonl<T>(message: string): Promise<T[]> {
  return await new Promise<T[]>((resolve, reject) => {
    const parser = jsonlines.parse()
    const objects: T[] = []
    // Register every listener before feeding the parser so no event can be
    // missed, and surface parse failures as a rejection instead of an
    // unhandled 'error' event that would leave this promise pending.
    parser.on('data', (data: T) => {
      objects.push(data)
    })
    parser.on('error', reject)
    parser.on('end', () => {
      resolve(objects)
    })
    parser.write(message)
    parser.end()
  })
}
/**
 * Serializes a list of values as JSON Lines.
 *
 * Each item becomes one line of JSON and the result always ends with a
 * newline, so an empty list yields a lone `'\n'`.
 *
 * @param items Values to serialize, one per line.
 * @returns The JSON Lines text.
 */
export function toJsonl(items: any[]) {
  const lines = items.map((item) => JSON.stringify(item))
  return `${lines.join('\n')}\n`
}
/**
 * Converts XML text to its object representation using `xmlbuilder2`.
 *
 * @param data XML content handed to `xmlbuilder2`'s `create`.
 * @returns The document serialized with the `object` format.
 */
export function xmlToObject(data: string): Object {
  const document = xmlCreate(data)
  return document.end({ format: 'object' })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment