| | """ |
| | Nexus-Nano Search Engine |
| | Fast alpha-beta with minimal overhead |
| | |
| | Focus: Speed > Depth |
| | Target: Sub-second responses |
| | """ |
| |
|
| | import chess |
| | import logging |
| | from typing import Optional, Tuple, List, Dict |
| |
|
| | from .evaluate import NexusNanoEvaluator |
| | from .transposition import TranspositionTable, NodeType |
| | from .move_ordering import MoveOrderer |
| | from .time_manager import TimeManager |
| | from .endgame import EndgameDetector |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class NexusNanoEngine:
    """Ultra-fast 2.8M parameter chess engine.

    Runs an iterative-deepening negamax alpha-beta search backed by a
    transposition table, a move orderer, a time manager and a capture-only
    quiescence search. Scores are always expressed from the point of view
    of the side to move (each recursive call negates the child's score).
    """

    MATE_SCORE = 100000
    MAX_PLY = 100
    # Size handed to TranspositionTable and echoed in the start-up log.
    TT_SIZE_MB = 64
    # Maximum number of nested capture plies explored by _quiescence.
    MAX_QS_DEPTH = 6

    def __init__(self, model_path: str, num_threads: int = 1):
        """Create the evaluator and the search helpers.

        Args:
            model_path: Forwarded to ``NexusNanoEvaluator``.
            num_threads: Forwarded to ``NexusNanoEvaluator``.
        """
        self.evaluator = NexusNanoEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=self.TT_SIZE_MB)
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()

        # Per-search statistics; reset at the start of every get_best_move.
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        logger.info("⚡ Nexus-Nano Engine initialized")
        logger.info(" Model: %.2f MB", self.evaluator.get_model_size_mb())
        logger.info(" TT: %d MB", self.TT_SIZE_MB)

    def get_best_move(
        self,
        fen: str,
        depth: int = 4,
        time_limit: int = 2000
    ) -> Dict:
        """Search a position and return the best move with statistics.

        Args:
            fen: Position in FEN notation.
            depth: Maximum iterative-deepening depth (1-6 recommended).
                Values below 1 are treated as 1.
            time_limit: Time budget in milliseconds.

        Returns:
            Dict with the keys ``best_move`` (UCI string), ``evaluation``
            (score / 100 rounded to two decimals), ``depth_searched``,
            ``seldepth``, ``nodes_evaluated``, ``time_taken`` (ms), ``pv``
            (list of UCI strings), ``nps``, ``tt_stats`` and
            ``move_ordering_stats``.

        Raises:
            ValueError: If ``chess.Board`` rejects ``fen``.
        """
        board = chess.Board(fen)

        # Reset the per-search statistics.
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []

        # The same budget (in seconds) is passed for both arguments.
        budget_sec = time_limit / 1000.0
        self.time_manager.start_search(budget_sec, budget_sec)

        self.move_orderer.clear()
        self.tt.increment_age()

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return self._no_legal_moves()
        if len(legal_moves) == 1:
            # A forced move needs no search.
            return self._single_move(board, legal_moves[0])

        best_move = legal_moves[0]
        best_score = float('-inf')

        # Iterative deepening. The depth-1 iteration always runs (and depth
        # is clamped to >= 1) so that a searched move and a finite score
        # exist even when the time budget is already exhausted; otherwise
        # 'evaluation' would be reported as -inf.
        for current_depth in range(1, max(depth, 1) + 1):
            if current_depth > 1 and self.time_manager.should_stop(current_depth):
                break

            score, move, pv = self._search_root(
                board, current_depth, float('-inf'), float('inf')
            )

            if move is not None:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv

        # Read the clock once so time_taken and nps agree with each other.
        elapsed = self.time_manager.elapsed()

        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(elapsed * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            # Clamp the divisor to avoid dividing by (almost) zero.
            'nps': int(self.nodes_evaluated / max(elapsed, 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _search_root(
        self,
        board: "chess.Board",
        depth: int,
        alpha: float,
        beta: float
    ) -> "Tuple[float, Optional[chess.Move], List[chess.Move]]":
        """Search every root move to ``depth`` and pick the best one.

        Args:
            board: Root position; restored to its input state on return.
            depth: Depth of this iteration.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.

        Returns:
            ``(score, move, pv)``. ``move`` is ``None`` only when the
            position has no legal moves. When the time manager stops the
            iteration early the result covers only the moves searched so
            far.
        """
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            # Terminal root: mated when in check, otherwise stalemate.
            terminal_score = -self.MATE_SCORE if board.is_check() else 0
            return terminal_score, None, []

        # Only the stored move is used at the root (as an ordering hint).
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_move = ordered_moves[0]
        best_score = float('-inf')
        best_pv = []
        completed = True

        for index, move in enumerate(ordered_moves):
            # The first move is always searched; afterwards the clock is
            # consulted before starting each further move.
            if index > 0 and self.time_manager.should_stop(depth):
                completed = False
                break

            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, 1)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv

            if score > alpha:
                alpha = score

        # A truncated iteration has not seen every move, so its score is
        # not an exact value for this depth and must not be cached as one.
        if completed:
            self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)

        return best_score, best_move, best_pv

    def _alpha_beta(
        self,
        board: "chess.Board",
        depth: int,
        alpha: float,
        beta: float,
        ply: int = 0
    ) -> "Tuple[float, List[chess.Move]]":
        """Negamax alpha-beta search of one node.

        Args:
            board: Position to search; restored to its input state on return.
            depth: Remaining depth; at ``<= 0`` the node is handed to
                ``_quiescence``.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            ply: Distance from the root in half-moves, used only to track
                ``sel_depth``.

        Returns:
            ``(score, pv)`` from the side to move's point of view. ``pv`` is
            empty for draws, transposition-table hits, quiescence leaves
            and terminal positions.
        """
        if ply > self.sel_depth:
            self.sel_depth = ply

        # Repetition and fifty-move positions are scored as draws.
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []

        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)

        # A non-None first element is used directly as this node's score.
        if tt_result and tt_result[0] is not None:
            return tt_result[0], []

        tt_move = tt_result[1] if tt_result else None

        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0, ply), []

        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # MAX_PLY - depth shrinks as the remaining depth grows, so
                # a mate found with more depth left (nearer the root) is
                # scored as worse for the mated side.
                return -self.MATE_SCORE + (self.MAX_PLY - depth), []
            return 0, []

        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )

        best_score = float('-inf')
        best_pv = []
        node_type = NodeType.UPPER_BOUND

        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, ply + 1)
            score = -score
            board.pop()

            if score > best_score:
                best_score = score
                best_pv = [move] + pv

            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT

                # The board is back in the pre-move position here, so
                # is_capture() is asked about the move as it was played.
                if not board.is_capture(move):
                    self.move_orderer.update_killer_move(move, depth)

            if score >= beta:
                node_type = NodeType.LOWER_BOUND
                break

        self.tt.store(
            zobrist_key, depth, best_score, node_type,
            best_pv[0] if best_pv else None
        )

        return best_score, best_pv

    def _quiescence(
        self,
        board: "chess.Board",
        alpha: float,
        beta: float,
        qs_depth: int,
        ply: int = 0
    ) -> float:
        """Capture-only quiescence search.

        Args:
            board: Position to resolve; restored to its input state on return.
            alpha: Lower bound of the search window.
            beta: Upper bound of the search window.
            qs_depth: Number of quiescence plies already played; recursion
                stops once it reaches ``MAX_QS_DEPTH``.
            ply: Distance from the root in half-moves, used only to track
                ``sel_depth``.

        Returns:
            ``beta`` on a cutoff, the stand-pat score when the depth cap is
            hit or no captures exist, otherwise the raised ``alpha``.
        """
        self.nodes_evaluated += 1
        if ply > self.sel_depth:
            self.sel_depth = ply

        # Stand-pat: static evaluation, then the endgame adjustment.
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)

        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat

        if qs_depth >= self.MAX_QS_DEPTH:
            return stand_pat

        captures = [m for m in board.legal_moves if board.is_capture(m)]
        if not captures:
            return stand_pat

        captures = self.move_orderer.order_moves(board, captures, 0)

        for move in captures:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1, ply + 1)
            board.pop()

            if score >= beta:
                return beta
            if score > alpha:
                alpha = score

        return alpha

    def _no_legal_moves(self) -> Dict:
        """Build the result for a position with no legal moves.

        Returns:
            A dict with the same keys as a normal ``get_best_move`` result,
            using the null move ``'0000'`` and neutral statistics.
        """
        return {
            'best_move': '0000',
            'evaluation': 0.0,
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': [],
            'nps': 0,
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _single_move(self, board: "chess.Board", move: "chess.Move") -> Dict:
        """Build the result for a position with exactly one legal move.

        Args:
            board: Position the move is played from; only evaluated
                statically, no search is run.
            move: The single legal move.

        Returns:
            A dict with the same keys as a normal ``get_best_move`` result.
        """
        eval_score = self.evaluator.evaluate_hybrid(board)

        return {
            'best_move': move.uci(),
            'evaluation': round(eval_score / 100.0, 2),
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()],
            'nps': 0,
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True when ``chess.Board`` accepts ``fen``, else False."""
        try:
            chess.Board(fen)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed FEN text. TypeError / AttributeError:
            # a non-string argument. Anything else is left to propagate
            # instead of being hidden by a bare ``except``.
            return False
        return True

    def get_model_size(self) -> float:
        """Return the evaluator's model size in megabytes."""
        return self.evaluator.get_model_size_mb()