diff --git a/backend/app/api/v1/module_generator/gencode/crud.py b/backend/app/api/v1/module_generator/gencode/crud.py index d61dd6c702aa858dabc4a78ed39a67a01b66755f..e540d53aab3d440ff6803ef8a1ec61be9617902c 100644 --- a/backend/app/api/v1/module_generator/gencode/crud.py +++ b/backend/app/api/v1/module_generator/gencode/crud.py @@ -2,6 +2,7 @@ from sqlalchemy.engine.row import Row from sqlalchemy import and_, select, text +from sqlalchemy import inspect as sa_inspect from typing import Sequence from sqlglot.expressions import Expression @@ -118,6 +119,42 @@ class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]): """ await self.delete(ids=ids) + @staticmethod + def _sync_get_table_info(session, bind, database_type): + """ + 同步函数:获取数据库表信息 + + 参数: + - session: 数据库会话对象(由run_sync自动传入) + - bind: 数据库绑定对象 + - database_type: 数据库类型 + + 返回: + - tuple: (table_names, table_comments) + """ + # 使用SQLAlchemy Inspector获取表信息 + inspector = sa_inspect(bind) + + # 获取所有表名 + if database_type == "postgres": + table_names = inspector.get_table_names(schema='public') + else: + table_names = inspector.get_table_names() + + # 获取表注释信息 + table_comments = {} + for table_name in table_names: + try: + # 获取表选项(包括注释) + table_options = inspector.get_table_options(table_name) + comment = table_options.get('comment', '') + table_comments[table_name] = comment + except Exception: + # 如果无法获取表选项,设置为空字符串 + table_comments[table_name] = '' + + return table_names, table_comments + async def get_db_table_list(self, search: GenTableQueryParam | None = None) -> list[dict]: """ 根据查询参数获取数据库表列表信息。 @@ -128,79 +165,40 @@ class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]): 返回: - list[dict]: 数据库表列表信息(已转为可序列化字典)。 """ - - # 使用更健壮的方式检测数据库方言 - if settings.DATABASE_TYPE == "postgres": - query_sql = ( - select( - text("t.table_catalog as database_name"), - text("t.table_name as table_name"), - text("t.table_type as table_type"), - text("pd.description as table_comment"), - ) - .select_from(text( - "information_schema.tables t \n" - "LEFT JOIN pg_catalog.pg_class c ON c.relname = t.table_name \n" - "LEFT JOIN pg_catalog.pg_namespace n ON n.nspname = t.table_schema AND c.relnamespace = n.oid \n" - "LEFT JOIN pg_catalog.pg_description pd ON pd.objoid = c.oid AND pd.objsubid = 0" - )) - .where( - and_( - text("t.table_catalog = (select current_database())"), - text("t.is_insertable_into = 'YES'"), - text("t.table_schema = 'public'"), - ) - ) - ) - else: - query_sql = ( - select( - text("table_schema as database_name"), - text("table_name as table_name"), - text("table_type as table_type"), - text("table_comment as table_comment"), - ) - .select_from(text("information_schema.tables")) - .where( - and_( - text("table_schema = (select database())"), - ) - ) - ) + # 使用run_sync包装同步的inspect操作 + table_names, table_comments = await self.auth.db.run_sync( + GenTableCRUD._sync_get_table_info, + self.auth.db.get_bind(), + settings.DATABASE_TYPE + ) - # 动态条件构造 - params = {} - if search and search.table_name: - query_sql = query_sql.where( - text("lower(table_name) like lower(:table_name)") - ) - params['table_name'] = f"%{search.table_name}%" - if search and search.table_comment: - # 对于PostgreSQL,表注释字段是pd.description,而不是table_comment - if settings.DATABASE_TYPE == "postgres": - query_sql = query_sql.where( - text("lower(pd.description) like lower(:table_comment)") - ) - else: - query_sql = query_sql.where( - text("lower(table_comment) like lower(:table_comment)") - ) - params['table_comment'] = f"%{search.table_comment}%" - - # 执行查询并绑定参数 - all_data = (await self.auth.db.execute(query_sql, params)).fetchall() - - # 将Row对象转换为字典列表,解决JSON序列化问题 + # 构造结果数据 dict_data = [] - for row in all_data: - # 检查row是否为Row对象 - if isinstance(row, Row): - # 使用._mapping获取字典 - dict_row = GenDBTableSchema(**dict(row._mapping)).model_dump() - dict_data.append(dict_row) - else: - dict_row = GenDBTableSchema(**dict(row)).model_dump() - dict_data.append(dict_row) + database_name = self.auth.db.get_bind().url.database or 'default' + + for table_name in table_names: + # 应用搜索过滤器 + if search: + # 过滤表名 + if search.table_name and search.table_name.lower() not in table_name.lower(): + continue + + # 过滤表注释 + if search.table_comment and search.table_comment.lower() not in table_comments[table_name].lower(): + continue + + # 构造表信息字典 + table_info = { + "database_name": database_name, + "table_name": table_name, + "table_type": "BASE TABLE", # SQLAlchemy Inspector不直接提供表类型,我们假设都是BASE TABLE + "table_comment": table_comments[table_name] + } + + # 使用GenDBTableSchema验证并转换为字典 + dict_row = GenDBTableSchema(**table_info).model_dump() + dict_data.append(dict_row) + return dict_data async def get_db_table_list_by_names(self, table_names: list[str]) -> list[GenDBTableSchema]: @@ -216,68 +214,19 @@ class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]): # 处理空列表情况 if not table_names: return [] - - # 使用更健壮的方式检测数据库方言 - if settings.DATABASE_TYPE == "postgres": - # PostgreSQL使用ANY操作符和正确的参数绑定 - query_sql = """ - SELECT - t.table_catalog as database_name, - t.table_name as table_name, - t.table_type as table_type, - pd.description as table_comment - FROM - information_schema.tables t - LEFT JOIN pg_catalog.pg_class c ON c.relname = t.table_name - LEFT JOIN pg_catalog.pg_namespace n ON n.nspname = t.table_schema AND c.relnamespace = n.oid - LEFT JOIN pg_catalog.pg_description pd ON pd.objoid = c.oid AND pd.objsubid = 0 - WHERE - t.table_catalog = (select current_database()) - AND t.is_insertable_into = 'YES' - AND t.table_schema = 'public' - AND t.table_name = ANY(:table_names) - """ - else: - query_sql = """ - SELECT - table_schema as database_name, - table_name as table_name, - table_type as table_type, - table_comment as table_comment - FROM - information_schema.tables - WHERE - table_schema = (select database()) - AND table_name IN :table_names - """ - # 创建新的数据库会话上下文来执行查询,避免受外部事务状态影响 - try: - # 去重表名列表,避免重复查询 - unique_table_names = list(set(table_names)) - - # 使用只读事务执行查询,不影响主事务 - if settings.DATABASE_TYPE == "postgres": - gen_db_table_list = (await self.auth.db.execute(text(query_sql), {"table_names": unique_table_names})).fetchall() - else: - gen_db_table_list = (await self.auth.db.execute(text(query_sql), {"table_names": tuple(unique_table_names)})).fetchall() - except Exception as e: - log.error(f"查询表信息时发生错误: {e}") - # 查询错误时直接抛出,不需要事务处理 - raise + # 调用get_db_table_list获取所有表信息 + all_tables = await self.get_db_table_list() - # 将Row对象转换为字典列表,解决JSON序列化问题 - dict_data = [] - for row in gen_db_table_list: - # 检查row是否为Row对象 - if isinstance(row, Row): - # 使用._mapping获取字典 - dict_row = GenDBTableSchema(**dict(row._mapping)) - dict_data.append(dict_row) - else: - dict_row = GenDBTableSchema(**dict(row)) - dict_data.append(dict_row) - return dict_data + # 过滤出指定名称的表 + table_names_set = set(table_names) # 转换为集合以提高查找效率 + filtered_tables = [ + GenDBTableSchema(**table) + for table in all_tables + if table["table_name"] in table_names_set + ] + + return filtered_tables async def check_table_exists(self, table_name: str) -> bool: """ @@ -290,14 +239,15 @@ class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]): - bool: 如果表存在返回True,否则返回False。 """ try: - # 根据不同数据库类型使用不同的查询方式 - if settings.DATABASE_TYPE.lower() == 'mysql': - query = text("SELECT 1 FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = :table_name") - else: - query = text("SELECT 1 FROM pg_tables WHERE tablename = :table_name") + # 使用SQLAlchemy的inspect功能检查表是否存在,这是数据库无关的方法 + bind = self.auth.db.get_bind() + inspector = sa_inspect(bind) - result = await self.auth.db.execute(query, {"table_name": table_name}) - return result.scalar() is not None + # 对于PostgreSQL,需要指定schema + if settings.DATABASE_TYPE.lower() == 'postgres': + return inspector.has_table(table_name, schema='public') + else: + return inspector.has_table(table_name) except Exception as e: log.error(f"检查表格存在性时发生错误: {e}") # 出错时返回False,避免误报表已存在 @@ -355,6 +305,81 @@ class GenTableColumnCRUD(CRUDBase[GenTableColumnModel, GenTableColumnSchema, Gen - auth (AuthSchema): 认证信息模型 """ super().__init__(model=GenTableColumnModel, auth=auth) + + @staticmethod + def _sync_get_table_columns(session, bind, database_type, table_name): + """ + 同步函数:获取数据库表的列信息 + + 参数: + - session: 数据库会话对象(由run_sync自动传入) + - bind: 数据库绑定对象 + - database_type: 数据库类型 + - table_name: 表名 + + 返回: + - list: 列信息列表 + """ + # 使用SQLAlchemy Inspector获取表列信息 + inspector = sa_inspect(bind) + + # 获取列信息 + columns = inspector.get_columns(table_name) + + # 获取主键信息 + try: + pk_constraint = inspector.get_pk_constraint(table_name) + primary_keys = set(pk_constraint.get("constrained_columns", [])) if pk_constraint else set() + except Exception: + primary_keys = set() + + # 获取唯一约束信息 + unique_columns = set() + try: + unique_constraints = inspector.get_unique_constraints(table_name) + for constraint in unique_constraints: + unique_columns.update(constraint.get("column_names", [])) + except Exception: + pass + + # 处理列信息 + columns_list = [] + for idx, column in enumerate(columns): + # 获取列的基本信息 + column_name = column['name'] + column_type = str(column['type']) + is_nullable = column.get('nullable', True) + column_default = column.get('default', None) + # 获取列注释(如果有的话) + column_comment = column.get('comment', '') + # 判断是否为主键 + is_pk = column_name in primary_keys + # 判断是否为唯一约束 + is_unique = column_name in unique_columns + # 判断是否为自增列(基于数据库类型和列类型) + is_increment = column.get('autoincrement', False) in (True, 'auto') + # 获取列长度(如果适用) + column_length = None + if hasattr(column['type'], 'length') and column['type'].length: + column_length = str(column['type'].length) + + # 构造列信息字典 + column_info = { + "column_name": column_name, + "column_comment": column_comment or '', + "column_type": column_type, + "column_length": column_length or '', + "column_default": str(column_default) if column_default is not None else '', + "sort": idx + 1, # 序号从1开始 + "is_pk": 1 if is_pk else 0, + "is_increment": 1 if is_increment else 0, + "is_nullable": 1 if is_nullable else 0, + "is_unique": 1 if is_unique else 0 + } + + columns_list.append(column_info) + + return columns_list async def get_gen_table_column_by_id(self, id: int, preload: list | None = None) -> GenTableColumnModel | None: """根据业务表字段ID获取业务表字段信息。 @@ -406,101 +431,21 @@ class GenTableColumnCRUD(CRUDBase[GenTableColumnModel, GenTableColumnSchema, Gen # 检查表名是否为空 if not table_name: raise ValueError("数据表名称不能为空") - + try: - if settings.DATABASE_TYPE == "mysql": - query_sql = """ - SELECT - c.column_name AS column_name, - c.column_comment AS column_comment, - c.column_type AS column_type, - c.character_maximum_length AS column_length, - c.column_default AS column_default, - c.ordinal_position AS sort, - (CASE WHEN c.column_key = 'PRI' THEN 1 ELSE 0 END) AS is_pk, - (CASE WHEN c.extra = 'auto_increment' THEN 1 ELSE 0 END) AS is_increment, - (CASE WHEN (c.is_nullable = 'NO' AND c.column_key != 'PRI') THEN 1 ELSE 0 END) AS is_nullable, - (CASE - WHEN c.column_name IN ( - SELECT k.column_name - FROM information_schema.key_column_usage k - JOIN information_schema.table_constraints t - ON k.constraint_name = t.constraint_name - WHERE k.table_schema = c.table_schema - AND k.table_name = c.table_name - AND t.constraint_type = 'UNIQUE' - ) THEN 1 ELSE 0 - END) AS is_unique - FROM - information_schema.columns c - WHERE c.table_schema = (SELECT DATABASE()) - AND c.table_name = :table_name - ORDER BY - c.ordinal_position - """ - else: - query_sql = """ - SELECT - c.column_name AS column_name, - COALESCE(pgd.description, '') AS column_comment, - c.udt_name AS column_type, - c.character_maximum_length AS column_length, - c.column_default AS column_default, - c.ordinal_position AS sort, - (CASE WHEN EXISTS ( - SELECT 1 FROM information_schema.table_constraints tc - JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name - WHERE tc.table_name = c.table_name - AND tc.constraint_type = 'PRIMARY KEY' - AND ccu.column_name = c.column_name - ) THEN 1 ELSE 0 END) AS is_pk, - (CASE WHEN c.column_default LIKE 'nextval%' THEN 1 ELSE 0 END) AS is_increment, - (CASE WHEN c.is_nullable = 'NO' THEN 1 ELSE 0 END) AS is_nullable, - (CASE WHEN EXISTS ( - SELECT 1 FROM information_schema.table_constraints tc - JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name - WHERE tc.table_name = c.table_name - AND tc.constraint_type = 'UNIQUE' - AND ccu.column_name = c.column_name - ) THEN 1 ELSE 0 END) AS is_unique - FROM - information_schema.columns c - LEFT JOIN pg_catalog.pg_description pgd ON - pgd.objoid = (SELECT oid FROM pg_class WHERE relname = c.table_name) - AND pgd.objsubid = c.ordinal_position - WHERE c.table_catalog = current_database() - AND c.table_schema = 'public' - AND c.table_name = :table_name - ORDER BY - c.ordinal_position - """ - - query = text(query_sql).bindparams(table_name=table_name) - result = await self.auth.db.execute(query) - rows = result.fetchall() if result else [] - - # 确保rows是可迭代对象 - if not rows: - return [] + # 使用run_sync包装同步的inspect操作 + columns_info = await self.auth.db.run_sync( + GenTableColumnCRUD._sync_get_table_columns, + self.auth.db.get_bind(), + settings.DATABASE_TYPE, + table_name + ) + # 转换为GenTableColumnOutSchema对象列表 columns_list = [] - for row in rows: - # 防御性编程:检查row是否有足够的元素 - if len(row) >= 10: - columns_list.append( - GenTableColumnOutSchema( - column_name=row[0], - column_comment=row[1], - column_type=row[2], - column_length=str(row[3]) if row[3] is not None else '', - column_default=str(row[4]) if row[4] is not None else '', - sort=row[5], - is_pk=row[6], - is_increment=row[7], - is_nullable=row[8], - is_unique=row[9], - ) - ) + for column_info in columns_info: + columns_list.append(GenTableColumnOutSchema(**column_info)) + return columns_list except Exception as e: log.error(f"获取表{table_name}的字段列表时出错: {str(e)}") diff --git a/backend/app/api/v1/module_generator/gencode/service.py b/backend/app/api/v1/module_generator/gencode/service.py index 51bc1329bfd9c3c2cba1433efd43b814a85560cc..2586b1ead04b3a9d31eee727c5fcd7337f64b435 100644 --- a/backend/app/api/v1/module_generator/gencode/service.py +++ b/backend/app/api/v1/module_generator/gencode/service.py @@ -192,36 +192,113 @@ class GenTableService: """ 执行菜单 SQL(INSERT / DO 块)并写入 sys_menu。 - 仅处理菜单 SQL,不再混杂建表逻辑; - - 文件不存在时给出友好提示; - - 统一异常信息,日志与业务提示分离。 + - 直接使用 SQLAlchemy ORM 插入数据; + - Todo: 如果有了菜单,需要清除后再创建。比对 uuid ? """ - sql_path = f'{BASE_DIR}/sql/menu/{gen_table.module_name}/{gen_table.business_name}.sql' - - # 文件存在性前置检查,避免多余解析开销 - if not os.path.isfile(sql_path): - raise CustomException(msg=f'菜单 SQL 文件不存在: {sql_path}') - - sql = Path(sql_path).read_text(encoding='utf-8').strip() - if not sql: - raise CustomException(msg='菜单 SQL 文件内容为空') - - # 仅做语法校验,不限制关键字;真正的语义安全由数据库权限控制 - try: - statements = sqlglot_parse(sql, dialect=settings.DATABASE_TYPE) - if not statements: - raise CustomException(msg='菜单 SQL 语法解析失败,请检查文件内容') - except Exception as e: - log.error(f'菜单 SQL 解析异常: {e}') - raise CustomException(msg='菜单 SQL 语法错误,请检查文件内容') - - # 执行 SQL - try: - await GenTableCRUD(auth).execute_sql(sql) - log.info(f'成功执行菜单 SQL: {sql_path}') - return True - except Exception as e: - log.error(f'菜单 SQL 执行失败: {e}') - raise CustomException(msg='菜单 SQL 执行失败,请确认语句及数据库状态') + from app.api.v1.module_system.menu.crud import MenuCRUD + from app.api.v1.module_system.menu.schema import MenuCreateSchema + from app.utils.common_util import CamelCaseUtil + + # 构建权限前缀 + permission_prefix = f"{gen_table.module_name}:{gen_table.business_name}" + + # 创建菜单 CRUD 实例 + menu_crud = MenuCRUD(auth) + + # 创建父菜单(类型=2:菜单) + parent_menu = await menu_crud.create( + MenuCreateSchema( + name=gen_table.function_name, + type=2, + order=9999, + permission=f"{permission_prefix}:query", + icon="menu", + route_name=CamelCaseUtil.snake_to_camel(gen_table.business_name), + route_path=f"/{gen_table.module_name}/{gen_table.business_name}", + component_path=f"{gen_table.module_name}/{gen_table.business_name}/index", + redirect=None, + hidden=False, + keep_alive=True, + always_show=False, + title=gen_table.function_name, + params=None, + affix=False, + parent_id=gen_table.parent_menu_id if gen_table.parent_menu_id is not None else 7, + status="0", + description=f"{gen_table.function_name}菜单" + ) + ) + + # 创建按钮权限(类型=3:按钮/权限) + buttons = [ + { + "name": f"{gen_table.function_name}查询", + "permission": f"{permission_prefix}:query", + "order": 1 + }, + { + "name": f"{gen_table.function_name}新增", + "permission": f"{permission_prefix}:create", + "order": 2 + }, + { + "name": f"{gen_table.function_name}修改", + "permission": f"{permission_prefix}:update", + "order": 3 + }, + { + "name": f"{gen_table.function_name}删除", + "permission": f"{permission_prefix}:delete", + "order": 4 + }, + { + "name": f"{gen_table.function_name}导出", + "permission": f"{permission_prefix}:export", + "order": 5 + }, + { + "name": f"{gen_table.function_name}导入", + "permission": f"{permission_prefix}:import", + "order": 6 + }, + { + "name": f"{gen_table.function_name}批量状态修改", + "permission": f"{permission_prefix}:patch", + "order": 7 + }, + { + "name": f"{gen_table.function_name}下载导入模板", + "permission": f"{permission_prefix}:download", + "order": 8 + } + ] + + for button in buttons: + await menu_crud.create( + MenuCreateSchema( + name=button["name"], + type=3, + order=button["order"], + permission=button["permission"], + icon=None, + route_name=None, + route_path=None, + component_path=None, + redirect=None, + hidden=False, + keep_alive=True, + always_show=False, + title=button["name"], + params=None, + affix=False, + parent_id=parent_menu.id, + status="0", + description=f"{gen_table.function_name}菜单" + ) + ) + + log.info(f"成功创建{gen_table.function_name}菜单及按钮权限") + return True @classmethod def __is_valid_create_table(cls, sql_statements: list[Expression | None]) -> bool: diff --git a/backend/app/config/setting.py b/backend/app/config/setting.py index 3b3be36a22cd2d34cce4edd4e0f83fc9dda45e8e..974b7aec61124ee308f018be95e5eff478d04a5f 100755 --- a/backend/app/config/setting.py +++ b/backend/app/config/setting.py @@ -35,6 +35,7 @@ class Settings(BaseSettings): # ******************* API文档配置 ****************** # # ================================================= # DEBUG: bool = True # 调试模式 + LOGGER_LEVEL: str = 'DEBUG' # 日志级别 DEBUG、INFO、WARNING、ERROR、CRITICAL TITLE: str = "🎉 FastapiAdmin 🎉 -dev" # 文档标题 VERSION: str = '0.1.0' # 版本号 DESCRIPTION: str = "该项目是一个基于python的web服务框架,基于fastapi和sqlalchemy实现。" # 文档描述 diff --git a/backend/app/core/database.py b/backend/app/core/database.py index 97c26d07ebfd4ff4043bcbbd20ae69ef0677eb94..3e8cd0f460c4bffe32878418889225d184a77888 100644 --- a/backend/app/core/database.py +++ b/backend/app/core/database.py @@ -55,18 +55,28 @@ def create_async_engine_and_session( if not settings.SQL_DB_ENABLE: raise CustomException(msg="请先开启数据库连接", data="请启用 app/config/setting.py: SQL_DB_ENABLE") # 异步数据库引擎 - async_engine: AsyncEngine = create_async_engine( - url=db_url, - echo=settings.DATABASE_ECHO, - echo_pool=settings.ECHO_POOL, - pool_pre_ping=settings.POOL_PRE_PING, - future=settings.FUTURE, - pool_recycle=settings.POOL_RECYCLE, - pool_size=settings.POOL_SIZE, - max_overflow=settings.MAX_OVERFLOW, - pool_timeout=settings.POOL_TIMEOUT, - pool_use_lifo=settings.POOL_USE_LIFO, - ) + if settings.DATABASE_TYPE == 'sqlite': + async_engine: AsyncEngine = create_async_engine( + url=db_url, + echo=settings.DATABASE_ECHO, + echo_pool=settings.ECHO_POOL, + pool_pre_ping=settings.POOL_PRE_PING, + future=settings.FUTURE, + pool_recycle=settings.POOL_RECYCLE, + ) + else: + async_engine: AsyncEngine = create_async_engine( + url=db_url, + echo=settings.DATABASE_ECHO, + echo_pool=settings.ECHO_POOL, + pool_pre_ping=settings.POOL_PRE_PING, + future=settings.FUTURE, + pool_recycle=settings.POOL_RECYCLE, + pool_size=settings.POOL_SIZE, + max_overflow=settings.MAX_OVERFLOW, + pool_timeout=settings.POOL_TIMEOUT, + pool_use_lifo=settings.POOL_USE_LIFO, + ) except Exception as e: log.error(f'❌ 数据库连接失败 {e}') raise diff --git a/backend/app/core/logger.py b/backend/app/core/logger.py index 197f6112fffc29fcc1d5d1ed0a04e2d46e96ae23..23f16bbadf6335f2b3643de4ef4db2ef3862d30e 100644 --- a/backend/app/core/logger.py +++ b/backend/app/core/logger.py @@ -95,7 +95,7 @@ def setup_logging(): handler_id = logger.add( sys.stdout, format=log_format, - level="DEBUG" if settings.DEBUG else "INFO", + level=settings.LOGGER_LEVEL, enqueue=use_async, # 开发同步,生产异步 backtrace=True, # 显示完整的异常回溯 diagnose=True, # 显示变量值等诊断信息