diff --git a/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md b/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md new file mode 100644 index 00000000..00a025fe --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md @@ -0,0 +1,39 @@ +# 類似重複除去スクリプト + +このディレクトリには、コーパスからの類似重複を除去するためのスクリプトが含まれています。 +重複除去は、[datatrove](https://github.com/huggingface/datatrove) に実装された Minhash-LSH をベースとしています。 + +重複除去は以下の2段階で行います: +- 各コーパス内での重複除去 +- 各コーパスでの重複除去後、全体での重複除去 + +## スクリプト実行順 + +0. 必要なライブラリのインストール + - `installer/install.sh` + +1. ファイルサイズを均一化して処理時間のバランスを取るためのリシャーディング + - `preprocess/reshard_all.sh` + +2. 各コーパスごとの重複除去 + - `minhash` + - 詳細は `minhash/README.md` を参照 + +3. シンボリックリンクを用いて、前処理済みのファイルを1つのディレクトリに集約 + - `preprocess/make_links.sh` + +4. 全コーパスを対象としたグローバルな重複除去 + - `minhash` + - 詳細は `minhash/README.md` を参照 + +5. 重複除去されたファイルの再配置 + - 重複除去後のファイルはディレクトリ構造を保持せずに保存されます。 + - 以下の手順で再配置を行います: + 1. 重複除去中にテキストの順序がランダム化されていないことを確認 + - `postprocess/check_original_path_consisitency.py` + 2. 各コーパスの元のディレクトリ構造を復元 + - `postprocess/reconstruct_stracture.py` + +## 関連リポジトリ + +参考:[datatrove](https://github.com/huggingface/datatrove) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/README.md b/corpus/llm-jp-corpus-v4/common/dedup/README.md new file mode 100644 index 00000000..1ea8ca60 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/README.md @@ -0,0 +1,39 @@ +# Deduplication Scripts + +This directory contains scripts for near-duplicate removal from corpora. +The deduplication process is based on Minhash-LSH implemented in [datatrove](https://github.com/huggingface/datatrove). + +We perform deduplication in two main stages: +- Deduplication within each individual corpus +- Global deduplication across all corpora that have been locally deduplicated + +## Script Execution Order + +0. Install required libraries + - `installer/install.sh` + +1. Reshard files to equalize file sizes and balance processing time + - `preprocess/reshard_all.sh` + +2. Perform deduplication within each corpus + - `minhash` + - See `minhash/README.md` for details + +3. Collect all preprocessed files into a single directory using symbolic links + - `preprocess/make_links.sh` + +4. Perform global deduplication across all corpora + - `minhash` + - See `minhash/README.md` for details + +5. Reorganize the deduplicated files + - Deduplicated files are saved without preserving directory structure. + - Steps: + 1. Verify that texts are not randomized during deduplication + - `postprocess/check_original_path_consisitency.py` + 2. 
Reconstruct original directory structure for each corpus + - `postprocess/reconstruct_stracture.py` + +## Related Repository + +See also: [datatrove](https://github.com/huggingface/datatrove) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch b/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch new file mode 100644 index 00000000..52054816 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch @@ -0,0 +1,26 @@ +diff --git a/src/datatrove/pipeline/dedup/minhash.py b/src/datatrove/pipeline/dedup/minhash.py +index f644389..3f1b50b 100644 +--- a/src/datatrove/pipeline/dedup/minhash.py ++++ b/src/datatrove/pipeline/dedup/minhash.py +@@ -256,6 +254,8 @@ class MinhashDedupSignature(PipelineStep): + doc_idx, + ) + ) ++ else: ++ logger.warning(f"No singles {doc.text=} on {rank=}") + for file in buckets: + file.close() + +diff --git a/src/datatrove/utils/text.py b/src/datatrove/utils/text.py +index 7ab7d3d..d0c9cb6 100644 +--- a/src/datatrove/utils/text.py ++++ b/src/datatrove/utils/text.py +@@ -259,6 +257,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str: + + # from https://tedboy.github.io/nlps/_modules/nltk/util.html#ngrams + def ngrams(sequence: Iterable, n: int): ++ if isinstance(sequence, list) and len(sequence) < n: ++ sequence += [""] * (n - len(sequence)) + iterables = tee(sequence, n) + + for i, sub_iterable in enumerate(iterables): # For each window, diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh new file mode 100644 index 00000000..c2a00f75 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +work_dir=/model/experiments/0118_dedup_corpusv4_ja +env_dir=${work_dir}/environment +venv_dir=${env_dir}/.venv +src_dir=${env_dir}/src +script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup + +export UV_PROJECT_ENVIRONMENT=$venv_dir + +# create environment +cd $work_dir || exit +mkdir -p $env_dir +cd $env_dir || exit + +python3 -m venv $venv_dir +source $venv_dir/bin/activate +pip install --upgrade --no-cache-dir pip uv +uv init + +# install requirement +uv add --no-cache -r ${script_root}/installer/requirements.txt + +# install customized datatrove +mkdir -p $src_dir +cd $src_dir || exit +git clone https://github.com/huggingface/datatrove.git -b v0.4.0 +cd datatrove || exit +patch -p1 <${script_root}/installer/datatrove_diff.patch +uv pip install --no-cache-dir ".[io,processing,cli]" diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt new file mode 100644 index 00000000..eee707e2 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -0,0 +1,4 @@ +datatrove[io,processing,cli]==0.4.0 # will be re-installed, but install to resolve dependency +spacy[ja] # required when running minhash (datatrove) +sudachipy==0.5.4 # required when running minhash (datatrove) +paramiko # required to run processes on local multi node diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md new file mode 100644 index 00000000..4c082a39 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md @@ -0,0 +1,32 @@ +# 重複除去コード + +このディレクトリには、[datatrove](https://github.com/huggingface/datatrove) を使用した重複除去スクリプトが含まれています。 +SLURMクラスタおよび複数のローカルサーバーの両方で実行できるように構成されています。 + 
+## SLURM 環境 + +このバージョンは SLURM クラスタ上での実行を想定しています。 + +### 使い方 + +```bash +python slurm/minhash_dedup.py {input_dir} {output_dir} +``` + +- `input_dir`: 入力データが格納されたディレクトリのパス。サブディレクトリも再帰的に走査されます。 +- `output_dir`: 重複除去後のデータを保存するディレクトリ。出力サブディレクトリは自動で作成されます。 +- ハッシュ処理に関するハイパーパラメータ(例:n-gram サイズ、バケット数、ハッシュ数)を設定可能です。詳細はスクリプト内のコメントを参照してください。 + +> `slurm/minhash_dedup.py` は [こちらの公式サンプル](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py) をもとに作成されています。 + +## ローカルマルチノード環境 + +このバージョンは、SSH 経由で複数のローカルマシンにまたがって分散実行することを想定しています。 + +### 構成 + +- `local_multi_node/submit_minhash_all_subcorpus.sh`: 全てのサブコーパスに対して重複除去を実行するシェルスクリプト。 +- `local_multi_node/submit_minhash.py`: ノードリストを読み込み、各ノードで重複除去処理を起動するランチャースクリプト。 +- `local_multi_node/minhash_dedup.py`: 各ノード上で実行されるワーカースクリプト。 + +> ※ このコードはプロトタイプ段階ですが、参考実装として共有します。 diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md new file mode 100644 index 00000000..ee7b317e --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md @@ -0,0 +1,33 @@ +# Deduplication Code + +This directory provides deduplication scripts using [datatrove](https://github.com/huggingface/datatrove). +It supports execution in two environments: on a SLURM cluster and on multiple local servers. + +## SLURM + +This version is designed for use on a SLURM-based cluster. + +### Usage + +```bash +python slurm/minhash_dedup.py {input_dir} {output_dir} +``` + +- `input_dir`: Path to the directory containing the input data. The script recursively scans subdirectories for files. +- `output_dir`: Path where deduplicated files will be written. Subdirectories will be automatically created under this path. +- You can also configure hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes per bucket). + Please refer to the comments in the code for details. + +> The script `slurm/minhash_dedup.py` was adapted from [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py). + +## Local Multi-Node + +This version supports deduplication across multiple local machines using distributed processing via SSH. + +### Structure + +- `local_multi_node/submit_minhash_all_subcorpus.sh`: Main launcher shell script to deduplicate all sub-corpus. +- `local_multi_node/submit_minhash.py`: Main launcher that reads the node list and runs deduplication on each machine. +- `local_multi_node/minhash_dedup.py`: Worker script executed on each node. + +> Note: This code is a prototype, but it is shared here for reference. 
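+
+Both entry points build the same datatrove hashing configuration; the values below are the argparse defaults used throughout this pipeline (a minimal sketch, not a full pipeline definition):
+
+```python
+from datatrove.pipeline.dedup.minhash import MinhashConfig
+from datatrove.utils.hashing import HashConfig
+
+# Defaults used by both minhash_dedup.py scripts:
+# 5-gram shingles, 20 buckets, 10 hashes per bucket, 64-bit hashes.
+minhash_config = MinhashConfig(
+    hash_config=HashConfig(precision=64),  # higher precision -> fewer false-positive collisions
+    n_grams=5,
+    num_buckets=20,
+    hashes_per_bucket=10,
+)
+```
+
+Results are written under a directory named after these values (e.g. `minhash-5gram-20buckets-10hashes/results/deduplicated_output`); later steps such as `preprocess/make_links.sh` assume that name, so change the hyperparameters consistently if you deviate from the defaults.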
diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py new file mode 100644 index 00000000..4435b42c --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py @@ -0,0 +1,138 @@ +from argparse import ArgumentParser +from dataclasses import dataclass +from pathlib import Path + +from datatrove.executor.local import LocalPipelineExecutor +from datatrove.pipeline.dedup import MinhashDedupSignature +from datatrove.pipeline.dedup.minhash import (MinhashConfig, + MinhashDedupBuckets, + MinhashDedupCluster, + MinhashDedupFilter) +from datatrove.pipeline.readers import JsonlReader +from datatrove.pipeline.tokens import TokensCounter +from datatrove.pipeline.writers.jsonl import JsonlWriter +from datatrove.utils.hashing import HashConfig +from datatrove.utils.typeshelper import Languages + + +@dataclass +class Args: + input: str + output: str + ngram: int + buckets: int + hashes_per_bucket: int + local_tasks: int + local_rank_offset: int + max_worker: int + stage:int + + +argparser = ArgumentParser() +argparser.add_argument("input", type=str) +argparser.add_argument("output", type=str) +argparser.add_argument("--ngram", default=5, type=int) +argparser.add_argument("-r", "--buckets", default=20, type=int) +argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int) +argparser.add_argument("-task", "--local_tasks", type=int, default=-1) +argparser.add_argument("-rank", "--local_rank_offset", type=int, default=0) +argparser.add_argument("-worker", "--max_worker", type=int, default=16) +argparser.add_argument("-stage", "--stage", type=int, choices=[1,2,3,4],default=4) +args = argparser.parse_args(namespace=Args) + + +MINHASH_DIRNAME = ( + f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" +) +MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME +RESULT_DIR = f"{MINHASH_DIR}/results" +LOG_DIR = f"{MINHASH_DIR}/logs" +SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs" + +all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()] +TOTAL_TASKS = len(all_files) + +# this is the original data that we want to deduplicate +INPUT_READER = JsonlReader(args.input) +# you can also change ngrams or the number of buckets and their size here +minhash_config = MinhashConfig( + hash_config=HashConfig(precision=64), + n_grams=args.ngram, + num_buckets=args.buckets, + hashes_per_bucket=args.hashes_per_bucket, +) # better precision -> fewer false positives (collisions) + +# stage 1 computes minhash signatures for each task (each task gets a set of files) +stage1 = LocalPipelineExecutor( + pipeline=[ + INPUT_READER, + MinhashDedupSignature( + output_folder=f"{RESULT_DIR}/signatures", + config=minhash_config, + language=Languages.japanese, + skip_existing_sigs=True, + ), + ], + tasks=TOTAL_TASKS, + workers=args.max_worker, + logging_dir=f"{LOG_DIR}/signatures", + local_tasks=args.local_tasks, + local_rank_offset=args.local_rank_offset, + randomize_start_duration=10, +) + +# stage 2 finds matches between signatures in each bucket +stage2 = LocalPipelineExecutor( + pipeline=[ + MinhashDedupBuckets( + input_folder=f"{RESULT_DIR}/signatures", + output_folder=f"{RESULT_DIR}/buckets", + config=minhash_config, + ), + ], + tasks=minhash_config.num_buckets, + workers=args.max_worker, + logging_dir=f"{LOG_DIR}/buckets", + depends=stage1, +) + +# stage 3 creates clusters of duplicates using the results from all buckets 
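+# (documents whose signatures matched in any bucket are merged into clusters here;
+#  stage 4 then keeps a single document per cluster and drops the rest)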
+stage3 = LocalPipelineExecutor( + pipeline=[ + MinhashDedupCluster( + input_folder=f"{RESULT_DIR}/buckets", + output_folder=f"{RESULT_DIR}/remove_ids", + config=minhash_config, + save_cluster_id=True, + save_cluster_size=True, + ), + ], + tasks=args.max_worker, + logging_dir=f"{LOG_DIR}/clusters", + depends=stage2, +) + +# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster +# the data must match exactly stage 1, so number of tasks and the input source must be the same +stage4 = LocalPipelineExecutor( + pipeline=[ + INPUT_READER, + # TokensCounter(), # nice way to see how many tokens we had before and after deduplication + MinhashDedupFilter( + input_folder=f"{RESULT_DIR}/remove_ids", + exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"), + load_cluster_ids=True, + load_cluster_sizes=True, + ), + JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"), + ], + tasks=TOTAL_TASKS, + logging_dir=f"{LOG_DIR}/filter", + depends=stage3, + workers=args.max_worker, + local_tasks=args.local_tasks, + local_rank_offset=args.local_rank_offset, +) + +if __name__ == "__main__": + exec(f"stage{args.stage}.run()") diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py new file mode 100644 index 00000000..71d0ad06 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py @@ -0,0 +1,204 @@ +import argparse +import getpass +import logging +from pathlib import Path + +import paramiko + +# Args default +WORK_DIR_DEFAULT = "/model/experiments/0118_dedup_corpusv4_ja/data" +PYTHON_SCRIPT_PATH_DEFAULT = ( + WORK_DIR_DEFAULT + + "/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py" +) +VENV_PATH_DEFAULT = WORK_DIR_DEFAULT + "/environment/.venv/bin/activate" + +# server settings +USER_NAME = getpass.getuser() + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class Args: + # For runner + input_dir: str + output_dir: str + stage: int + + # About paths + log_dir: str + venv_path: str + python_script: str + + # About server + node_list: list[str] + max_node_worker: int + + +def setup_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + runner_parser = parser.add_argument_group("Arguments for running scripts") + runner_parser.add_argument( + "--input_dir", + required=True, + type=str, + ) + runner_parser.add_argument( + "--output_dir", + required=True, + type=str, + ) + runner_parser.add_argument( + "-stage", "--stage", type=int, choices=[1, 2, 3, 4], default=4 + ) + path_parser = parser.add_argument_group("Arguments about paths") + path_parser.add_argument( + "--log_dir", + required=True, + type=str, + ) + path_parser.add_argument( + "--venv_path", + default=VENV_PATH_DEFAULT, + type=str, + ) + path_parser.add_argument( + "--python_script", + default=PYTHON_SCRIPT_PATH_DEFAULT, + type=str, + ) + server_parser = parser.add_argument_group("Arguments about server") + server_parser.add_argument( + "--node_list", + nargs="+", + type=str, + ) + server_parser.add_argument( + "--max_node_worker", + type=int, + ) + + return parser + + +# command = "kill -9 -- -1" + + +def submit_task(node, command): + try: + # SSHクライアントの準備 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy( + paramiko.AutoAddPolicy() + ) # ホストキーの確認を無効化 + + # SSHエージェントから鍵を取得 + agent = 
paramiko.Agent() + keys = agent.get_keys() + if len(keys) == 0: + raise Exception( + "SSHエージェントに鍵が登録されていません。`ssh-add`で鍵を追加してください。" + ) + + # 実行ユーザー名でSSH接続 + ssh.connect(node, username=USER_NAME, pkey=keys[0]) # エージェントの鍵を使用 + + ssh.exec_command(command) + ssh.close() + + logger.info(f"{node}: ✅ Task Started as {USER_NAME}") + except Exception as e: + logger.info(f"{node}: ❌ FAILED - {str(e)}") + + +def prepare_command_prefix( + input_dir: Path, + output_dir: Path, + venv_path: Path, + python_script: Path, +): + python_args = [input_dir, output_dir] + concat_args = " ".join(python_args) + python_commands = f"python {python_script} {concat_args}" + command_prefix = "&&".join( + [ + "ulimit -n 65536 1048576", + f"source {venv_path}", + f"nohup bash -c '{python_commands}", + ] + ) + return command_prefix + + +def submit_minhash_job( + node_list: list[str], + input_dir: Path, + command_prefix, + stage: int, + log_dir: Path, + max_node_worker: int, +): + all_files = [_f for _f in input_dir.rglob("*") if _f.resolve().is_file()] + total_tasks = len(all_files) + + for i, finish_tasks in enumerate(range(0, total_tasks, max_node_worker)): + rest_tasks = total_tasks - finish_tasks + local_tasks = min(rest_tasks, max_node_worker) + node = node_list[i] + + # complete commannd + log_path = log_dir / (node + ".out") + logging_command = f"> {log_path} 2>&1" + nohup_sufix = "> nohup.out 2>&1 &" + command = "".join( + [ + command_prefix, + "--stage", + f"{stage}", + "--local_tasks", + f"{local_tasks}", + "--local_rank_offset", + f"{finish_tasks}", + f"{logging_command}", + f"{nohup_sufix}", + ] + ) + + submit_task(node, command) + + if stage in [2,3]: + # On stage2 and stage3, process is not distributed + break + + +def main( + input_dir: str, + output_dir: str, + stage: int, + log_dir: str, + venv_path: str, + python_script: str, + node_list: list[str], + max_node_worker: int, +): + command_prefix = prepare_command_prefix( + input_dir, output_dir, venv_path, python_script + ) + submit_minhash_job(node_list, input_dir, command_prefix,stage, log_dir, max_node_worker) + + +if __name__ == "__main__": + args = setup_parser().parse_args(namespace=Args) + main( + args.input_dir, + args.output_dir, + args.stage, + args.log_dir, + args.venv_path, + args.python_script, + args.node_list, + args.max_node_worker, + ) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh new file mode 100644 index 00000000..6ae79447 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +set -eux + +stage=$1 +work_dir="/model/experiments/0118_dedup_corpusv4_ja" +env_path="${work_dir}/environment/.venv/bin/activate" +log_root="${work_dir}/local_logs" + +node_list=() +for i in $(seq 0 99); do + node_list+=("z-cpu$i") +done + +source $env_path + +target_dirs=( + aozorabunko + cc + ceek_news + e-gov + fineweb-2 + kaken + kokkai_giji + nwc2010 + nwjc + patent + sip_comprehensive_html + sip_comprehensive_pdf-pdf2text + sip_comprehensive_pdf-surya + warp_html + warp_pdf_e0 + warp_pdf_e0.2 + wiki +) + +# reshard +python_submit_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py +python_minhash_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py + +for _dirname in "${target_dirs[@]}"; do + 
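    # One launcher invocation per sub-corpus: submit_minhash.py spreads that corpus's
+    # resharded files across node_list, assigning up to max_node_worker tasks per node.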
+    _target_dir=$work_dir/data/subcorpus/${_dirname}
+    if [ ! -d "$_target_dir" ]; then
+        echo "Directory does not exist. Skip: $_target_dir"
+        continue
+    fi
+
+    python $python_submit_script \
+        --input_dir "${_target_dir}/reshard_1B" \
+        --output_dir "$_target_dir" \
+        --stage "${stage}" \
+        --log_dir "${log_root}/${stage}/${_dirname}" \
+        --venv_path $env_path \
+        --python_script $python_minhash_script \
+        --node_list "${node_list[@]}" \
+        --max_node_worker 150
+done
diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/slurm/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/slurm/minhash_dedup.py
new file mode 100644
index 00000000..0fa00d68
--- /dev/null
+++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/slurm/minhash_dedup.py
@@ -0,0 +1,158 @@
+from argparse import ArgumentParser
+from dataclasses import dataclass
+from pathlib import Path
+
+from datatrove.executor.slurm import SlurmPipelineExecutor
+from datatrove.pipeline.dedup import MinhashDedupSignature
+from datatrove.pipeline.dedup.minhash import (MinhashConfig,
+                                              MinhashDedupBuckets,
+                                              MinhashDedupCluster,
+                                              MinhashDedupFilter)
+from datatrove.pipeline.readers import JsonlReader
+from datatrove.pipeline.tokens import TokensCounter
+from datatrove.pipeline.writers.jsonl import JsonlWriter
+from datatrove.utils.hashing import HashConfig
+from datatrove.utils.typeshelper import Languages
+
+WORK_DIR = "/home/shared/experiments/0118_dedup_corpusv4_ja"
+VENV_PATH = f"{WORK_DIR}/environment/.venv/bin/activate"
+
+
+@dataclass
+class Args:
+    input: str
+    output: str
+    ngram: int
+    buckets: int
+    hashes_per_bucket: int
+    venv: str
+
+
+argparser = ArgumentParser()
+argparser.add_argument("input", type=str)
+argparser.add_argument("output", type=str)
+argparser.add_argument("--ngram", default=5, type=int)
+argparser.add_argument("-r", "--buckets", default=20, type=int)
+argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int)
+argparser.add_argument("--venv", default=VENV_PATH, type=str)
+args = argparser.parse_args(namespace=Args)
+
+
+MINHASH_DIRNAME = (
+    f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes"
+)
+MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME
+RESULT_DIR = f"{MINHASH_DIR}/results"
+LOG_DIR = f"{MINHASH_DIR}/logs"
+SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs"
+
+all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()]
+TOTAL_TASKS = len(all_files)
+
+# this is the original data that we want to deduplicate
+INPUT_READER = JsonlReader(args.input)
+# you can also change ngrams or the number of buckets and their size here
+minhash_config = MinhashConfig(
+    hash_config=HashConfig(precision=64),
+    n_grams=args.ngram,
+    num_buckets=args.buckets,
+    hashes_per_bucket=args.hashes_per_bucket,
+)  # better precision -> fewer false positives (collisions)
+
+
+# stage 1 computes minhash signatures for each task (each task gets a set of files)
+stage1 = SlurmPipelineExecutor(
+    job_name="calc_minhash",
+    pipeline=[
+        INPUT_READER,
+        MinhashDedupSignature(
+            output_folder=f"{RESULT_DIR}/signatures",
+            config=minhash_config,
+            language=Languages.japanese,
+            skip_existing_sigs=True,
+        ),
+    ],
+    tasks=TOTAL_TASKS,
+    time="48:00:00",
+    partition="cpu",
+    logging_dir=f"{LOG_DIR}/signatures",
+    slurm_logs_folder=SLURM_LOGS_DIR,
+    venv_path=args.venv,
+    max_array_launch_parallel=True,
+    stagger_max_array_jobs=5,
+)
+
+# stage 2 finds matches between signatures in each bucket
+stage2 = SlurmPipelineExecutor(
+    job_name="match_minhash",
+    pipeline=[
+ MinhashDedupBuckets( + input_folder=f"{RESULT_DIR}/signatures", + output_folder=f"{RESULT_DIR}/buckets", + config=minhash_config, + ), + ], + tasks=minhash_config.num_buckets, + time="120:00:00", + partition="cpu", + logging_dir=f"{LOG_DIR}/buckets", + depends=stage1, + slurm_logs_folder=SLURM_LOGS_DIR, + venv_path=args.venv, + max_array_launch_parallel=True, + stagger_max_array_jobs=5, +) + +# stage 3 creates clusters of duplicates using the results from all buckets +stage3 = SlurmPipelineExecutor( + job_name="calc_cluster", + pipeline=[ + MinhashDedupCluster( + input_folder=f"{RESULT_DIR}/buckets", + output_folder=f"{RESULT_DIR}/remove_ids", + config=minhash_config, + save_cluster_id=True, + save_cluster_size=True, + ), + ], + tasks=1, + time="120:00:00", + partition="cpu", + logging_dir=f"{LOG_DIR}/clusters", + mem_per_cpu_gb=70, + cpus_per_task=2, + depends=stage2, + slurm_logs_folder=SLURM_LOGS_DIR, + venv_path=args.venv, + max_array_launch_parallel=True, + stagger_max_array_jobs=5, +) + +# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster +# the data must match exactly stage 1, so number of tasks and the input source must be the same +stage4 = SlurmPipelineExecutor( + job_name="remove_duplicate", + pipeline=[ + INPUT_READER, + TokensCounter(), # nice way to see how many tokens we had before and after deduplication + MinhashDedupFilter( + input_folder=f"{RESULT_DIR}/remove_ids", + exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"), + load_cluster_ids=True, + load_cluster_sizes=True, + ), + JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"), + ], + tasks=TOTAL_TASKS, + time="120:00:00", + partition="cpu", + logging_dir=f"{LOG_DIR}/filter", + depends=stage3, + slurm_logs_folder=SLURM_LOGS_DIR, + venv_path=args.venv, + max_array_launch_parallel=True, + stagger_max_array_jobs=5, +) + + +stage4.run() diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py new file mode 100644 index 00000000..01ae3c97 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py @@ -0,0 +1,69 @@ +import gzip +import json +import multiprocessing +import sys +from pathlib import Path + +from tqdm import tqdm + +input_dir = "/model/experiments/0118_dedup_corpusv4_ja/data/all/minhash-5gram-20buckets-10hashes/results/deduplicated_output" +parallel_jobs = 32 + +# fineweb-2 preserve original path on original fineweb-2 +patterns = ["s3://commoncrawl/crawl-data", "/fsx/guilherme/cc2023-50"] + + +def convert_patterns(path: str) -> str: + """ + Normalize the file path based on known prefix patterns. 
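+    If the path contains one of the known prefixes, only that prefix is returned;
+    otherwise the path is returned unchanged.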
+ + Examples: + >>> convert_patterns("s3://commoncrawl/crawl-data/CC-MAIN-2023/file1") + "s3://commoncrawl/crawl-data" + + >>> convert_patterns("/data/local/custom_corpus/file3") + "/data/local/custom_corpus/file3" + """ + for _pat in patterns: + if _pat in path: + return _pat + return path + + +def process_file(file): + unique_paths = set() + + try: + with gzip.open(file,"rt") as f: + for line in f: + try: + data = json.loads(line) + file_path = data.get("metadata").get("file_path") + converted_path = convert_patterns(file_path) + unique_paths.add(converted_path) + except json.JSONDecodeError: + continue + except Exception as e: + print(f"Error processing {file}: {e}", file=sys.stderr) + return None + + if len(unique_paths) != 1: + print(f"Warning: {file} has {len(unique_paths)} unique values!") + + +def main(): + files = list(Path(input_dir).rglob("*.jsonl.gz")) + + with multiprocessing.Pool(parallel_jobs) as pool: + list( + tqdm( + pool.imap_unordered(process_file, files), + total=len(files), + desc="Processing files", + ncols=0, + ), + ) + + +if __name__ == "__main__": + main() diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py new file mode 100644 index 00000000..f0113d3b --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py @@ -0,0 +1,165 @@ +# Description: +# This script reconstructs the original directory structure and metadata for deduplicated JSONL.gz files. +# It uses the original file paths (stored in metadata) to group and rename the deduplicated files into subfolders +# using 0-based sequential filenames (e.g., 0000.jsonl.gz, 0001.jsonl.gz, ...). +# The metadata is also normalized to preserve both original and deduplication-related information. + +import argparse +import gzip +import json +from pathlib import Path +from typing import NamedTuple + +from tqdm import tqdm + + +class Args: + """Argument container for command-line parsing.""" + worker: int + input_dir: str + output_dir: str + + +def setup_parser(): + """Set up and return the command-line argument parser.""" + parser = argparse.ArgumentParser() + parser.add_argument("input_dir") + parser.add_argument("output_dir") + parser.add_argument("--worker", type=int, default=32) + return parser + + +class FilePathCreator: + """ + Helper class to create output file paths that mirror original corpus structure. + + It groups files using metadata information and creates sequential file names. + """ + def __init__(self) -> str: + self.counter = 0 + self.prev_mid_path = None + + def get_mid_path(self, path: str) -> str: + """ + Convert original input file path into a logical middle path for grouping. + Some known datasets are mapped to a fixed identifier like "ja_fineweb-2". 
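+        Other paths under the subcorpus root map to "ja_" plus the corpus directory name,
+        with special-case rewrites for sip_comprehensive_pdf and warp_pdf, and any
+        intermediate directories appended for deeper paths.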
+ + Examples: + >>> get_mid_path("s3://commoncrawl/crawl-data/CC-MAIN-2023/file1") + "ja_fineweb-2" + + >>> get_mid_path("/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/warp_pdf_e0/metadata/sample.jsonl.gz") + "ja_warp_pdf/e0" + + >>> get_mid_path("/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/sip_comprehensive_pdf/section/sample.jsonl.gz") + "ja_sip/comprehensive/pdf" + """ + if "s3://commoncrawl/crawl-data" in path or "/fsx/guilherme/cc2023-50" in path: + return "ja_fineweb-2" + + original_file_prefix = ( + "/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/" + ) + path_sufix = path.replace(original_file_prefix, "") + path_parts = Path(path_sufix).parts + assert len(path_parts) >= 3, f"Input path is invalid format: {path}" + + path_root = "ja_" + path_parts[0] + if "sip_comprehensive_pdf" in path_root: + return path_root.replace("-", "/") + elif "warp_pdf" in path_root: + return path_root.replace("_e", "/e") + elif len(path_parts) == 3: + return path_root + else: + # len(path_parts)>3 + return "/".join([path_root] + list(path_parts[2:-1])) + + def get_file_path(self, path: str) -> Path: + """ + Generate a new file path using the normalized middle path and a counter-based filename. + The counter resets when the middle path changes. + """ + mid_path = self.get_mid_path(path) + if mid_path != self.prev_mid_path: + self.counter = 0 + self.prev_mid_path = mid_path + new_file = f"{self.counter:04d}.jsonl.gz" + self.counter += 1 + return Path(mid_path) / new_file + + +def normalize_jsonl(data: dict, add_file_path: bool = False): + """ + Normalize the metadata format of a JSONL entry. + + Combines metadata from various levels and relocates deduplication-related fields under `meta["dedup_meta"]`. + """ + meta: dict = data.get("metadata", {}).get("meta", {}) + meta_other1 = {k: v for k, v in data.items() if k not in ["text", "metadata", "id"]} + + dedup_meta_keys = ["minhash_cluster_id", "minhash_cluster_size", "token_count"] + # Extract metadata keys excluding the deduplication-related ones + meta_other2 = { + k: v + for k, v in data["metadata"].items() + if k not in (["file_path", "meta"] + dedup_meta_keys) + } + + # Ensure no overlapping keys between different metadata sections + assert len(set(meta.keys()) & set(meta_other1.keys())) == 0 + assert len(set(meta.keys()) & set(meta_other2.keys())) == 0 + assert len(set(meta_other1.keys()) & set(meta_other2.keys())) == 0 + + # Store deduplication metadata if required + if add_file_path: + dedup_meta_keys.append("file_path") + dedup_meta = {k: v for k, v in data["metadata"].items() if k in dedup_meta_keys} + + new_meta = meta | meta_other1 | meta_other2 | {"dedup_meta": dedup_meta} + + return {"text": data["text"], "meta": new_meta} + + +def convert_file(input_file: Path, output_file: Path): + """ + Read a gzipped JSONL file, normalize each line's metadata, and write to a new gzipped file. + If the file is from ja_fineweb-2, include the original file path in dedup metadata. + """ + output_file.parent.mkdir(parents=True, exist_ok=True) + assert not output_file.exists(), f"{output_file} exists!" 
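+    # Output names are generated sequentially per corpus by FilePathCreator, so an
+    # existing file would otherwise be silently overwritten; fail loudly instead.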
+ + # Determine if the original file is from ja_fineweb-2 to include additional metadata + add_file_path = False + if output_file.parts[3] == "ja_fineweb-2": + add_file_path = True + + with gzip.open(input_file, "rt") as f_read, gzip.open(output_file, "wt") as f_write: + for line in f_read: + data = json.loads(line) + normalized = normalize_jsonl(data, add_file_path) + f_write.write(json.dumps(normalized, ensure_ascii=False) + "\n") + + +class IO_File(NamedTuple): + """Simple tuple that pairs an input file with its output file path.""" + input_file: Path + output_file: Path + + +def setup_io(input_files: Path, output_dir: Path) -> list[IO_File]: + """ + Prepare a list of IO_File pairs by inspecting metadata from each input file. + Determines the correct output location and file name based on metadata. + """ + io_list = [] + file_path_creator = FilePathCreator() + for _file in tqdm(input_files, ncols=0): + with gzip.open(_file, "rt") as f: + line = f.readline() + data = json.loads(line) + original_file_path = data["metadata"]["file_path"] + output_file = file_path_creator.get_file_path(str(original_file_path)) + output_file = Path(output_dir) / output_file + io_list.append(IO_File(_file, output_file)) + return io_list diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh new file mode 100644 index 00000000..bb9cfa8b --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh @@ -0,0 +1,12 @@ +#!/bin/bash +data_dir=/model/experiments/0118_dedup_corpusv4_ja/data + +for dir in "${data_dir}"/subcorpus/*; do + dir_name=$(basename "$dir") + + for file in "$dir/minhash-5gram-20buckets-10hashes/results/deduplicated_output/"*; do + file_name=$(basename "$file") + mkdir -p "$data_dir/all/deduped_subcorpus/$dir_name" + ln -s "$file" "$data_dir/all/deduped_subcorpus/$dir_name/$file_name" + done +done diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh new file mode 100644 index 00000000..1f3d46fa --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh @@ -0,0 +1,72 @@ +#!/bin/bash +#SBATCH --job-name=0118_reshard_corpus +#SBATCH --partition=cpu +#SBATCH --exclusive +#SBATCH --mem=0 +#SBATCH --output=slurm_logs/%j-%x.out +#SBATCH --error=slurm_logs/%j-%x.err + +# This script splits files from an input directory into smaller compressed chunks, +# preserving directory structure. Supports optional file pattern filtering and .gz input. +# +# Usage: +# sbatch this_script.sh [unit_size] [pattern] +# Example: +# sbatch this_script.sh ./data ./sharded 500M '\.jsonl$' + +set -eux + +input_dir=$1 +output_dir=$2 +unit_size=${3:-"1G"} # Target size per split chunk (default: 1G) +pattern=${4:-""} # Optional pattern to filter files + +input_dir=$(realpath "$input_dir") +mkdir -p "$output_dir" + +# Get list of all files (respecting directory structure) +all_files=$(find -L "$input_dir" -type f) + +# Filter files if a pattern is specified +if [[ -n "$pattern" ]]; then + all_files=$(echo "$all_files" | grep -E "$pattern" || true) +fi + +# Exit if no files match the pattern +if [[ -z "$all_files" ]]; then + echo "No matching files found. Exiting." 
    exit 1
+fi
+
+# Group files by their parent directory (relative to input_dir)
+declare -A dir_files_map
+while IFS= read -r file; do
+    relative_dir=$(dirname "${file#$input_dir/}")
+    output_subdir="$output_dir/$relative_dir"
+
+    mkdir -p "$output_subdir"
+    dir_files_map["$output_subdir"]+="$file "
+done <<<"$all_files"
+
+# For each group of files, perform splitting
+for subdir in "${!dir_files_map[@]}"; do
+    file_list=${dir_files_map["$subdir"]}
+
+    split_args=(
+        --suffix-length=4
+        --additional-suffix=.jsonl
+        --line-bytes="$unit_size"
+        -d
+        --filter='zstd -T0 -o $FILE.zst'
+        --verbose
+    )
+
+    # Concatenate and split each file; decompress if .gz
+    for file in $file_list; do
+        if [[ "$file" == *.gz ]]; then
+            gunzip -c "$file"
+        else
+            cat "$file"
+        fi
+    done | split "${split_args[@]}" - "$subdir/"
+done
diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh
new file mode 100644
index 00000000..559b5d36
--- /dev/null
+++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+set -eux
+
+work_dir="/model/experiments/0118_dedup_corpusv4_ja"
+
+target_dirs=(
+    aozorabunko
+    cc
+    ceek_news
+    e-gov
+    fineweb-2
+    kaken
+    kokkai_giji
+    nwc2010
+    nwjc
+    patent
+    sip_comprehensive_html
+    sip_comprehensive_pdf-pdf2text
+    sip_comprehensive_pdf-surya
+    warp_html
+    warp_pdf_e0
+    warp_pdf_e0.2
+    wiki
+)
+
+# reshard
+reshard_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh
+unit_size=1G
+
+declare -A patterns=(
+    ["kaken"]="train_*"
+    ["wiki"]="*train*"
+)
+
+for _dir in "${target_dirs[@]}"; do
+    trg_dir=$work_dir/data/subcorpus/${_dir}
+    if [ ! -d "$trg_dir" ]; then
+        echo "Directory does not exist. Skip: $trg_dir"
+        continue
+    fi
+
+    pattern=${patterns[$_dir]:-""}
+
+    bash $reshard_script \
+        "${trg_dir}/raw" \
+        "${trg_dir}/reshard_${unit_size}B" \
+        "$unit_size" \
+        "$pattern"
+done
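+
+# Resulting layout per corpus: ${trg_dir}/reshard_${unit_size}B/<relative_dir>/0000.jsonl.zst,
+# 0001.jsonl.zst, ... (reshard.sh splits with 4-digit numeric suffixes and compresses each
+# chunk with zstd).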