# """ # Snowflake ID 生成器,适配三台 MySQL 服务器(IP 尾号 251/252/253)。 # """ # import os # import socket # import threading # import time # import warnings # from typing import Dict, Optional # # # Snowflake 位分配:41-bit 时间戳 | 5-bit 机房 | 5-bit 节点 | 12-bit 自增序列 # EPOCH_MS = 1704067200000 # 2024-01-01 00:00:00 UTC # WORKER_ID_BITS = 5 # DATACENTER_ID_BITS = 5 # SEQUENCE_BITS = 12 # # MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1 # MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1 # SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1 # # WORKER_SHIFT = SEQUENCE_BITS # DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS # TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS # # # 默认映射:将三台 MySQL 服务器 IP(或尾段)映射到 worker_id # # DEFAULT_IP_WORKER_MAP: Dict[str, int] = { # # "251": 1, # # "252": 2, # # "253": 3, # # } # # # # 单机 # DEFAULT_IP_WORKER_MAP: Dict[str, int] = { # "168": 1, # } # # # def _now_ms() -> int: # return int(time.time() * 1000) # # # def _wait_next_ms(last_ts: int) -> int: # ts = _now_ms() # while ts <= last_ts: # ts = _now_ms() # return ts # # # def _get_host_ip() -> str: # """获取当前主机的主 IP 地址。""" # s = None # try: # s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # # 连接一个无需真实可达的地址,只为获取本机出站 IP # s.connect(("10.255.255.255", 1)) # ip = s.getsockname()[0] # except Exception: # ip = socket.gethostbyname(socket.gethostname()) # finally: # if s is not None: # s.close() # return ip # # # def resolve_worker_id( # ip_worker_map: Optional[Dict[str, int]] = None, # override_ip: Optional[str] = None, # ) -> int: # """ # 根据主机 IP 推导 worker_id。 # - 优先读取环境变量 SNOWFLAKE_WORKER_IP 作为 IP。 # - 其次使用 override_ip。 # - 再次使用本机 IP。 # - 支持完整 IP 匹配和末段匹配(如 "192.168.0.251" 或 "251")。 # """ # ip_worker_map = ip_worker_map or DEFAULT_IP_WORKER_MAP # chosen_ip = os.getenv("SNOWFLAKE_WORKER_IP") or override_ip or _get_host_ip() # if chosen_ip in ip_worker_map: # return ip_worker_map[chosen_ip] # last_octet = chosen_ip.split(".")[-1] # if last_octet in ip_worker_map: # return ip_worker_map[last_octet] # raise ValueError( # f"未能为 IP {chosen_ip} 匹配到 worker_id,请检查映射或设置 SNOWFLAKE_WORKER_IP。" # ) # # # class SnowflakeGenerator: # """ # 线程安全的 Snowflake ID 生成器。 # 默认 datacenter_id=0;worker_id 将根据 IP 映射推导。 # """ # # def __init__( # self, # datacenter_id: int = 0, # worker_id: Optional[int] = None, # epoch_ms: int = EPOCH_MS, # ip_worker_map: Optional[Dict[str, int]] = None, # ): # self.datacenter_id = datacenter_id # self.worker_id = worker_id or resolve_worker_id(ip_worker_map) # self.epoch_ms = epoch_ms # # if not 0 <= self.worker_id <= MAX_WORKER_ID: # raise ValueError(f"worker_id 必须在 0-{MAX_WORKER_ID} 之间") # if not 0 <= self.datacenter_id <= MAX_DATACENTER_ID: # raise ValueError(f"datacenter_id 必须在 0-{MAX_DATACENTER_ID} 之间") # # self.sequence = 0 # self.last_timestamp = -1 # self._lock = threading.Lock() # # def next_id(self) -> int: # """生成下一个全局唯一 ID。""" # with self._lock: # timestamp = _now_ms() # # if timestamp < self.last_timestamp: # # 时钟回拨保护 # raise RuntimeError("检测到系统时钟回拨,停止发号。") # # if timestamp == self.last_timestamp: # self.sequence = (self.sequence + 1) & SEQUENCE_MASK # if self.sequence == 0: # timestamp = _wait_next_ms(self.last_timestamp) # else: # self.sequence = 0 # # self.last_timestamp = timestamp # return ( # ((timestamp - self.epoch_ms) << TIMESTAMP_SHIFT) # | (self.datacenter_id << DATACENTER_SHIFT) # | (self.worker_id << WORKER_SHIFT) # | self.sequence # ) # # # # 默认生成器:datacenter_id=0,worker_id 按 IP 映射自动推导 # def _build_default_generator() -> SnowflakeGenerator: # try: # return SnowflakeGenerator() # except ValueError as exc: # warnings.warn( # f"Snowflake 默认生成器未找到匹配的 IP,退回 worker_id=0。详情:{exc}", # RuntimeWarning, # ) # return SnowflakeGenerator(worker_id=0) # # # default_generator = _build_default_generator() # # # def next_id() -> int: # """便捷函数,返回默认生成器的下一个 ID。""" # return default_generator.next_id() # # #