Skip to content

Commit 193aa29

Browse files
committed
/tests/ports/psoc6: Check tests.
Signed-off-by: NikhitaR-IFX <nikhita.rajasekhar@infineon.com>
1 parent 4181ee4 commit 193aa29

1 file changed

Lines changed: 150 additions & 0 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import subprocess
2+
import sys
3+
import os
4+
import logging
5+
import argparse
6+
7+
logger = logging.getLogger("fs")
8+
logging.basicConfig(format="%(levelname)s: %(message)s", encoding="utf-8", level=logging.DEBUG)
9+
10+
11+
test_input_dir = "./ports/psoc6/inputs"
12+
test_script_dir = "./ports/psoc6/mp_custom"
13+
args = ""
14+
15+
mpremote = "./tools/mpremote/mpremote.py"
16+
17+
18+
def set_mpr_run_script(mem_type):
19+
if mem_type == "sd":
20+
return f"run {test_script_dir}/fs_mount_sd.py"
21+
return ""
22+
23+
24+
def set_remote_dir_path(mem_type):
25+
return "/sd/" if mem_type == "sd" else "/"
26+
27+
28+
def get_test_input_files(test_type):
29+
cp_input_files = [
30+
"test_fs_small_file.txt",
31+
"test_fs_medium_file.txt",
32+
"test_fs_large_file.txt",
33+
]
34+
input_files_sizes = ["10240", "511876", "1047584"]
35+
36+
if test_type == "basic":
37+
return [cp_input_files[0]], [input_files_sizes[0]]
38+
elif test_type == "adv":
39+
return cp_input_files, input_files_sizes
40+
41+
42+
def get_test_name(test_type, mem_type):
43+
mem_type_suffix = "_sd" if mem_type == "sd" else ""
44+
return f"fs_{test_type}{mem_type_suffix}.py"
45+
46+
47+
def run_subprocess(command):
48+
try:
49+
result = subprocess.run(command, shell=True, capture_output=True, check=True)
50+
return result.stdout.decode().split("\n")
51+
except subprocess.CalledProcessError as e:
52+
logger.error(f"Command '{command}' failed with error: {e}")
53+
sys.exit(1)
54+
55+
56+
def ls_files(files, mpr_connect, mpr_run_script, remote_directory_path):
57+
mpr_ls = f"{mpr_connect} {mpr_run_script} fs ls {remote_directory_path}"
58+
output_lines = run_subprocess(mpr_ls)
59+
files_result = []
60+
61+
for file in files:
62+
file_size = -1
63+
for line in output_lines:
64+
line = line.split()
65+
if file in line:
66+
file_size = line[0]
67+
files_result.append(file_size)
68+
69+
return files_result
70+
71+
72+
def rm_files(files, mpr_connect, mpr_run_script, remote_directory_path):
73+
rm_sub_cmd = " + ".join([f"fs rm {remote_directory_path}{file}" for file in files])
74+
mpr_rm_cmd = f"{mpr_connect} {mpr_run_script} {rm_sub_cmd}"
75+
run_subprocess(mpr_rm_cmd)
76+
77+
78+
def rm_files_if_exist(files, mpr_connect, mpr_run_script, remote_directory_path):
79+
matches = ls_files(files, mpr_connect, mpr_run_script, remote_directory_path)
80+
existing_files = [files[i] for i in range(len(matches)) if matches[i] != -1]
81+
82+
if existing_files:
83+
print(f"Removing existing files...")
84+
rm_files(existing_files, mpr_connect, mpr_run_script, remote_directory_path)
85+
matches = ls_files(files, mpr_connect, mpr_run_script, remote_directory_path)
86+
if matches == [-1 for _ in range(len(files))]:
87+
print(f"Existing files removed.")
88+
89+
90+
def copy_files(input_cp_files, mpr_connect, remote_directory_path):
91+
cp_sub_cmd = " + ".join(
92+
[f"cp {test_input_dir}/{file} :{remote_directory_path}" for file in input_cp_files]
93+
)
94+
cp_cmd = f"{mpr_connect} {cp_sub_cmd}"
95+
run_subprocess(cp_cmd)
96+
97+
98+
def validate_test(files, file_sizes, mpr_connect, mpr_run_script, remote_directory_path):
99+
found_sizes = ls_files(files, mpr_connect, mpr_run_script, remote_directory_path)
100+
if found_sizes != file_sizes:
101+
print(f"\nfail {get_test_name(test_type, mem_type)}")
102+
sys.exit(1)
103+
else:
104+
print(f"\npass {get_test_name(test_type, mem_type)}")
105+
sys.exit(0)
106+
107+
108+
def cp_files_test(
109+
input_files, input_files_size, mpr_connect, mpr_run_script, remote_directory_path
110+
):
111+
rm_files_if_exist(input_files, mpr_connect, mpr_run_script, remote_directory_path)
112+
copy_files(input_files, mpr_connect, remote_directory_path)
113+
validate_test(
114+
input_files, input_files_size, mpr_connect, mpr_run_script, remote_directory_path
115+
)
116+
117+
118+
def large_file_tests(device, test_type, mem_type):
119+
mpr_connect = f"{mpremote} connect {device}"
120+
mpr_run_script = set_mpr_run_script(mem_type)
121+
remote_directory_path = set_remote_dir_path(mem_type)
122+
123+
input_files, input_files_size = get_test_input_files(test_type)
124+
cp_files_test(
125+
input_files, input_files_size, mpr_connect, mpr_run_script, remote_directory_path
126+
)
127+
128+
129+
if __name__ == "__main__":
130+
parser = argparse.ArgumentParser(
131+
prog="fs_test.py",
132+
usage="%(prog)s [options] [command]",
133+
description="Filesystem test script",
134+
)
135+
parser.add_argument(
136+
'--device', required=False, default="/dev/ttyACM4", help="Device to connect to"
137+
)
138+
parser.add_argument(
139+
'--test_type', choices=['basic', 'adv'], default='basic', help="Type of test to run"
140+
)
141+
parser.add_argument(
142+
'--mem_type', choices=['flash', 'sd'], default='flash', help="Type of memory to test"
143+
)
144+
145+
args = parser.parse_args(sys.argv[1:])
146+
device = args.device
147+
test_type = args.test_type
148+
mem_type = args.mem_type
149+
150+
large_file_tests(device, test_type, mem_type)

0 commit comments

Comments
 (0)