Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions src/cfengine_cli/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ def extract_inline_code(path, languages):
indent = count_indent(lines[first_line])
# Index of first line to NOT include, the line after closing triple backtick:
last_line = child.map[1]
lines_to_extract = lines[first_line:last_line]
_remove_indentation(lines_to_extract, indent)
yield {
"language": language,
"flags": flags,
"first_line": child.map[0],
"last_line": child.map[1],
"indent": indent,
"lines": lines[
first_line:last_line
], # Includes backtick fences on both sides
# TODO: Dead code, the actual lines are extracted "twice" (2 places in the codebase)
"lines": lines_to_extract, # Includes backtick fences on both sides
}


Expand Down Expand Up @@ -97,12 +98,34 @@ def get_markdown_files(start, languages):
return return_dict


def _leading_spaces(s):
n = 0
for c in s:
if c == " ":
n += 1
else:
return n
return n


def _remove_indentation(snippet_lines, indent):
for i, line in enumerate(snippet_lines):
if line == "":
continue
assert line.startswith(" " * indent)
snippet_lines[i] = line[indent:]


def fn_extract(origin_path, snippet_path, _language, first_line, last_line):
try:
with open(origin_path, "r") as f:
content = f.read()

code_snippet = "\n".join(content.split("\n")[first_line + 1 : last_line - 1])
lines = f.readlines()
lines = [x[0:-1] for x in lines] # Remove newlines
fence = lines[first_line]
indent = _leading_spaces(fence)
snippet_lines = lines[first_line + 1 : last_line - 1]
_remove_indentation(snippet_lines, indent)
code_snippet = "\n".join(snippet_lines)

with open(snippet_path, "w") as f:
f.write(code_snippet + "\n")
Expand Down Expand Up @@ -174,7 +197,7 @@ def fn_replace(origin_path, snippet_path, _language, first_line, last_line, inde
origin_lines = f.read().split("\n")
pretty_lines = pretty_content.split("\n")

pretty_lines = [" " * indent + x for x in pretty_lines]
pretty_lines = ["" if x == "" else " " * indent + x for x in pretty_lines]

offset = len(pretty_lines) - len(
origin_lines[first_line + 1 : last_line - 1]
Expand Down Expand Up @@ -213,7 +236,6 @@ def fn_autoformat(
except json.decoder.JSONDecodeError:
raise UserError(f"Invalid json in '{snippet_path}'")
case "cf":
# Note: Dead code - Not used for CFEngine policy yet
format_policy_file(snippet_path, 80, False)
return False

Expand Down Expand Up @@ -434,7 +456,7 @@ def _format_docs_single_arg(path):
formatted = True
_process_markdown_code_blocks(
path=path,
languages=["json"], # TODO: Add cfengine3 here
languages=["json", "cfengine3"],
extract=True,
syntax_check=False,
output_check=False,
Expand Down
89 changes: 59 additions & 30 deletions src/cfengine_cli/format.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from enum import IntEnum
from typing import IO

import json
Expand Down Expand Up @@ -38,6 +39,14 @@
SAMELINE_COMMENT_SEPARATOR = " "


class FormatResult(IntEnum):
"""Outcome of a formatting operation."""

NO_CHANGE = 0 # File was already formatted, no reformat needed
NEEDS_FORMAT = 1 # Reformat needed (reported in check mode)
REFORMATTED = 2 # File was reformatted successfully


def _has_direct_macro(node: Node) -> bool:
"""Check if any direct child of a node is a macro (non-recursive)."""
return any(child.type == "macro" for child in node.children)
Expand All @@ -64,12 +73,8 @@ def _contains_list_with_comment(nodes: Node | list[Node]) -> bool:
return _contains_list_with_comment(nodes.children)


def format_json_file(filename: str, check: bool) -> int:
"""Reformat a JSON file in place using cfbs pretty-printer.

Returns 0 in case of successful reformat or no reformat needed.
Returns 1 when check is True and reformat is needed.
"""
def format_json_file(filename: str, check: bool) -> FormatResult:
"""Reformat a JSON file in place using cfbs pretty-printer."""
assert filename.endswith(".json")

if check:
Expand All @@ -78,16 +83,17 @@ def format_json_file(filename: str, check: bool) -> int:
assert type(success) is bool
if not success:
print(f"JSON file '{filename}' needs reformatting")
return int(not success)
return FormatResult.NEEDS_FORMAT
return FormatResult.NO_CHANGE

try:
reformatted = pretty_file(filename)
if reformatted:
print(f"JSON file '{filename}' was reformatted")
return 0 # Successfully reformatted or no reformat needed
return FormatResult.REFORMATTED
return FormatResult.NO_CHANGE
except json.decoder.JSONDecodeError as e:
print(f"JSON file '{filename}' invalid ({str(e)})")
return 1
raise UserError(f"JSON file '{filename}' invalid ({str(e)})")


def text(node: Node) -> str:
Expand Down Expand Up @@ -1008,11 +1014,9 @@ def _autoformat(
# ---------------------------------------------------------------------------


def format_policy_file(filename: str, line_length: int, check: bool) -> int:
def format_policy_file(filename: str, line_length: int, check: bool) -> FormatResult:
"""Format a .cf policy file in place, writing only if content changed.

Returns 0 in case of successful reformat or no reformat needed.
Returns 1 when check is True and reformat is needed.
Raises PolicySyntaxError when the file has syntax errors."""
assert filename.endswith(".cf")

Expand All @@ -1031,21 +1035,28 @@ def format_policy_file(filename: str, line_length: int, check: bool) -> int:
fmt = Formatter()
_autoformat(root_node, fmt, line_length)

new_data = fmt.buffer + "\n"
new_data = fmt.buffer
if not new_data.endswith("\n"):
# TODO: Look into why formatter sometimes outputs
# trailing newline and other times not.
new_data += "\n"
assert new_data.endswith("\n") and not new_data.endswith("\n\n")

if new_data != original_data.decode("utf-8"):
if check:
print(f"Policy file '{filename}' needs reformatting")
return 1
return FormatResult.NEEDS_FORMAT

with open(filename, "w") as f:
f.write(new_data)
print(f"Policy file '{filename}' was reformatted")
return 0
return FormatResult.REFORMATTED
return FormatResult.NO_CHANGE


def format_policy_fin_fout(
fin: IO[str], fout: IO[str], line_length: int, check: bool
) -> int:
) -> FormatResult:
"""Format CFEngine policy read from fin, writing the result to fout.

Raises PolicySyntaxError when the input has syntax errors."""
Expand All @@ -1065,10 +1076,12 @@ def format_policy_fin_fout(

new_data = fmt.buffer + "\n"
fout.write(new_data)
return 0
if new_data != original_data.decode("utf-8"):
return FormatResult.REFORMATTED
return FormatResult.NO_CHANGE


def _format_filename(filename: str, line_length: int, check: bool) -> int:
def _format_filename(filename: str, line_length: int, check: bool) -> FormatResult:
"""Format a single file.

Raises PolicySyntaxError for .cf files with syntax errors."""
Expand All @@ -1079,8 +1092,17 @@ def _format_filename(filename: str, line_length: int, check: bool) -> int:
raise UserError(f"Unrecognized file format: {filename}")


def _format_dirname(directory: str, line_length: int, check: bool) -> int:
ret = 0
def _combine_results(a: FormatResult, b: FormatResult) -> FormatResult:
"""Combine two format results. A non-NO_CHANGE result takes precedence.

Within a single format run, `check` is fixed, so NEEDS_FORMAT and
REFORMATTED never occur together — one always dominates NO_CHANGE."""
return b if b != FormatResult.NO_CHANGE else a


def _format_dirname(directory: str, line_length: int, check: bool) -> FormatResult:
"""Format files in directory recursively."""
ret = FormatResult.NO_CHANGE
for root, dirs, files in os.walk(directory):
# Don't recurse into hidden folders
dirs[:] = [d for d in dirs if not d.startswith(".")]
Expand All @@ -1100,18 +1122,20 @@ def _format_dirname(directory: str, line_length: int, check: bool) -> int:
continue # Test files skipped during directory traversal
filepath = os.path.join(root, name)
if name.endswith(".json") or name.endswith(".cf"):
ret |= _format_filename(filepath, line_length, check)
ret = _combine_results(
ret, _format_filename(filepath, line_length, check)
)
return ret


def _format_paths_inner(names, line_length, check) -> int:
def _format_paths_inner(names, line_length, check) -> FormatResult:
if not names:
return _format_dirname(".", line_length, check)
if len(names) == 1 and names[0] == "-":
# Special case, format policy file from stdin to stdout
return format_policy_fin_fout(sys.stdin, sys.stdout, line_length, check)

ret = 0
ret = FormatResult.NO_CHANGE
for name in names:
if name == "-":
raise UserError(
Expand All @@ -1120,14 +1144,12 @@ def _format_paths_inner(names, line_length, check) -> int:
if not os.path.exists(name):
raise UserError(f"{name} does not exist")
if os.path.isfile(name):
ret |= _format_filename(name, line_length, check)
ret = _combine_results(ret, _format_filename(name, line_length, check))
continue
if os.path.isdir(name):
ret |= _format_dirname(name, line_length, check)
ret = _combine_results(ret, _format_dirname(name, line_length, check))
continue
if check:
return ret
return 0
return ret


def format_paths(names, line_length, check) -> int:
Expand All @@ -1138,7 +1160,14 @@ def format_paths(names, line_length, check) -> int:
needed), 1 when reformatting is needed in check mode or a policy file
has syntax errors."""
try:
return _format_paths_inner(names, line_length, check)
r = _format_paths_inner(names, line_length, check)
if r == FormatResult.NEEDS_FORMAT:
assert check
# File needs reformatting
return 1
assert r in (FormatResult.NO_CHANGE, FormatResult.REFORMATTED)
# Success - no format needed or successfully reformatted
return 0
except PolicySyntaxError as e:
print(f"Error: {e}")
return 1
Loading