Skip to content

Instantly share code, notes, and snippets.

@Ran-Xing
Created February 8, 2025 20:09
Show Gist options
  • Select an option

  • Save Ran-Xing/729024b4042facc22364768553a84355 to your computer and use it in GitHub Desktop.

Select an option

Save Ran-Xing/729024b4042facc22364768553a84355 to your computer and use it in GitHub Desktop.
CSV Split & Import Tool

CSV Split & Import Tool

项目简介

该工具用于将大型 CSV 文件切割为更小的部分,以便更轻松地导入到数据库中。支持的 CSV 文件包括评分、标签、链接和电影信息。该工具旨在提高数据处理的效率和便捷性。

特性

  • 自动切割大型 CSV 文件,减少导入时的负担。
  • 为每个 CSV 文件添加适当的表头。
  • 支持 MySQL 数据库的连接和数据导入。
  • 处理后自动删除原始 CSV 文件以节省空间。

需求

  • Node.js
  • MySQL 数据库
  • knex 和 csv-parser 模块

安装

npm install knex csv-parser mysql2

CSV Split & Import Tool

Project Overview

This tool is designed to split large CSV files into smaller parts for easier import into a database. Supported CSV files include ratings, tags, links, and movie information. The tool aims to enhance data processing efficiency and convenience.

Features

  • Automatically splits large CSV files to reduce import burden.
  • Adds appropriate headers to each CSV file.
  • Supports MySQL database connection and data import.
  • Removes original CSV files after processing to save space.

Requirements

  • Node.js
  • MySQL Database
  • knex and csv-parser modules

Installation

npm install knex csv-parser mysql2
// Entry-point setup: load dependencies, resolve the CSV directory, and
// configure a shared MySQL connection pool via knex.
const knexLib = require('knex');
const fs = require('fs');
const csv = require('csv-parser');
// Connection settings come from the environment; local-dev fallbacks are
// applied via `??` in the knex config below.
let {DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME} = process.env;
// Prefer the repo-level ../../db/ directory; fall back to ./db/ when the
// script runs from the project root.
let dbDir = "../../db/";
if (!fs.existsSync(dbDir)) {
dbDir = "./db/";
}
// Shared accumulators: distinct genre names (filled by importDB), the
// genre-name -> genreId map, and the movie/genre join rows (both filled
// and consumed by importMoviesGenres).
const genres = [];
let genreMap = {}
const moviesGenres = [];
// Rows per bulk INSERT batch.
const insert_num = 500;
// Single shared knex instance; destroyed once by the main IIFE at the end.
const knex = knexLib({
client: 'mysql2', connection: {
host: DB_HOST ?? "127.0.0.1",
port: DB_PORT ?? 3306,
user: DB_USER ?? "admin",
password: DB_PASSWORD ?? "123456",
database: DB_NAME ?? 'data',
}, pool: {
min: 2, max: 150, acquireTimeoutMillis: 60000, createTimeoutMillis: 6000, idleTimeoutMillis: 60000,
}
});
// Schema definitions keyed by table name. Declaration order matters:
// createTable() creates them in this order and drops them in reverse so
// the FK constraints on movies_genres are created after (and dropped
// before) the tables they reference.
const tableSchemas = {
// MovieLens links.csv: external IDs for each movie.
links: (table) => {
table.integer('movieId').notNullable();
table.string('imdbId', 30).nullable();
table.string('tmdbId', 30).nullable();
table.primary('movieId');
}, movies: (table) => {
// movies.csv minus the genres column (normalized into genres/movies_genres).
table.integer('movieId').notNullable();
table.string('title', 255).nullable();
table.primary('movieId');
}, ratings: (table) => {
// One rating per (user, movie) pair; timestamp is a Unix epoch integer.
table.integer('userId').notNullable();
table.integer('movieId').notNullable();
table.decimal('rating', 2, 1).nullable();
table.integer('timestamp').nullable();
table.primary(['userId', 'movieId']);
}, tags: (table) => {
// A user may tag the same movie with several tags, hence the 3-column PK.
table.integer('userId').notNullable();
table.integer('movieId').notNullable();
table.string('tag', 255).notNullable();
table.integer('timestamp').nullable();
table.primary(['userId', 'movieId', 'tag']);
}, genres: (table) => {
// Lookup table of distinct genre names extracted from movies.csv.
table.increments('genreId').unsigned().primary();
table.string('genre', 255).nullable().unique();
}, movies_genres: (table) => {
// Many-to-many join between movies and genres.
table.integer('movieId').notNullable();
table.integer('genreId').unsigned().notNullable();
table.primary(['movieId', 'genreId']);
table.foreign('movieId').references('movies.movieId').onDelete('CASCADE');
table.foreign('genreId').references('genres.genreId').onDelete('CASCADE');
}
};
/**
 * Erase the previous `num` terminal lines so progress messages can be
 * rewritten in place (ANSI: cursor-up one line, then erase-to-end-of-line).
 *
 * Fix: the original used legacy octal escapes ('\033'), which are a
 * SyntaxError in strict mode / ES modules; '\x1b' is the portable ESC.
 *
 * @param {number} num - how many previous lines to erase
 */
function msg_last(num) {
for (let i = 0; i < num; i++) {
process.stdout.write('\x1b[1A');
process.stdout.write('\x1b[K');
}
}
/**
 * Sleep helper: returns a promise that resolves after `ms` milliseconds.
 *
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>}
 */
function delay(ms) {
return new Promise((done) => {
setTimeout(done, ms);
});
}
/**
 * Verify database connectivity with a trivial query, logging the outcome.
 * Exits the process when the database is unreachable.
 *
 * Fixes: replaced the mixed `await ...then().catch()` chain with a plain
 * try/catch, and exit with code 1 (not 0) on failure so shells/CI see a
 * failed connection as an error.
 */
async function dbTest() {
try {
await knex.raw('SELECT 1');
console.log("✅ Database connected successfully");
} catch (err) {
console.error("❌ Database connection failed:", err);
process.exit(1);
}
}
/**
 * Rebuild the schema: drop every table defined in tableSchemas (in reverse
 * declaration order, so FK children go before their parents), then create
 * them all fresh, logging each creation.
 */
async function createTable() {
const tableNames = Object.keys(tableSchemas);
for (let i = tableNames.length - 1; i >= 0; i--) {
const name = tableNames[i];
const exists = await knex.schema.hasTable(name);
if (exists) {
await knex.schema.dropTable(name);
}
}
for (const name of tableNames) {
await knex.schema.createTable(name, tableSchemas[name]);
console.log(`✅ Table "${name}" created successfully`);
}
}
/**
 * Import every *.csv* file in dbDir into the table matching its filename
 * (name up to the first '.'), inserting `insert_num` rows per batch with
 * duplicate rows ignored. For movies files, the genres column is split on
 * '|' and distinct genre names are collected into the module-level
 * `genres` array for importMoviesGenres(). Non-movie files are deleted
 * after a successful import (movies files are re-read later).
 *
 * Fixes vs. original: errors thrown inside the async 'end' handler now
 * reject the promise instead of leaving it pending forever; the batch
 * progress count is clamped to the real total; an empty CSV no longer
 * crashes on records[records.length - 1].
 */
async function importDB() {
const files = fs.readdirSync(dbDir).filter(file => file.includes('.csv'));
for (const file of files) {
const table = file.split('.')[0];
const filePath = `${dbDir}${file}`;
const records = [];
await new Promise((resolve, reject) => {
console.log(`Importing "${table}" from "${filePath}"...`);
fs.createReadStream(filePath)
.pipe(csv())
.on('data', (data) => {
if (table === 'movies') {
records.push({movieId: data.movieId, title: data.title});
// Accumulate distinct genre names for the genres table.
const genreList = data.genres.split('|').map(genre => genre.trim());
for (const genre of genreList) {
if (!genres.includes(genre)) genres.push(genre);
}
} else {
records.push(data);
}
})
.on('end', async () => {
try {
for (let i = 0; i < records.length; i += insert_num) {
await knex(table).insert(records.slice(i, i + insert_num)).onConflict().ignore();
msg_last(1);
console.log(`✅ Inserted ${Math.min(i + insert_num, records.length)} records into ${table}.`);
}
if (records.length > 0) {
console.log(`✅ Inserted ${records.length} records into "${table}". [${Object.values(records[records.length - 1])}]`);
} else {
console.log(`✅ Inserted 0 records into "${table}".`);
}
resolve();
} catch (error) {
// Propagate insert failures; without this the promise never settles.
reject(error);
}
})
.on('error', (error) => {
reject(error);
});
});
// Keep movies CSVs: importMoviesGenres() re-reads them afterwards.
if (!filePath.includes("movies")) {
fs.unlinkSync(filePath);
console.log(`✅ Deleted "${filePath}"`);
}
}
console.log('All databases have been imported successfully.');
}
/**
 * Insert the collected genre names into the genres table, then re-read the
 * movies CSV file(s) to build and bulk-insert movies_genres join rows.
 * Assumes importDB() has already run (it populates `genres`).
 *
 * Fixes vs. original: knex.destroy() was called inside the per-file 'end'
 * handler, which killed the pool after the first file (breaking any
 * further iterations) and double-destroyed before the main IIFE's own
 * destroy() — the caller owns the pool, so destruction is left to it.
 * Batch progress count is clamped to the real total; parseInt gets an
 * explicit radix; dead commented-out code removed.
 */
async function importMoviesGenres() {
for (const genre of genres) {
// mysql2 returns the auto-increment id as the first array element.
const [genreId] = await knex('genres').insert({genre});
genreMap[genre] = genreId;
}
console.log('Inserted genres:', genres.length);
const files = fs.readdirSync(dbDir).filter(file => file.includes('movies'));
for (const file of files) {
const filePath = `${dbDir}${file}`;
await new Promise((resolve, reject) => {
console.log(`Importing "movies_genres" from "${filePath}"...`);
fs.createReadStream(filePath)
.pipe(csv())
.on('data', (data) => {
const movieId = parseInt(data.movieId, 10);
const genreList = data.genres.split('|').map(genre => genre.trim());
genreList.forEach(genre => {
const genreId = genreMap[genre];
if (genreId) {
moviesGenres.push({movieId, genreId: parseInt(genreId, 10)});
} else {
console.log(`Genre "${genre}" not found in genreMap.`);
}
});
})
.on('end', async () => {
if (moviesGenres.length > 0) {
try {
for (let i = 0; i < moviesGenres.length; i += insert_num) {
await knex('movies_genres').insert(moviesGenres.slice(i, i + insert_num)).onConflict().ignore();
msg_last(1);
console.log(`Inserted ${Math.min(i + insert_num, moviesGenres.length)} records into "movies_genres".`);
}
} catch (error) {
// Best-effort like the original: log and continue with the next file.
console.error('Error inserting data:', error);
}
} else {
console.log('No genre data found to insert.');
}
resolve();
})
.on('error', (error) => {
reject(error);
});
});
}
}
/**
 * Widen/relax column definitions on every table after the initial bulk
 * import (integer ids -> bigint, longer varchars, defaults), then tear
 * down the connection pool.
 * NOTE(review): not invoked by the main entry point in this file.
 */
async function modifyAllTables() {
// [table name, alteration callback] pairs, applied strictly in order.
const alterations = [
['movies', (table) => {
table.bigInteger('movieId').alter();
table.string('title', 255).defaultTo('Unknown Title').alter();
}],
['genres', (table) => {
table.bigInteger('genreId').alter();
table.string('genre', 255).defaultTo('Unknown Genre').alter();
}],
['movies_genres', (table) => {
table.bigInteger('movieId').alter();
table.bigInteger('genreId').alter();
}],
['links', (table) => {
table.bigInteger('movieId').alter();
table.string('imdbId', 50).defaultTo(null).alter();
table.string('tmdbId', 50).defaultTo(null).alter();
}],
['ratings', (table) => {
table.bigInteger('userId').alter();
table.bigInteger('movieId').alter();
table.decimal('rating', 3, 2).defaultTo(null).alter();
table.integer('timestamp').defaultTo(null).alter();
}],
['tags', (table) => {
table.bigInteger('userId').alter();
table.bigInteger('movieId').alter();
table.string('tag', 500).notNullable().alter();
table.integer('timestamp').defaultTo(null).alter();
}],
];
try {
for (const [name, apply] of alterations) {
await knex.schema.alterTable(name, apply);
console.log(`Modified ${name} table`);
}
} catch (error) {
console.error('Error modifying tables:', error);
} finally {
await knex.destroy();
}
}
// Entry point: verify connectivity, (re)create the schema when a fresh,
// un-imported dataset is present (links.csv still exists), run both import
// phases best-effort, then release the connection pool.
(async () => {
await dbTest();
if (fs.existsSync(`${dbDir}links.csv`)) {
await createTable();
}
try {
await importDB();
} catch (err) {
console.error(err);
}
try {
await importMoviesGenres();
} catch (err) {
console.error(err);
}
await knex.destroy();
})();
# Pre-processing script: split the large MovieLens ml-32m CSVs into chunks
# the Node importer can handle, then prepend the correct header row to any
# chunk that lost it (split(1) output files carry no header).
echo "Splitting files..."
# Header rows matching the ml-32m column layout for each CSV type.
ratings_header="userId,movieId,rating,timestamp"
tags_header="userId,movieId,tag,timestamp"
links_header="movieId,imdbId,tmdbId"
movies_header="movieId,title,genres"
mkdir -p db
# First pass: chunk the two huge files at 800k lines; numeric (-d) suffixes.
split -l 800000 -d ml-32m/ratings.csv db/ratings.csv_
split -l 800000 -d ml-32m/tags.csv db/tags.csv_
cp -f ml-32m/links.csv db/
cp -f ml-32m/movies.csv db/
# Re-split anything still above 10 MiB in a second, finer pass below.
threshold=$((10 * 1024 * 1024))
# BSD (macOS) and GNU coreutils spell stat/sed differently.
if uname -a | grep -q Darwin; then
stats="stat -f%z"
seds="sed -i ''"
else
stats="stat -c%s"
seds="sed -i"
fi
for file in db/*.csv*; do
filesize=$(eval "$stats \"$file\"")
if ((filesize > threshold)); then
echo "Splitting $file"
split -l 100000 "$file" "${file}_"
rm -vf "$file"
fi
done
# Add the matching header to every chunk whose first line is data, not a
# header ("movieId" appears in all four header rows).
for file in db/*.csv*; do
if ! head -n 1 "$file" | grep -q "movieId"; then
echo "Adding header to $file"
case "$file" in
*ratings.csv_*) eval "$seds \"1s/^/$ratings_header\\n/\" \"$file\"" ;;
*tags.csv_*) eval "$seds \"1s/^/$tags_header\\n/\" \"$file\"" ;;
*links.csv*) eval "$seds \"1s/^/$links_header\\n/\" \"$file\"" ;;
*movies.csv*) eval "$seds \"1s/^/$movies_header\\n/\" \"$file\"" ;;
esac
fi
done
echo "Processing completed."
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment