|
1 | 1 | import configparser |
| 2 | +import ipaddress |
2 | 3 | import shutil |
3 | 4 | import sys |
4 | 5 | from os import mkdir |
|
8 | 9 | from sqlite3 import connect |
9 | 10 | from threading import Lock |
10 | 11 |
|
11 | | -from sqlalchemy import create_engine, MetaData |
| 12 | +from sqlalchemy import create_engine, MetaData, func |
12 | 13 | from sqlalchemy.exc import IllegalStateChangeError |
13 | 14 | from sqlalchemy.orm import sessionmaker, scoped_session |
14 | 15 |
|
@@ -109,7 +110,36 @@ def initialize_db(): |
109 | 110 |
|
110 | 111 | # Even if the default workspace exists, we still need to check if every protocol has a database (in case of a new protocol) |
111 | 112 | init_protocol_dbs("default") |
112 | | - |
| 113 | + |
| 114 | +def format_host_query(q, filter_term, HostsTable): |
| 115 | + """One annoying thing is that if you search for an ip such as '10.10.10.5', |
| 116 | + it will return 10.10.10.5 and 10.10.10.52, so we have to check if its an ip address first |
| 117 | + """ |
| 118 | + # the FTP and SSH protocols call the column host instead of IP |
| 119 | + # TODO: normalize these column names |
| 120 | + if hasattr(HostsTable.c, "ip"): |
| 121 | + ip_column = HostsTable.c.ip |
| 122 | + nxc_logger.debug("Using 'ip' column for filtering") |
| 123 | + elif hasattr(HostsTable.c, "host"): |
| 124 | + ip_column = HostsTable.c.host |
| 125 | + nxc_logger.debug("Using 'host' column for filtering") |
| 126 | + else: |
| 127 | + nxc_logger.debug("Neither 'ip' nor 'host' columns found in the table") |
| 128 | + return q |
| 129 | + |
| 130 | + # first we check if its an ip address |
| 131 | + try: |
| 132 | + ipaddress.ip_address(filter_term) |
| 133 | + nxc_logger.debug(f"filter_term is an IP address: {filter_term}") |
| 134 | + q = q.filter(ip_column == filter_term) |
| 135 | + except ValueError: |
| 136 | + nxc_logger.debug(f"filter_term is not an IP address: {filter_term}") |
| 137 | + like_term = func.lower(f"%{filter_term}%") |
| 138 | + |
| 139 | + # check if the hostname column exists for hostname searching |
| 140 | + q = q.filter(ip_column.like(like_term) | func.lower(HostsTable.c.hostname).like(like_term)) if hasattr(HostsTable.c, "hostname") else q.filter(ip_column.like(like_term)) |
| 141 | + |
| 142 | + return q |
113 | 143 |
|
114 | 144 | class BaseDB: |
115 | 145 | def __init__(self, db_engine): |
|
0 commit comments