Output visualizer #296

Open · wants to merge 16 commits into base: develop
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@
/support/default_validator/default_validator
/support/interactive/interactive
build/
*_images/
6 changes: 0 additions & 6 deletions examples/different/input_validators/different.ctd

This file was deleted.

35 changes: 35 additions & 0 deletions examples/different/output_visualizer/outputvisualizer.py
@@ -0,0 +1,35 @@
import sys
import os
from PIL import Image, ImageDraw, ImageFont

def main():
    if len(sys.argv) != 3:
        print("Usage: output_visualizer.py <submission_output> <feedback_dir>")
        sys.exit(1)

    with open(sys.argv[1], 'r') as f:
        submission_output = f.read()
    feedback_dir = sys.argv[2]

    # Create an image with a white background
    width, height = 400, 200
    image = Image.new('RGB', (width, height), color=(255, 255, 255))
    draw = ImageDraw.Draw(image)

    # Use the default font
    font = ImageFont.load_default()

    # Construct the text to display (truncated for simplicity)
    text = f"Output:\n{submission_output[:100]}"

    # Draw the text onto the image
    draw.multiline_text((10, 10), text, fill=(0, 0, 0), font=font)

    # Save the image to the feedback directory
    outfile = os.path.join(feedback_dir, "visualizer_output.png")
    image.save(outfile)
    print(f"Image saved to {outfile}")
    sys.exit(0)

if __name__ == '__main__':
    main()
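[Note: the feedback dir passed as the second argument is the same per-testcase feedback directory that the output validator writes to; verifyproblem then picks up any .png/.jpg/.jpeg/.svg files the visualizer leaves there (see OutputVisualizer.visualize in problemtools/verifyproblem.py below).]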
210 changes: 201 additions & 9 deletions problemtools/verifyproblem.py
@@ -11,6 +11,7 @@
import hashlib
import collections
import os
from pathlib import Path
import signal
import re
import shutil
@@ -99,6 +100,7 @@ def __init__(self, args: argparse.Namespace, executor: ThreadPoolExecutor|None)
self.data_filter: Pattern[str] = args.data_filter
self.submission_filter: Pattern[str] = args.submission_filter
self.fixed_timelim: int|None = args.fixed_timelim
self.save_output_visualizer_images: bool = args.save_visualizer
self.executor = executor
self._background_work: list[concurrent.futures.Future[object]] = []

@@ -322,8 +324,16 @@ def run_submission(self, sub, runner: Runner, context: Context) -> Result:

def run_submission_real(self, sub, context: Context, timelim: int, timelim_low: int, timelim_high: int) -> Result:
# This may be called off-main thread.

feedbackdir = os.path.join(self._problem.tmpdir, f"feedback-{self.counter}")
# The problem directory persists long enough that reuse is a problem
if os.path.exists(feedbackdir):
shutil.rmtree(feedbackdir)
os.makedirs(feedbackdir)

outfile = "" # TODO: what should we do for interactive/multipass?
if self._problem.get(ProblemTestCases)['is_interactive']:
res_high = self._problem.classes[OutputValidators.PART_NAME].validate_interactive(self, sub, timelim_high, self._problem.classes[Submissions.PART_NAME])
res_high = self._problem.classes[OutputValidators.PART_NAME].validate_interactive(self, sub, timelim_high, self._problem.classes[Submissions.PART_NAME], feedbackdir)
else:
outfile = os.path.join(self._problem.tmpdir, f'output-{self.counter}')
errfile = os.path.join(self._problem.tmpdir, f'error-{self.counter}')
@@ -341,7 +351,7 @@ def run_submission_real(self, sub, context: Context, timelim: int, timelim_low:
info = None
res_high = SubmissionResult('RTE', additional_info=info)
else:
res_high = self._problem.classes[OutputValidators.PART_NAME].validate(self, outfile)
res_high = self._problem.classes[OutputValidators.PART_NAME].validate(self, outfile, feedbackdir)
res_high.runtime = runtime

if res_high.runtime <= timelim_low:
@@ -365,6 +375,16 @@ def run_submission_real(self, sub, context: Context, timelim: int, timelim_low:
res.set_ac_runtime()
res_low.set_ac_runtime()
res_high.set_ac_runtime()

visualizer = self._problem.classes.get(OutputVisualizer.PART_NAME)
if visualizer is not None and visualizer.visualizer_exists():
output_path = Path(f"{self._problem.shortname}_images") / Path(sub.path).name / Path(self.infile).stem
if outfile:
visualizer.visualize(outfile, feedbackdir, output_path, context)
else:
with tempfile.NamedTemporaryFile() as f:
visualizer.visualize(f.name, feedbackdir, output_path, context)

return (res, res_low, res_high)

def _init_result_for_testcase(self, res: SubmissionResult) -> SubmissionResult:
@@ -1368,7 +1388,15 @@ def _actual_validators(self) -> list:
return [val for val in vals if val is not None]


def validate_interactive(self, testcase: TestCase, submission, timelim: int, errorhandler: Submissions) -> SubmissionResult:
def validate_interactive(self, testcase: TestCase, submission, timelim: int, errorhandler: Submissions, feedbackdir: str) -> SubmissionResult:
"""
Validate a submission against all output validators.

Parameters:
testcase: The test case we are validating.
submission: The submission to validate.
feedback_dir_path: Path to feedback directory. If None, a temporary directory will be created and cleaned up.
"""
# This may be called off-main thread.
interactive_output_re = r'\d+ \d+\.\d+ \d+ \d+\.\d+ (validator|submission)'
res = SubmissionResult('JE')
@@ -1383,13 +1411,23 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err

val_timelim = self.problem.get(ProblemConfig)['limits']['validation_time']
val_memlim = self.problem.get(ProblemConfig)['limits']['validation_memory']
first_validator = True
for val in self._actual_validators():
if val.compile()[0]:
feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir)
# Subtle point: when running multipass, we must ensure the feedback dir is not reset between passes.
# A multipass problem has exactly one output validator, so the dir is never cleared in that case.
# Clearing it only from the second validator onwards is also still correct for legacy problems with multiple output validators.
if not first_validator:
feedback_path = Path(feedbackdir)
shutil.rmtree(feedback_path)
feedback_path.mkdir()
first_validator = False

validator_args[2] = feedbackdir + os.sep
f = tempfile.NamedTemporaryFile(delete=False)
interactive_out = f.name
f.close()

i_status, _ = interactive.run(outfile=interactive_out,
args=initargs + val.get_runcmd(memlim=val_memlim) + validator_args + [';'] + submission_args, work_dir=submission.path)
if is_RTE(i_status):
@@ -1425,25 +1463,48 @@ def validate_interactive(self, testcase: TestCase, submission, timelim: int, err
res.validator_first = (first == 'validator')

os.unlink(interactive_out)
shutil.rmtree(feedbackdir)
if res.verdict != 'AC':
res.from_validator = True
return res
# TODO: check that all output validators give same result
return res


def validate(self, testcase: TestCase, submission_output: str) -> SubmissionResult:
def validate(self, testcase: TestCase, submission_output: str, feedback_dir_path: str|None = None) -> SubmissionResult:
"""
Run all output validators on the given test case and submission output.

Parameters:
testcase: The test case we are validating.
submission_output: Path to the submission's output file.
feedback_dir_path: Path to feedback directory. If None, a temporary directory will be created and cleaned up.
"""
res = SubmissionResult('JE')
res.from_validator = True
val_timelim = self.problem.get(ProblemConfig)['limits']['validation_time']
val_memlim = self.problem.get(ProblemConfig)['limits']['validation_memory']
flags = self.problem.get(ProblemConfig)['validator_flags'].split() + testcase.testcasegroup.config['output_validator_flags'].split()

first_validator = True
for val in self._actual_validators():
if val.compile()[0]:
feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir)
# Subtle point: when running multipass, we must ensure the feedback dir is not reset between passes.
# A multipass problem has exactly one output validator, so the dir is never cleared in that case.
# Clearing it only from the second validator onwards is also still correct for legacy problems with multiple output validators.
if not first_validator:
feedback_path = Path(feedbackdir)
shutil.rmtree(feedback_path)
feedback_path.mkdir()
first_validator = False

if feedback_dir_path:
feedbackdir = feedback_dir_path
else:
feedbackdir = tempfile.mkdtemp(prefix='feedback', dir=self.problem.tmpdir)
validator_output = tempfile.mkdtemp(prefix='checker_out', dir=self.problem.tmpdir)
outfile = validator_output + "/out.txt"
errfile = validator_output + "/err.txt"
status, runtime = val.run(submission_output,
status, runtime = val.run(infile=submission_output,
args=[testcase.infile, testcase.ansfile, feedbackdir] + flags,
timelim=val_timelim, memlim=val_memlim,
outfile=outfile, errfile=errfile)
@@ -1460,15 +1521,140 @@ def validate(self, testcase: TestCase, submission_output: str) -> SubmissionResu
except IOError as e:
self.info("Failed to read validator output: %s", e)
res = self._parse_validator_results(val, status, feedbackdir, testcase)
shutil.rmtree(feedbackdir)
shutil.rmtree(validator_output)
if feedback_dir_path is None:
shutil.rmtree(feedbackdir)
res.from_validator = True
if res.verdict != 'AC':
return res

# TODO: check that all output validators give same result
return res


class OutputVisualizer(ProblemPart):
    """Handles output visualizers. Runs the output visualizer on the output of
    each submission for every test case. The visualizer is always run as a sanity
    check of the output; images are only saved to disk if the --save_visualizer flag is passed.

    Example of the file structure created when run on the example problem "different":
    different_images
    ├── different.c
[Review comment (Contributor): What happens if I have different.c both as an accepted and as a wrong_answer submission?]

    │   ├── 01
    │   │   └── visualizer_output.png
    │   ├── 02_extreme_cases
    │   │   └── visualizer_output.png
    │   └── 1
    │       └── visualizer_output.png
    └── different.cc
        ├── 01
        │   └── visualizer_output.png
        ├── 02_extreme_cases
        │   └── visualizer_output.png
        └── 1
            └── visualizer_output.png
    """
    PART_NAME = 'output_visualizer'

    ALLOWED_FILE_EXTENSIONS = [".png", ".jpg", ".jpeg", ".svg"]

    def setup(self):
        self._visualizer = run.find_programs(os.path.join(self.problem.probdir, 'output_visualizer'),
                                             work_dir=self.problem.tmpdir,
                                             language_config=self.problem.language_config)

        self.count = 0
        self._has_precompiled = False

    def __str__(self) -> str:
        return 'output visualizer'

    @staticmethod
    def setup_dependencies():
        return [OutputValidators]

    # Kick off an early compilation of the visualizer in the background
    def start_background_work(self, context: Context) -> None:
        if not self._has_precompiled:
            context.submit_background_work(lambda v: v.compile(), self._visualizer)
            self._has_precompiled = True

    def check(self, context: Context) -> bool:
        if len(self._visualizer) != 1:
            self.warning(f'Wrong number of visualizers.\nExpected: 1\nActual: {len(self._visualizer)}')

    # Check whether a file's extension is allowed and, if so, validate its header
    def check_is_valid_image(self, file) -> bool:

[Review comment (Contributor): There are libraries and standard tools to do this type of file type detection. I'd prefer we use them, if possible, instead of re-implementing that logic ourselves. There's a Python package called python-magic. (An uglier option would be to just use file --mime-type from the shell, but that feels hacky.)]
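A minimal sketch of that suggestion, assuming the third-party python-magic package (a libmagic wrapper, not currently a problemtools dependency); the EXPECTED_MIME mapping and helper below are illustrative, not part of this PR, and path is assumed to be a pathlib.Path:

import magic  # pip install python-magic

# Assumed mapping from allowed extensions to the MIME type libmagic should report
EXPECTED_MIME = {
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.svg': 'image/svg+xml',  # assumption: some libmagic builds report SVG as generic XML/text instead
}

def mime_matches_extension(path) -> bool:
    # Ask libmagic for the detected MIME type and compare it with what the extension promises
    detected = magic.from_file(str(path), mime=True)
    return detected == EXPECTED_MIME.get(path.suffix.lower())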

        simple_header_formats = [
            ('.png', b'\x89PNG\r\n\x1a\n'),  # PNG file signature
            ('.jpg', b'\xff\xd8\xff'),       # JPEG start-of-image marker (covers JFIF and Exif variants)
        ]
        simple_header_formats.append(('.jpeg', simple_header_formats[1][1]))
        # For non-SVG files, read the first 8 bytes and check that the
        # header / magic number is as expected for the extension
        if any(file.suffix == end for end, _ in simple_header_formats):
            header = next(header for end, header in simple_header_formats if file.suffix == end)
            with open(file, "rb") as f:
                file_signature = f.read(8)
            if not file_signature.startswith(header):
                self.warning(f"File {file} has incorrect file header")
                return False
            return True
        elif file.suffix == '.svg':
            try:
                # TODO: make this more robust
                # Read the XML declaration and the first 256 characters, where '<svg' is very likely to appear.
                # If this breaks because '<svg' appears further in, either increase the bytes read or do this properly.
                with open(file, 'r', encoding='utf-8') as f:
                    content = f.read(256)
                return content.startswith('<?xml') and '<svg' in content
            except Exception as e:
                self.warning(f"Error checking SVG: {e}")
                return False
        else:
            return False

    def visualizer_exists(self) -> bool:
        return bool(self._visualizer)

    def visualize(self, result_file: str, feedback_dir: str, output_dir: Path, context: Context):
        """Run the visualizer on result_file, and optionally save the generated images to disk"""
        generated_image_paths = []

        if not self.visualizer_exists():
            return
        visualizer = self._visualizer[0]

        # Try to run the visualizer and warn if it fails
        try:
            status, runtime = visualizer.run(args=[result_file, feedback_dir])
            if status != 0:
                self.warning(f'The output visualizer crashed, status: {status}')
        except Exception as e:
            self.warning(f'Error running output visualizer: {e}')

        any_invalid_headers = False

        for file in Path(feedback_dir).iterdir():
            if file.is_file() and file.suffix in self.ALLOWED_FILE_EXTENSIONS:
                if self.check_is_valid_image(file):
                    generated_image_paths.append(file)
                else:
                    any_invalid_headers = True

        if context.save_output_visualizer_images:
            for image in generated_image_paths:
                output_dir.mkdir(parents=True, exist_ok=True)
                shutil.copy(image, output_dir / Path(image).name)

        # Warn if the visualizer produced no valid images (bad file signatures, or no images at all)
        if not generated_image_paths:
            if any_invalid_headers:
                self.warning("The visualizer did not generate images with proper file headers")
            else:
                self.warning("The visualizer did not generate any images")


class Runner:
def __init__(self, problem: Problem, sub, context: Context, timelim: int, timelim_low: int, timelim_high: int) -> None:
self._problem = problem
@@ -1735,9 +1921,11 @@ def check(self, context: Context) -> bool:
'graders': [Graders],
'data': [ProblemTestCases],
'submissions': [Submissions],
'visualizers': [OutputVisualizer]  # TODO: included in the legacy format only for testing
},
'2023-07': { # TODO: Add all the parts
'statement': [ProblemStatement2023_07, Attachments],
'visualizers': [OutputVisualizer]
}
}

@@ -1841,6 +2029,7 @@ def check(self, args: argparse.Namespace) -> tuple[int, int]:
self.msg(f'Checking {part}')
for item in self.part_mapping[part]:
self.classes[item.PART_NAME].check(context)
# TODO: directory structure for multipass
except VerifyError:
pass
finally:
@@ -1905,6 +2094,9 @@ def argparser_basic_arguments(parser: argparse.ArgumentParser) -> None:
default='automatic', choices=list(PROBLEM_FORMATS.keys()) + ['automatic'],
help='which problem format should the package be interpreted as, or "automatic" if it should be figured out from problem.yaml')

parser.add_argument('-sv', '--save_visualizer',
action='store_true',
help="Save visualizer outputs to disk")
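[Usage note: with this flag, running e.g. verifyproblem examples/different --save_visualizer (assuming problemtools' usual verifyproblem entry point) copies any generated images to <shortname>_images/<submission>/<testcase>/, matching the tree in the OutputVisualizer docstring above.]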

def argparser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description='Validate a problem package in the Kattis problem format.')