Skip to content

Commit

Permalink
Text File Splitting Job (#556)
Browse files Browse the repository at this point in the history
implement job for text file splitting

Co-authored-by: michelwi <[email protected]>
Co-authored-by: Nick Rossenbach <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2024
1 parent 23704ff commit ab07cf4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 3 deletions.
24 changes: 23 additions & 1 deletion text/info.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__all__ = ["SentenceLengthHistogramJob"]
__all__ = ["SentenceLengthHistogramJob", "CountLinesJob"]

from collections import Counter
import subprocess
from typing import Optional

from i6_core.util import uopen
Expand Down Expand Up @@ -88,3 +89,24 @@ def _plot(counter: Counter, fig_path: str):
ax.plot(x, y, "s")

fig.savefig(fig_path, bbox_inches="tight")


class CountLinesJob(Job):
def __init__(self, input_text: tk.Path):
self.input_text = input_text

self.out_num_lines = self.output_var("num_lines")

self.rqmt = {"cpu": 1, "mem": 1, "time": 1}

def tasks(self):
yield Task("run", rqmt=self.rqmt)

def run(self):
zcat_cmd = ["zcat", "-f", self.input_text.get_path()]
zcat_res = subprocess.run(zcat_cmd, check=True, capture_output=True)

wc_cmd = ["wc", "-l"]
wc_res = subprocess.run(wc_cmd, input=zcat_res.stdout, check=True, capture_output=True)

self.out_num_sentences.set(int(wc_res.stdout.decode("utf-8").strip()))
91 changes: 89 additions & 2 deletions text/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
"TailJob",
"SetDifferenceJob",
"WriteToTextFileJob",
"SplitTextFileJob",
]

import logging
import os
import shutil
import subprocess
from collections.abc import Iterable
from typing import List, Union
import tempfile
from typing import List, Optional, Union

from sisyphus import Job, Task, Path, global_settings as gs
from sisyphus import Job, Task, Path, global_settings as gs, toolkit as tk
from sisyphus.delayed_ops import DelayedBase

import i6_core.util as util
Expand Down Expand Up @@ -305,3 +310,85 @@ def run(self):
f.write(f"{line}\n")
else:
raise NotImplementedError


class SplitTextFileJob(Job):
def __init__(
self,
text_file: tk.Path,
num_lines_per_split: int,
num_text_file_lines: Optional[int] = None,
zip_output: bool = True,
):
"""
Job splits a text file into several smaller files.
https://stackoverflow.com/a/45761990/2062195
:param text_file: Input text file to be processed.
:param num_lines_per_split: Number of lines per split.
:param num_text_file_lines: Number of lines in the input text file.
:param zip_output: compress the output files.
"""
self.in_text_file = text_file
self.num_lines_per_split = num_lines_per_split
self.num_text_file_lines = num_text_file_lines
self.zip_output = zip_output

if num_text_file_lines is not None:
self.num_output_files = self.num_text_file_lines // self.num_lines_per_split + int(
bool(self.num_text_file_lines % self.num_lines_per_split)
)
else:
raise NotImplementedError

self.out_split_text_files = {
k: self.output_path(f'split.{k:04}.{"txt.gz" if zip_output else "txt"}')
for k in range(1, self.num_output_files + 1)
}

self.run_rqmt = {"cpu": 1, "mem": 12.0, "time": 6.0}

def tasks(self):
yield Task("run", rqmt=self.run_rqmt)

def run(self):
with tempfile.TemporaryDirectory() as tmp_dir:
if self.in_text_file.get_path().endswith(".gz"):
logging.info("Un-compressing file")
text_file = f"{tmp_dir}/input_file.txt"
with open(text_file, "wt") as f_in:
uncompress_cmd = ["gzip", "-cdk", self.in_text_file.get_path()]
subprocess.run(uncompress_cmd, check=True, stdout=f_in)
else:
text_file = self.in_text_file.get_path()

logging.info("Split lines")
split_cmd = [
"split",
"-l",
str(self.num_lines_per_split),
"--suffix-length=4",
"--numeric-suffixes=1",
"--additional-suffix=.txt",
text_file,
f"{tmp_dir}/split.",
]
subprocess.run(split_cmd, check=True)

for file_id in range(1, self.num_output_files + 1):
file_path = f"split.{file_id:04}.txt"
assert os.path.isfile(file_path) and os.path.getsize(file_path) > 0

if self.zip_output:
logging.info("Compressing file")
compress_cmd = ["gzip"] + [
f"{tmp_dir}/split.{file_id:04}.txt" for file_id in range(1, self.num_output_files + 1)
]
subprocess.run(compress_cmd, check=True)

for file_id in range(1, self.num_output_files + 1):
shutil.move(
f"{tmp_dir}/split.{file_id:04}.txt.gz" if self.zip_output else f"split.{file_id:04}.txt",
self.out_split_text_files[file_id].get_path(),
)

0 comments on commit ab07cf4

Please sign in to comment.