This tool splits large CSV files into smaller parts for easier import into a database. Supported CSV files include ratings, tags, links, and movie information. The tool aims to make data processing more efficient and convenient.
- Automatically splits large CSV files to reduce the load during import.
- Adds the appropriate header row to each CSV chunk (see the sketch after this list).
- Connects to a MySQL database and imports the data.
- Deletes the original CSV files after processing to save space.
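As a compressed illustration of the splitting and re-headering steps (the full script at the end of this README does the same thing and also handles macOS), assuming GNU `split` and `sed`:

```bash
# Illustrative only: split ratings.csv into 800,000-line chunks, then prepend
# the ratings header to every chunk that does not already have one.
split -l 800000 -d ml-32m/ratings.csv db/ratings.csv_   # -> db/ratings.csv_00, _01, ...
for part in db/ratings.csv_*; do
  if ! head -n 1 "$part" | grep -q "movieId"; then
    sed -i "1s/^/userId,movieId,rating,timestamp\n/" "$part"
  fi
done
```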
To run it you need:

- Node.js
- a MySQL database
- the knex and csv-parser modules
Install the dependencies with `npm install knex csv-parser mysql2`.
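The import script takes its MySQL connection settings from the environment, falling back to the defaults shown in the code below. A minimal way to point it at your own database (`import.js` is an illustrative file name for the script listed next):

```bash
# Connection settings read via process.env; unset variables fall back to the
# script's defaults (127.0.0.1:3306, user admin, password 123456, database "data").
export DB_HOST=127.0.0.1
export DB_PORT=3306
export DB_USER=admin
export DB_PASSWORD=your_password   # placeholder value
export DB_NAME=data
node import.js                     # illustrative name; use whatever name the script is saved under
```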
The Node.js import script creates the MySQL tables and loads every CSV chunk from the `db/` directory:

```js
const knexLib = require('knex');
const fs = require('fs');
const csv = require('csv-parser');

// Database connection settings come from the environment, with defaults below.
const {DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME} = process.env;

// Locate the directory holding the split CSV files.
let dbDir = "../../db/";
if (!fs.existsSync(dbDir)) {
    dbDir = "./db/";
}

const genres = [];        // distinct genre names collected from the movies chunks
let genreMap = {};        // genre name -> genreId after insertion
const moviesGenres = [];  // rows for the movies_genres join table
const insert_num = 500;   // batch size for inserts

const knex = knexLib({
    client: 'mysql2',
    connection: {
        host: DB_HOST ?? "127.0.0.1",
        port: DB_PORT ?? 3306,
        user: DB_USER ?? "admin",
        password: DB_PASSWORD ?? "123456",
        database: DB_NAME ?? 'data',
    },
    pool: {
        min: 2,
        max: 150,
        acquireTimeoutMillis: 60000,
        createTimeoutMillis: 6000,
        idleTimeoutMillis: 60000,
    }
});

// Schema definitions for every table the importer creates.
const tableSchemas = {
    links: (table) => {
        table.integer('movieId').notNullable();
        table.string('imdbId', 30).nullable();
        table.string('tmdbId', 30).nullable();
        table.primary('movieId');
    },
    movies: (table) => {
        table.integer('movieId').notNullable();
        table.string('title', 255).nullable();
        table.primary('movieId');
    },
    ratings: (table) => {
        table.integer('userId').notNullable();
        table.integer('movieId').notNullable();
        table.decimal('rating', 2, 1).nullable();
        table.integer('timestamp').nullable();
        table.primary(['userId', 'movieId']);
    },
    tags: (table) => {
        table.integer('userId').notNullable();
        table.integer('movieId').notNullable();
        table.string('tag', 255).notNullable();
        table.integer('timestamp').nullable();
        table.primary(['userId', 'movieId', 'tag']);
    },
    genres: (table) => {
        table.increments('genreId').unsigned().primary();
        table.string('genre', 255).nullable().unique();
    },
    movies_genres: (table) => {
        table.integer('movieId').notNullable();
        table.integer('genreId').unsigned().notNullable();
        table.primary(['movieId', 'genreId']);
        table.foreign('movieId').references('movies.movieId').onDelete('CASCADE');
        table.foreign('genreId').references('genres.genreId').onDelete('CASCADE');
    }
};

// Move the cursor up and clear `num` lines so progress messages overwrite themselves.
function msg_last(num) {
    for (let i = 0; i < num; i++) {
        process.stdout.write('\x1b[1A');
        process.stdout.write('\x1b[K');
    }
}

function delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

// Verify the database connection before doing any work.
async function dbTest() {
    await knex.raw('SELECT 1')
        .then(() => {
            console.log("✅ Database connected successfully");
        })
        .catch((err) => {
            console.error("❌ Database connection failed:", err);
            process.exit(1);
        });
}

// Drop existing tables (children first), then recreate them from tableSchemas.
async function createTable() {
    for (const table of Object.keys(tableSchemas).reverse()) {
        if (await knex.schema.hasTable(table)) await knex.schema.dropTable(table);
    }
    for (const table of Object.keys(tableSchemas)) {
        await knex.schema.createTable(table, tableSchemas[table]);
        console.log(`✅ Table "${table}" created successfully`);
    }
}

// Import every CSV chunk in dbDir into the table named after the file.
async function importDB() {
    const files = fs.readdirSync(dbDir).filter(file => file.includes('.csv'));
    for (const file of files) {
        let table = file.split('.')[0];
        let filePath = `${dbDir}${file}`;
        const records = [];
        await new Promise((resolve, reject) => {
            console.log(`Importing "${table}" from "${filePath}"...`);
            fs.createReadStream(filePath)
                .pipe(csv())
                .on('data', (data) => {
                    if (table === 'movies') {
                        // Movies rows carry a pipe-separated genre list; collect the distinct genres.
                        records.push({movieId: data.movieId, title: data.title});
                        const genreList = data.genres.split('|').map(genre => genre.trim());
                        for (const genre of genreList) {
                            if (!genres.includes(genre)) genres.push(genre);
                        }
                    } else {
                        records.push(data);
                    }
                })
                .on('end', async () => {
                    // Insert in batches of insert_num, ignoring duplicate-key conflicts.
                    for (let i = 0; i < records.length; i += insert_num) {
                        await knex(table).insert(records.slice(i, i + insert_num)).onConflict().ignore();
                        msg_last(1);
                        console.log(`✅ Inserted ${Math.min(i + insert_num, records.length)} records into ${table}.`);
                    }
                    console.log(`✅ Inserted ${records.length} records into "${table}". [${Object.values(records[records.length - 1])}]`);
                    resolve();
                })
                .on('error', (error) => {
                    reject(error);
                });
        });
        // Movies chunks are kept because importMoviesGenres() reads them again.
        if (!filePath.includes("movies")) {
            fs.unlinkSync(filePath);
            console.log(`✅ Deleted "${filePath}"`);
        }
    }
    console.log('All CSV files have been imported successfully.');
}

// Insert the collected genres, then build the movies_genres join table.
async function importMoviesGenres() {
    for (const genre of genres) {
        const [genreId] = await knex('genres').insert({genre});
        // const [genreId] = await knex('genres').insert({genre}).onConflict().ignore();
        genreMap[genre] = genreId;
    }
    console.log('Inserted genres:', genres.length);
    const files = fs.readdirSync(dbDir).filter(file => file.includes('movies'));
    for (const file of files) {
        let filePath = `${dbDir}${file}`;
        await new Promise((resolve, reject) => {
            console.log(`Importing "movies_genres" from "${filePath}"...`);
            fs.createReadStream(filePath)
                .pipe(csv())
                .on('data', (data) => {
                    const movieId = parseInt(data.movieId);
                    const genreList = data.genres.split('|').map(genre => genre.trim());
                    genreList.forEach(genre => {
                        const genreId = genreMap[genre];
                        if (genreId) {
                            moviesGenres.push({movieId, genreId: parseInt(genreId)});
                        } else {
                            console.log(`Genre "${genre}" not found in genreMap.`);
                        }
                    });
                })
                .on('end', async () => {
                    if (moviesGenres.length > 0) {
                        try {
                            for (let i = 0; i < moviesGenres.length; i += insert_num) {
                                await knex('movies_genres').insert(moviesGenres.slice(i, i + insert_num)).onConflict().ignore();
                                msg_last(1);
                                console.log(`Inserted ${Math.min(i + insert_num, moviesGenres.length)} records into "movies_genres".`);
                            }
                        } catch (error) {
                            console.error('Error inserting data:', error);
                        }
                    } else {
                        console.log('No genre data found to insert.');
                    }
                    resolve();
                })
                .on('error', (error) => {
                    reject(error);
                });
        });
    }
}

// Optional helper (not called in the main flow): widen column types after import.
async function modifyAllTables() {
    try {
        await knex.schema.alterTable('movies', (table) => {
            table.bigInteger('movieId').alter();
            table.string('title', 255).defaultTo('Unknown Title').alter();
        });
        console.log('Modified movies table');
        await knex.schema.alterTable('genres', (table) => {
            table.bigInteger('genreId').alter();
            table.string('genre', 255).defaultTo('Unknown Genre').alter();
        });
        console.log('Modified genres table');
        await knex.schema.alterTable('movies_genres', (table) => {
            table.bigInteger('movieId').alter();
            table.bigInteger('genreId').alter();
        });
        console.log('Modified movies_genres table');
        await knex.schema.alterTable('links', (table) => {
            table.bigInteger('movieId').alter();
            table.string('imdbId', 50).defaultTo(null).alter();
            table.string('tmdbId', 50).defaultTo(null).alter();
        });
        console.log('Modified links table');
        await knex.schema.alterTable('ratings', (table) => {
            table.bigInteger('userId').alter();
            table.bigInteger('movieId').alter();
            table.decimal('rating', 3, 2).defaultTo(null).alter();
            table.integer('timestamp').defaultTo(null).alter();
        });
        console.log('Modified ratings table');
        await knex.schema.alterTable('tags', (table) => {
            table.bigInteger('userId').alter();
            table.bigInteger('movieId').alter();
            table.string('tag', 500).notNullable().alter();
            table.integer('timestamp').defaultTo(null).alter();
        });
        console.log('Modified tags table');
    } catch (error) {
        console.error('Error modifying tables:', error);
    } finally {
        await knex.destroy();
    }
}

// Main flow: test the connection, (re)create the tables on a fresh run
// (links.csv is deleted after a successful import), then import everything.
(async function () {
    await dbTest();
    if (fs.existsSync(dbDir + "links.csv")) {
        await createTable();
    }
    await importDB().catch(err => console.error(err));
    await importMoviesGenres().catch(err => console.error(err));
    await knex.destroy();
})();
```
| echo "Splitting files..." | |
| ratings_header="userId,movieId,rating,timestamp" | |
| tags_header="userId,movieId,tag,timestamp" | |
| links_header="movieId,imdbId,tmdbId" | |
| movies_header="movieId,title,genres" | |
| mkdir -p db | |
| split -l 800000 -d ml-32m/ratings.csv db/ratings.csv_ | |
| split -l 800000 -d ml-32m/tags.csv db/tags.csv_ | |
| cp -f ml-32m/links.csv db/ | |
| cp -f ml-32m/movies.csv db/ | |
| threshold=$((10 * 1024 * 1024)) | |
| if uname -a | grep -q Darwin; then | |
| stats="stat -f%z" | |
| seds="sed -i ''" | |
| else | |
| stats="stat -c%s" | |
| seds="sed -i" | |
| fi | |
| for file in db/*.csv*; do | |
| filesize=$(eval "$stats \"$file\"") | |
| if ((filesize > threshold)); then | |
| echo "Splitting $file" | |
| split -l 100000 "$file" "${file}_" | |
| rm -vf "$file" | |
| fi | |
| done | |
| for file in db/*.csv*; do | |
| if ! head -n 1 "$file" | grep -q "movieId"; then | |
| echo "Adding header to $file" | |
| case "$file" in | |
| *ratings.csv_*) eval "$seds \"1s/^/$ratings_header\\n/\" \"$file\"" ;; | |
| *tags.csv_*) eval "$seds \"1s/^/$tags_header\\n/\" \"$file\"" ;; | |
| *links.csv*) eval "$seds \"1s/^/$links_header\\n/\" \"$file\"" ;; | |
| *movies.csv*) eval "$seds \"1s/^/$movies_header\\n/\" \"$file\"" ;; | |
| esac | |
| fi | |
| done | |
| echo "Processing completed." |