import os
import re
from collections import defaultdict
class AutoSysJobVisualizer:
def __init__(self, root_folder):
self.root_folder = root_folder
self.all_jobs = {}
self.box_jobs = defaultdict(list)
self.box_names = set()
def parse_jil_file(self, file_path):
jobs = {}
current_job = None
with open(file_path, 'r') as file:
for line in file:
line = line.strip()
if line.startswith('insert_job:'):
current_job = line.split()[1]
jobs[current_job] = {'conditions': [], 'box_name': None}
elif line.startswith('condition:') and current_job:
conditions = re.findall(r'([sdfn])\((.*?)\)', line)
jobs[current_job]['conditions'].extend(conditions)
elif line.startswith('box_name:') and current_job:
box_name = line.split()[1]
jobs[current_job]['box_name'] = box_name
self.box_jobs[box_name].append(current_job)
self.box_names.add(box_name)
return jobs
def find_all_dependencies(self, jobs):
dependencies = defaultdict(list)
for job, details in jobs.items():
for condition_type, condition_job in details['conditions']:
dependencies[job].append((condition_job, f'con: {condition_type}'))
if details['box_name']:
box_name = details['box_name']
dependencies[box_name].append((job, 'box: contains'))
return dependencies
def sanitize_name(self, name):
return name.replace('_', '').replace('-', '').replace('.', '').replace('^', '')
def generate_puml_script(self, dependencies):
puml_script = "@startuml\n"
job_names = set(dependencies.keys()).union(set(j for deps in dependencies.values() for j, _ in deps))
for job in job_names:
sanitized_name = self.sanitize_name(job)
if job in self.box_names:
puml_script += f'System_Ext({sanitized_name}, "{job}")\n'
else:
puml_script += f'System({sanitized_name}, "{job}")\n'
for source, targets in dependencies.items():
sanitized_source = self.sanitize_name(source)
for target, dep_type in targets:
sanitized_target = self.sanitize_name(target)
label = dep_type
puml_script += f'{sanitized_source} -> {sanitized_target} : {label}\n'
puml_script += "@enduml"
return puml_script
def run(self, output_file):
for dirpath, _, filenames in os.walk(self.root_folder):
for filename in filenames:
if filename.endswith('.jil'):
file_path = os.path.join(dirpath, filename)
jobs = self.parse_jil_file(file_path)
self.all_jobs.update(jobs)
dependencies = self.find_all_dependencies(self.all_jobs)
output = "\nJob chains:\n"
for source, targets in dependencies.items():
for target, dep_type in targets:
output += f"{source} -> {target} ({dep_type})\n"
puml_script = self.generate_puml_script(dependencies)
output += "\nC4 PlantUML Script:\n"
output += puml_script
output += "\nFull list of downstream jobs/boxes:\n"
output += ", ".join(dependencies.keys())
with open(output_file, 'w') as file:
file.write(output)
if __name__ == "__main__":
root_folder = '/path/to/your/jil/files' # Change to your JIL files directory
output_file = 'output.txt' # Change to your desired output file path
visualizer = AutoSysJobVisualizer(root_folder)
visualizer.run(output_file)
jil full
Leave a reply