tts3 / app.py
Opera8's picture
Update app.py
81590e8 verified
import os
import uuid
import requests
import threading
import time
import sqlite3
import tempfile
import ffmpeg
import json
from datetime import datetime
from flask import Flask, request, jsonify, render_template, send_from_directory, send_file, after_this_request, Response
from deep_translator import GoogleTranslator
# Flask application instance for this module.
app = Flask(__name__)
# --- Helper mappings for live monitoring of Actions runs ---
# RUN_ACCOUNT_MAPPING is read by check_github_run_status_and_log() to find the account for a run_id.
# NOTE(review): the code that fills these two dicts is not visible in this part of the file.
RUN_ACCOUNT_MAPPING = {}
LAST_RUN_CHECK = {}
# --- GitHub account configuration (load-distribution / load-balancer system) ---
# Each entry pairs a repository with the name of the environment variable holding its token.
GITHUB_ACCOUNTS_CONFIG = [
{"repo": "Hamed744/vid-test", "token_env": "GITHUB_TOKEN1"},
{"repo": "hajiliker4-cpu/a", "token_env": "GITHUB_TOKEN3"},
{"repo": "asrasahar744-ops/a", "token_env": "GITHUB_TOKEN4"},
{"repo": "elias207207/a", "token_env": "GITHUB_TOKEN5"},
{"repo": "mobin207207/a", "token_env": "GITHUB_TOKEN6"},
]
# Keep only the accounts whose token environment variable is actually set (non-empty).
GITHUB_ACCOUNTS = []
for config in GITHUB_ACCOUNTS_CONFIG:
token = os.environ.get(config["token_env"])
if token:
GITHUB_ACCOUNTS.append({
"repo": config["repo"],
"token": token,
"token_env": config["token_env"]
})
# Startup log: how many accounts are usable, then one line per account.
print(f"✅ تعداد {len(GITHUB_ACCOUNTS)} حساب گیت‌هاب با موفقیت شناسایی و به سیستم متصل شد.", flush=True)
for i, acc in enumerate(GITHUB_ACCOUNTS):
# Never print a full token: show the first 4 and last 4 characters, or "***" when it is 8 characters or shorter.
token_masked = acc['token'][:4] + "..." + acc['token'][-4:] if len(acc['token']) > 8 else "***"
print(f" 🔹 حساب {i+1}: مخزن {acc['repo']} متصل با متغیر {acc['token_env']} ({token_masked})", flush=True)
# Round-robin state used by get_next_account(); the lock guards account_index.
account_lock = threading.Lock()
account_index = 0
def get_next_account():
    """Return the next GitHub account in round-robin order.

    Returns:
        The account dict chosen for this call, or ``None`` when no account
        is configured in ``GITHUB_ACCOUNTS``.
    """
    global account_index
    if len(GITHUB_ACCOUNTS) == 0:
        return None
    # The lock makes "read index, advance index" a single step across threads.
    with account_lock:
        slot = account_index % len(GITHUB_ACCOUNTS)
        account_index = account_index + 1
        return GITHUB_ACCOUNTS[slot]
# Host name of this Space, read from the environment, and the base URL built from it.
# NOTE(review): when SPACE_HOST is unset, SPACE_URL is the bare string "https://".
SPACE_HOST = os.environ.get("SPACE_HOST", "")
SPACE_URL = f"https://{SPACE_HOST}"
# --- Forbidden keywords ---
# Matched as lower-cased substrings by has_forbidden_words().
FORBIDDEN_KEYWORDS = {"sex", "sexy", "porn", "nude", "erotic", "سکس", "سکسی", "پورن", "شهوانی", "برهنه", "لخت"}
# --- SQLite database settings ---
# Databases live in /data when that directory exists, otherwise in the current directory.
DB_DIR = "/data" if os.path.exists("/data") else "."
DB_PATH = os.path.join(DB_DIR, "users.db")
TEMP_DIR = "tmp"
USAGE_LIMIT = 5
ONE_WEEK_SECONDS = 7 * 24 * 60 * 60
ONE_DAY_SECONDS = 24 * 60 * 60
DB_IMAGE_PATH = os.path.join(DB_DIR, "users_image.db")
IMAGE_USAGE_LIMIT = 5
DB_IMAGE_EDIT_PATH = os.path.join(DB_DIR, "users_image_edit.db")
EDIT_USAGE_LIMIT = 5
DB_CHAT_QUEUE_PATH = os.path.join(DB_DIR, "chat_queue.db")
# Create the working directories up front so later writes cannot fail on a missing folder.
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs("static/images", exist_ok=True)
os.makedirs("yml", exist_ok=True)
# --- Automatic cleanup of old files ---
def _remove_files_older_than(folder, cutoff):
    """Delete regular files directly inside *folder* whose mtime is before *cutoff*."""
    try:
        names = os.listdir(folder)
    except OSError:
        # Folder missing or unreadable: nothing to clean this round.
        return
    for filename in names:
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) and os.stat(file_path).st_mtime < cutoff:
                os.remove(file_path)
        except OSError:
            # The file may have vanished or be locked between listing and
            # stat/remove; skip it so one bad entry cannot stop the cleanup.
            continue


def cleanup_old_files():
    """Purge stale generated files once an hour, forever.

    After each one-hour sleep, every regular file directly inside
    ``static/images`` and the temp directory that was last modified more than
    24 hours ago is removed. Filesystem errors are skipped per file, so the
    loop keeps running. The function never returns; run it in a background
    thread.
    """
    max_age_seconds = 24 * 60 * 60
    while True:
        time.sleep(3600)
        cutoff = time.time() - max_age_seconds
        for folder in ("static/images", TEMP_DIR):
            _remove_files_older_than(folder, cutoff)
# Run the cleanup loop in a daemon thread, started as soon as this module is loaded.
threading.Thread(target=cleanup_old_files, daemon=True).start()
# Build an error-card image
def create_error_image(run_id, message):
    """Draw *message* on an 800x500 error card saved as ``static/images/<run_id>.png``.

    Pillow is imported inside the function. Any exception raised while
    importing, drawing or saving is printed and swallowed, so this function
    never raises.

    Args:
        run_id: Identifier shown on the card and used as the output file name.
        message: Error text; each ``\\n``-separated line is drawn on its own row.
    """
    try:
        from PIL import Image, ImageDraw

        card = Image.new('RGB', (800, 500), color='#1e293b')
        pen = ImageDraw.Draw(card)
        pen.rectangle([20, 20, 780, 480], outline="#f43f5e", width=4)
        pen.text((50, 60), "⚠️ PROCESS ERROR / خطای اجرای فرآیند", fill="#f43f5e")
        pen.text((50, 110), f"Run ID: {run_id}", fill="#94a3b8")

        # Message rows start at y=160 and are spaced 40 pixels apart.
        message_lines = message.split('\n')
        for row, text_line in enumerate(message_lines):
            pen.text((50, 160 + 40 * row), text_line, fill="#f1f5f9")

        # The footer hint sits 30 pixels below where the next row would go.
        footer_y = 160 + 40 * len(message_lines) + 30
        pen.text((50, footer_y), "برای مشاهده لاگ‌ها، به تب Actions در مخزن گیت هاب خود بروید.", fill="#38bdf8")

        card.save(f"static/images/{run_id}.png")
        print(f"🎨 [کارت خطا] کارت تصویری خطا با موفقیت در static/images/{run_id}.png ایجاد شد.", flush=True)
    except Exception as e:
        print(f"Failed to generate error image: {e}", flush=True)
# Live monitoring (only writes to the admin console log; never interferes with requests)
def check_github_run_status_and_log(run_id):
    """Print diagnostics about the repository associated with *run_id*.

    The account is looked up in ``RUN_ACCOUNT_MAPPING``; when there is no
    entry, the next round-robin account is used. Two GitHub API calls are
    made (repository details, then the five most recent workflow runs) and
    their results are printed. Nothing is returned and request errors are
    printed instead of raised.

    Args:
        run_id: Internal run identifier; its first 8 characters appear in the log.
    """
    acc = RUN_ACCOUNT_MAPPING.get(run_id) or get_next_account()
    if not acc:
        return

    headers = {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": f"token {acc['token']}"
    }

    # 1. Inspect the repository details
    try:
        repo_response = requests.get(
            f"https://api.github.com/repos/{acc['repo']}",
            headers=headers,
            timeout=10,
        )
        if repo_response.status_code == 200:
            details = repo_response.json()
            default_branch = details.get("default_branch", "نامشخص")
            is_private = details.get("private", False)
            print(f" ℹ️ [مشخصات مخزن] شاخه پیش‌فرض گیت‌هاب: {default_branch} | نوع مخزن: {'خصوصی (Private)' if is_private else 'عمومی (Public)'}", flush=True)
    except Exception as e:
        print(f" ⚠️ [مانیتور مخزن] خطا در ارتباط با گیت‌هاب: {e}", flush=True)

    # 2. Inspect the latest runs (only to log them in the admin terminal)
    url = f"https://api.github.com/repos/{acc['repo']}/actions/runs?per_page=5"
    try:
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200:
            print(f" ⚠️ [مانیتورینگ اکشنز] دریافت وضعیت با خطای API مواجه شد: {r.status_code}", flush=True)
            return

        payload = r.json()
        recent_runs = payload.get('workflow_runs', [])
        total_runs = payload.get('total_count', 0)
        print(f"🔍 [رهگیری زنده اکشنز] بررسی مخزن {acc['repo']} برای شناسه فرآیند {run_id[:8]}... تعداد کل اجراهای یافت شده: {total_runs}", flush=True)
        for run in recent_runs:
            started = datetime.strptime(run['created_at'], "%Y-%m-%dT%H:%M:%SZ")
            time_diff = (datetime.utcnow() - started).total_seconds()
            print(f" 🔸 اکشن شماره {run['id']} | نام: {run['name']} | وضعیت: {run['status']} | نتیجه: {run.get('conclusion')} | زمان ایجاد: {run['created_at']} ({int(time_diff)} ثانیه پیش)", flush=True)
    except Exception as e:
        print(f" ⚠️ [خطای سیستم مانیتورینگ] خطا در دریافت لیست فرآیندها: {e}", flush=True)
# Automatically create the worker scripts this app needs.
# write_default_scripts() keeps the full source text of each worker script in the
# `scripts` dict (file name -> source) and writes each one into the "yml" directory.
# Takes no arguments and returns nothing. The script texts are runtime data: they are
# written to disk verbatim (after .strip()), so they must not be edited casually.
# NOTE(review): generate-audio.py contains the placeholders ___AUDIO_TYPE___ and
# ___NEGATIVE_PROMPT___; presumably they are substituted elsewhere — confirm.
def write_default_scripts():
scripts = {
"ghibli.py": """
import os, sys, requests, base64
from gradio_client import Client, handle_file
raw_prompt = os.environ.get('PROMPT', '')
run_id = os.environ.get('RUN_ID', '')
space_url = os.environ.get('SPACE_URL', '')
github_run_id = os.environ.get('GITHUB_RUN_ID', '')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "ghibli",
"client_payload": {
"prompt": raw_prompt,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Decoding packed configurations from safe payload...')
if not raw_prompt.startswith("GHIBLICONFIG_"):
err_str = "Error: Invalid ghibli configuration payload signature."
print(err_str)
report_failure(err_str)
sys.exit(1)
config_str = raw_prompt[len("GHIBLICONFIG_"):]
parts = config_str.split("_")
config = {}
i = 0
while i < len(parts) - 1:
key = parts[i]
val = parts[i+1]
config[key] = val
i += 2
user_run_id = config.get("userRunId", run_id)
img_ext = config.get("imgExt", "png")
try:
b64_style = config.get("style", "")
style = base64.b64decode(b64_style).decode('utf-8')
except Exception as e:
style = "Studio Ghibli"
img_url = f"{space_url}/static/images/{user_run_id}_ghibli_img.{img_ext}"
local_img = f"input_img.{img_ext}"
print('2. Downloading reference image from your Space...')
try:
req_img = requests.get(img_url, timeout=45)
if req_img.status_code != 200:
raise Exception(f"Image download failed. Status code: {req_img.status_code}")
with open(local_img, 'wb') as f:
f.write(req_img.content)
except Exception as download_err:
err_str = f"Error downloading source files: {download_err}"
print(err_str)
report_failure(err_str)
sys.exit(1)
print('3. Connecting to Yuanshi/OminiControl_Art Space...')
try:
client = Client("Yuanshi/OminiControl_Art")
result = client.predict(
style,
handle_file(local_img),
"High Quality",
1.5,
"Auto",
True,
42,
20,
fn_index=7
)
image_path = None
if isinstance(result, list) and len(result) > 0:
image_path = result[0].get('image') or result[0].get('path') if isinstance(result[0], dict) else result[0]
elif isinstance(result, tuple) and len(result) > 0:
image_path = result[0]
elif isinstance(result, dict):
image_path = result.get('image') or result.get('path')
else:
image_path = result
if not image_path or not os.path.exists(str(image_path)):
raise Exception("Generated output image file not found or invalid.")
with open(image_path, 'rb') as f:
res_upload = requests.post(
f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id, 'ext': 'webp'},
files={'file': f}
)
if res_upload.status_code == 200:
print('SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
report_failure(err_str)
sys.exit(1)
""",
"generate-flux.py": """
import os, sys, random, requests
from gradio_client import Client
def get_env_int(name, default=0):
val = os.environ.get(name, '')
try:
return int(val) if val.strip() else default
except ValueError:
return default
prompt = os.environ.get('PROMPT', '')
width = get_env_int('WIDTH', 1024)
height = get_env_int('HEIGHT', 1024)
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-flux",
"client_payload": {
"prompt": prompt,
"width": width,
"height": height,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Connecting to Flux...')
try:
client = Client('black-forest-labs/FLUX.1-dev')
seed = random.randint(1, 2147483647)
result = client.predict(prompt, seed, True, width, height, 3.5, 28, api_name='/infer')
image_path = result[0] if isinstance(result, (tuple, list)) else result
print(f'2. Uploading back to Docker Space...')
with open(image_path, 'rb') as f:
res = requests.post(f'{space_url}/api/webhook/upload', data={'run_id': run_id, 'github_run_id': github_run_id}, files={'file': f})
if res.status_code == 200:
print('3. SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
print(f'ERROR: {err_str}')
report_failure(err_str)
sys.exit(1)
""",
"generate-flux2.py": """
import os, sys, random, requests, json
from gradio_client import Client, handle_file
def get_env_int(name, default=0):
val = os.environ.get(name, '')
try:
return int(val) if val.strip() else default
except ValueError:
return default
def get_env_float(name, default=0.0):
val = os.environ.get(name, '')
try:
return float(val) if val.strip() else default
except ValueError:
return default
prompt = os.environ.get('PROMPT', '')
image_urls_raw = os.environ.get('IMAGE_URLS', '[]')
width = get_env_int('WIDTH', 1024)
height = get_env_int('HEIGHT', 1024)
steps = get_env_int('STEPS', 30)
guidance = get_env_float('GUIDANCE', 4.0)
seed = get_env_int('SEED', 0)
randomize_seed = os.environ.get('RANDOMIZE_SEED', 'true').lower() == 'true'
prompt_upsampling = os.environ.get('PROMPT_UPSAMPLING', 'true').lower() == 'true'
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
hf_token = os.environ.get('HF_TOKEN', '')
print(f'Starting direct Flux 2 generation for run_id: {run_id}')
print('1. Parsing and downloading images...')
image_urls = []
if image_urls_raw:
try:
cleaned_json = image_urls_raw.replace('null', '').strip()
if cleaned_json and cleaned_json != '[]':
image_urls = json.loads(cleaned_json)
except Exception as e:
print(f'JSON parsing error: {e}')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-flux2",
"client_payload": {
"data": {
"prompt": prompt,
"image_urls": image_urls,
"width": width,
"height": height,
"steps": steps,
"guidance": guidance,
"seed": seed,
"randomize_seed": randomize_seed,
"prompt_upsampling": prompt_upsampling,
"run_id": run_id,
"space_url": space_url,
"hf_token": hf_token
}
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
files_to_send = []
for idx, url in enumerate(image_urls):
if url:
print(f'Downloading input image {idx}: {url}')
ext = url.split('.')[-1] if '.' in url else 'png'
local_filename = f'input_{idx}.{ext}'
try:
req = requests.get(url, timeout=30)
with open(local_filename, 'wb') as f:
f.write(req.content)
files_to_send.append({'image': handle_file(local_filename), 'caption': None})
except Exception as e:
print(f'Failed to download {url}: {e}')
print('2. Connecting to FLUX.2-dev Space...')
try:
if hf_token:
client = Client('black-forest-labs/FLUX.2-dev', token=hf_token)
else:
client = Client('black-forest-labs/FLUX.2-dev')
if randomize_seed:
seed = random.randint(1, 2147483647)
print(f'Predicting with seed={seed}, width={width}, height={height}, steps={steps}, guidance={guidance}, upsampling={prompt_upsampling}...')
result = client.predict(
prompt,
files_to_send,
seed,
randomize_seed,
width,
height,
steps,
guidance,
prompt_upsampling,
api_name='/infer'
)
image_path = result[0] if isinstance(result, (tuple, list)) else result
seed_used = result[1] if (isinstance(result, (tuple, list)) and len(result) > 1) else seed
print('3. Uploading metadata seed back...')
with open('seed.txt', 'w') as sf:
sf.write(str(seed_used))
requests.post(
f'{space_url}/api/webhook/upload',
data={'run_id': f'{run_id}_seed', 'github_run_id': github_run_id, 'ext': 'txt'},
files={'file': open('seed.txt', 'rb')}
)
print('4. Uploading generated image back...')
with open(image_path, 'rb') as f:
upload_res = requests.post(
f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id},
files={'file': f}
)
if upload_res.status_code == 200:
print('SUCCESS!')
else:
print(f'Upload failed with status {upload_res.status_code}: {upload_res.text}')
sys.exit(1)
except Exception as e:
err_str = str(e)
print(f'Generation error: {err_str}')
report_failure(err_str)
sys.exit(1)
""",
"generate-cartoon.py": """
import os, sys, random, requests
from gradio_client import Client
def get_env_int(name, default=0):
val = os.environ.get(name, '')
try:
return int(val) if val.strip() else default
except ValueError:
return default
prompt = os.environ.get('PROMPT', '')
width = get_env_int('WIDTH', 1024)
height = get_env_int('HEIGHT', 1024)
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-cartoon",
"client_payload": {
"prompt": prompt,
"width": width,
"height": height,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Connecting to LoRA DLC Space...')
try:
client = Client('prithivMLmods/FLUX-LoRA-DLC')
print('2. Loading Cartoon Model (Long Toons)...')
client.predict('prithivMLmods/Flux-Long-Toon-LoRA', api_name='/add_custom_lora')
print('3. Generating Cartoon Image...')
result = client.predict(
prompt=f'Long toons {prompt}',
image_input=None,
cfg_scale=3.5,
steps=28,
randomize_seed=True,
seed=random.randint(1, 2147483647),
width=width,
height=height,
lora_scale=0.8,
api_name='/run_lora'
)
image_path = result[0] if isinstance(result, (tuple, list)) else result
print('4. Uploading back to Docker Space...')
with open(image_path, 'rb') as f:
res = requests.post(f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id},
files={'file': f})
if res.status_code == 200:
print('5. SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
print(f'CRITICAL ERROR: {err_str}')
report_failure(err_str)
sys.exit(1)
""",
"generate-midjourney.py": """
import os, sys, random, requests
from gradio_client import Client
def get_env_int(name, default=0):
val = os.environ.get(name, '')
try:
return int(val) if val.strip() else default
except ValueError:
return default
raw_prompt = os.environ.get('PROMPT', '')
final_prompt = f'MJ v6 {raw_prompt}'
width = get_env_int('WIDTH', 1024)
height = get_env_int('HEIGHT', 1024)
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-midjourney",
"client_payload": {
"prompt": raw_prompt,
"width": width,
"height": height,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Connecting to LoRA DLC Space...')
try:
client = Client('prithivMLmods/FLUX-LoRA-DLC')
print('2. Loading Midjourney Model...')
client.predict('strangerzonehf/Flux-Midjourney-Mix2-LoRA', api_name='/add_custom_lora')
print(f'3. Generating Image... Prompt: {final_prompt[:50]}...')
result = client.predict(
prompt=final_prompt,
image_input=None,
cfg_scale=3.5,
steps=28,
randomize_seed=True,
seed=random.randint(1, 2147483647),
width=width,
height=height,
lora_scale=0.8,
api_name='/run_lora'
)
image_path = result[0] if isinstance(result, (tuple, list)) else result
print('4. Uploading back to Docker Space...')
with open(image_path, 'rb') as f:
res = requests.post(f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id},
files={'file': f})
if res.status_code == 200:
print('5. SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
print(f'CRITICAL ERROR: {err_str}')
report_failure(err_str)
sys.exit(1)
""",
"chat-gemma.py": """
import os, sys, requests
from gradio_client import Client, handle_file
prompt = os.environ.get('PROMPT', '')
file_url = os.environ.get('FILE_URL', '')
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
gh_run_id = os.environ.get('GITHUB_RUN_ID')
files_to_send = []
if file_url:
print(f'Downloading attachment: {file_url}')
ext = file_url.split('.')[-1] if '.' in file_url else 'bin'
local_filename = f'attachment.{ext}'
try:
req = requests.get(file_url, timeout=30)
with open(local_filename, 'wb') as f:
f.write(req.content)
files_to_send.append(handle_file(local_filename))
except Exception as e:
print(f'Failed to download file: {e}')
print('Connecting to Gemma-4 Space...')
bot_reply = ''
try:
client = Client('huggingface-projects/gemma-4-e4b-it')
message_payload = {'text': prompt, 'files': files_to_send}
result = client.predict(
message=message_payload,
thinking=False,
max_new_tokens=4000,
max_soft_tokens=1120,
system_prompt='شما یک دستیار هوش مصنوعی بسیار باهوش، مفید و مودب به زبان فارسی هستید.',
api_name='/generate'
)
bot_reply = result[0] if isinstance(result, (list, tuple)) else result
if isinstance(bot_reply, dict):
bot_reply = bot_reply.get('text', str(bot_reply))
except Exception as e:
err_str = str(e)
print(f'Generation Error: {err_str}')
if 'ZeroGPU quota' in err_str or 'quota' in err_str.lower():
bot_reply = '⚠️ **پردازش متوقف شد:**\\nدرخواست شما طولانی و پرحجم است (پردازش فایل‌های صوتی یا حجیم نیازمند منابع بالایی است و سهمیه سرور موقتاً پر شده است).\\n\\nلطفاً چند ساعت دیگر امتحان کنید یا درخواست و فایل سبک‌تری ارسال نمایید.'
else:
bot_reply = f'⚠️ خطای سرور پردازش: {err_str}'
print('Uploading response back to Docker Space...')
try:
with open('response.txt', 'w', encoding='utf-8') as f:
f.write(bot_reply)
with open('response.txt', 'rb') as f:
res = requests.post(f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': gh_run_id, 'ext': 'txt'},
files={'file': f})
if res.status_code == 200:
print('SUCCESS!')
else:
sys.exit(1)
except Exception as e:
sys.exit(1)
""",
"generate-editor.py": """
import os, sys, requests, re, time
import httpx
original_client_init = httpx.Client.__init__
def patched_client_init(self, *args, **kwargs):
kwargs['timeout'] = httpx.Timeout(300.0)
original_client_init(self, *args, **kwargs)
httpx.Client.__init__ = patched_client_init
original_async_init = httpx.AsyncClient.__init__
def patched_async_init(self, *args, **kwargs):
kwargs['timeout'] = httpx.Timeout(300.0)
original_async_init(self, *args, **kwargs)
httpx.AsyncClient.__init__ = patched_async_init
from gradio_client import Client, handle_file
prompt = os.environ.get('PROMPT', '')
image_url = os.environ.get('IMAGE_URL', '')
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-editor",
"client_payload": {
"prompt": prompt,
"image_url": image_url,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Downloading input image from Docker Space...')
try:
req = requests.get(image_url, timeout=30)
with open('input.png', 'wb') as f:
f.write(req.content)
print(' -> Input image ready.')
except Exception as e:
print(f'Download failed: {e}')
sys.exit(1)
print('2. Connecting to Omni Editor and Processing...')
result_str = ''
success = False
for attempt in range(3):
try:
print(f' -> Attempt {attempt + 1} of 3...')
client = Client('selfit-camera/omni-image-editor')
result = client.predict(
handle_file('input.png'),
prompt,
api_name='/edit_image_interface'
)
result_str = str(result)
if 'src=' in result_str or 'http' in result_str:
success = True
break
else:
print(f' -> Missing valid response format: {result_str[:100]}...')
except Exception as client_err:
print(f' -> Attempt {attempt + 1} failed: {client_err}')
time.sleep(5)
if not success:
print(f'CRITICAL ERROR: All attempts failed. Raw result: {result_str}')
sys.exit(1)
print('3. Parsing result string...')
urls = re.findall(r'src=[\'\"]([^\'\"]+)[\'\"]', result_str)
final_remote_url = None
for url in urls:
if url.startswith('http'):
final_remote_url = url
break
if not final_remote_url:
direct_urls = re.findall(r'(https?://[^\s\'\"]+\.(?:png|jpg|jpeg|webp|gif))', result_str)
if direct_urls:
final_remote_url = direct_urls[0]
if not final_remote_url:
print(f'ERROR: Could not find valid URL. Raw result: {result_str}')
sys.exit(1)
print(f' -> Found Final Image URL: {final_remote_url}')
print('4. Downloading final image from remote host...')
try:
img_req = requests.get(final_remote_url, timeout=60)
content_type = img_req.headers.get('Content-Type', '')
ext = 'png'
if 'image/webp' in content_type:
ext = 'webp'
elif 'image/jpeg' in content_type or 'image/jpg' in content_type:
ext = 'jpg'
filename = f'output.{ext}'
with open(filename, 'wb') as f:
f.write(img_req.content)
print(f' -> Saved locally as {filename}')
except Exception as dl_err:
print(f'Failed to download processed image: {dl_err}')
sys.exit(1)
print(f'5. Uploading back to Docker Space: {space_url}')
try:
with open(filename, 'rb') as f:
res = requests.post(
f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id, 'ext': ext},
files={'file': f},
timeout=60
)
if res.status_code == 200:
print('6. SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
report_failure(err_str)
sys.exit(1)
""",
"generate-video.py": """
import os, sys, random, requests, time
from gradio_client import Client, handle_file
from PIL import Image, ImageOps
prompt = os.environ.get('PROMPT', '').strip()
if not prompt:
prompt = 'make this image come alive, cinematic motion, smooth animation'
else:
prompt = f'{prompt}, cinematic motion, smooth animation, high quality'
negative_prompt = '色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋의, 残缺의, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形の, 毁容의, 形态畸形の肢体, 手指融合, 静止不动的画面, 杂乱의 背景, 三条腿, 背景人很多, 倒着走'
image_url = os.environ.get('IMAGE_URL', '')
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
try:
duration = float(os.environ.get('DURATION', 5.0))
except Exception:
duration = 5.0
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-video",
"client_payload": {
"prompt": prompt,
"duration": duration,
"image_url": image_url,
"run_id": run_id,
"space_url": space_url
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
print('1. Downloading input image from Docker Space...')
try:
req = requests.get(image_url, timeout=30)
with open('input_raw.png', 'wb') as f:
f.write(req.content)
except Exception as e:
print(f'Download failed: {e}')
sys.exit(1)
print('2. Analyzing image dimensions to prevent stretching...')
img = Image.open('input_raw.png').convert('RGB')
orig_w, orig_h = img.size
ratio = orig_w / orig_h
if ratio > 1.2:
target_w, target_h = 832, 480
print(' -> Format: Landscape (16:9)')
elif ratio < 0.8:
target_w, target_h = 480, 832
print(' -> Format: Portrait (9:16)')
else:
target_w, target_h = 480, 480
print(' -> Format: Square (1:1)')
img_resized = ImageOps.fit(img, (target_w, target_h), Image.Resampling.LANCZOS)
img_resized.save('input.png')
print(f' -> Image resized to {target_w}x{target_h}')
print('3. Connecting to AI Video Space...')
video_path = None
space_id = 'zerogpu-aoti/wan2-2-fp8da-aoti-faster'
print(f' -> Loading Space: {space_id}')
try:
client = Client(space_id)
except Exception as e:
print(f'CRITICAL ERROR: Could not load space. {e}')
sys.exit(1)
max_attempts = 4
for attempt in range(max_attempts):
seed = random.randint(1, 2147483647)
print(f' -> Attempt {attempt + 1} of {max_attempts} with seed {seed} and duration {duration}s...')
try:
result = client.predict(
handle_file('input.png'),
prompt,
6,
negative_prompt,
duration,
1,
1,
seed,
True,
fn_index=0
)
if isinstance(result, list) and len(result) > 0:
video_path = result[0].get('video') or result[0].get('path') if isinstance(result[0], dict) else result[0]
elif isinstance(result, tuple) and len(result) > 0:
video_path = result[0]
elif isinstance(result, dict):
video_path = result.get('video') or result.get('path')
else:
video_path = result
if video_path and os.path.exists(str(video_path)):
print(' -> Success! Video generated.')
break
except Exception as e:
err_msg = str(e)
print(f' -> Failed on attempt {attempt + 1}: {err_msg}')
time.sleep(5)
if not video_path or not os.path.exists(str(video_path)):
err_str = "Video generation failed on all attempts"
report_failure(err_str)
sys.exit(1)
print('4. Uploading back to your Docker Space...')
try:
with open(video_path, 'rb') as f:
res = requests.post(f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id, 'ext': 'mp4'},
files={'file': f})
if res.status_code == 200:
print('5. SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
report_failure(err_str)
sys.exit(1)
""",
"generate-audio.py": """
import os, sys, requests, time
from gradio_client import Client, handle_file
AUDIO_TYPE = '___AUDIO_TYPE___'
NEGATIVE_PROMPT = '''___NEGATIVE_PROMPT___'''
prompt = os.environ.get('PROMPT', '')
try: seed = int(os.environ.get('SEED', '-1'))
except: seed = -1
try: duration = float(os.environ.get('DURATION', '8'))
except: duration = 8.0
file_url = os.environ.get('FILE_URL', '')
run_id = os.environ.get('RUN_ID')
space_url = os.environ.get('SPACE_URL')
github_run_id = os.environ.get('GITHUB_RUN_ID')
def report_failure(error_msg):
try:
requests.post(
f"{space_url}/api/webhook/fail",
json={
"run_id": run_id,
"error": error_msg,
"event_type": "generate-audio",
"client_payload": {
"data": {
"prompt": prompt,
"duration": duration,
"seed": seed,
"audio_type": AUDIO_TYPE,
"negative_prompt": NEGATIVE_PROMPT,
"file_url": file_url,
"run_id": run_id,
"space_url": space_url
}
},
"github_run_id": github_run_id
},
timeout=15
)
except Exception as e:
print(f"Failed to report failure: {e}")
try:
print('1. Connecting to Audio Space...')
client = Client("hkchengrex/MMAudio")
if AUDIO_TYPE == 'video':
print('2. Downloading video for audio generation...')
req = requests.get(file_url, timeout=30)
ext = file_url.split('.')[-1] if '.' in file_url else 'mp4'
local_vid = f'input.{ext}'
with open(local_vid, 'wb') as f:
f.write(req.content)
print('3. Processing Video-to-Audio...')
result = client.predict(
video=handle_file(local_vid),
prompt=prompt,
negative_prompt=NEGATIVE_PROMPT,
seed=seed,
num_steps=25,
cfg_strength=4.5,
duration=duration,
api_name="/video_to_audio"
)
else:
print('2. Processing Text-to-Audio...')
result = client.predict(
prompt=prompt,
negative_prompt=NEGATIVE_PROMPT,
seed=seed,
num_steps=25,
cfg_strength=4.5,
duration=duration,
api_name="/text_to_audio"
)
print('4. Preparing File Data...')
audio_path = None
if isinstance(result, list) and len(result) > 0:
audio_path = result[0].get('video') or result[0].get('path') or result[0].get('url') if isinstance(result[0], dict) else result[0]
elif isinstance(result, tuple) and len(result) > 0:
audio_path = result[0]
elif isinstance(result, dict):
audio_path = result.get('video') or result.get('path') or result.get('url')
else:
audio_path = result
ext = 'mp4'
if str(audio_path).endswith('.wav'): ext = 'wav'
if str(audio_path).endswith('.mp3'): ext = 'mp3'
print('5. Uploading back to Space...')
with open(audio_path, 'rb') as f:
res = requests.post(
f'{space_url}/api/webhook/upload',
data={'run_id': run_id, 'github_run_id': github_run_id, 'ext': ext},
files={'file': f}
)
if res.status_code == 200:
print('SUCCESS!')
else:
sys.exit(1)
except Exception as e:
err_str = str(e)
print(f'Generation error: {err_str}')
report_failure(err_str)
sys.exit(1)
"""
}
# Write each script to yml/<name>, replacing outdated copies.
for name, content in scripts.items():
path = os.path.join("yml", name)
if os.path.exists(path):
# An existing file whose text lacks "report_failure" is treated as outdated and deleted.
# Because this is a plain substring check, a script whose source never mentions
# report_failure (chat-gemma.py above) is deleted and rewritten on every call.
try:
with open(path, "r", encoding="utf-8") as f:
old_data = f.read()
if "report_failure" not in old_data:
os.remove(path)
except:
# NOTE(review): bare except silently ignores read/remove failures; the stale file is then kept.
pass
# Only write when the file is absent (never existed, or was just removed above).
if not os.path.exists(path):
with open(path, "w", encoding="utf-8") as f:
f.write(content.strip())
# Materialize the worker scripts as soon as this module is loaded.
write_default_scripts()
# SQLite database
def init_db():
    """Create the ``users`` and ``retries`` tables in ``DB_PATH`` if missing.

    Also adds the ``event_type``, ``client_payload`` and
    ``last_failed_github_run_id`` columns to ``retries`` for databases created
    before those columns existed. The connection is closed even if a
    statement raises.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users
                     (id TEXT PRIMARY KEY, count INTEGER, week_start REAL)''')
        c.execute('''CREATE TABLE IF NOT EXISTS retries
                     (run_id TEXT PRIMARY KEY, attempt INTEGER, event_type TEXT, client_payload TEXT, last_failed_github_run_id INTEGER)''')
        # Column names come from this fixed list, never from user input, so
        # formatting them into the statement is safe.
        migrations = [
            ("event_type", "TEXT"),
            ("client_payload", "TEXT"),
            ("last_failed_github_run_id", "INTEGER"),
        ]
        for col, col_type in migrations:
            try:
                c.execute(f"ALTER TABLE retries ADD COLUMN {col} {col_type}")
            except sqlite3.OperationalError:
                # Raised when the column already exists: nothing to migrate.
                pass
        conn.commit()
    finally:
        conn.close()
def init_image_db():
    """Create the ``users`` table in ``DB_IMAGE_PATH`` if it does not exist.

    The connection is closed even if the statement raises.
    """
    conn = sqlite3.connect(DB_IMAGE_PATH)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users
                     (id TEXT PRIMARY KEY, count INTEGER, week_start REAL)''')
        conn.commit()
    finally:
        conn.close()
def init_image_edit_db():
    """Create the ``users`` table in ``DB_IMAGE_EDIT_PATH`` if it does not exist.

    The connection is closed even if the statement raises.
    """
    conn = sqlite3.connect(DB_IMAGE_EDIT_PATH)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS users
                     (id TEXT PRIMARY KEY, count INTEGER, week_start REAL)''')
        conn.commit()
    finally:
        conn.close()
def init_chat_queue_db():
    """Create the ``queue`` table in ``DB_CHAT_QUEUE_PATH`` if it does not exist.

    The connection is closed even if the statement raises.
    """
    conn = sqlite3.connect(DB_CHAT_QUEUE_PATH)
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS queue
                     (run_id TEXT PRIMARY KEY, prompt TEXT, file_url TEXT, file_mime TEXT, status TEXT, timestamp REAL)''')
        conn.commit()
    finally:
        conn.close()
# Create all four SQLite databases/tables as soon as this module is loaded.
init_db()
init_image_db()
init_image_edit_db()
init_chat_queue_db()
def get_db_connection():
    """Open a new connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects.

    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
def get_image_db_connection():
    """Open a new connection to ``DB_IMAGE_PATH`` whose rows are ``sqlite3.Row`` objects.

    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_IMAGE_PATH)
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
def get_image_edit_db_connection():
    """Open a new connection to ``DB_IMAGE_EDIT_PATH`` whose rows are ``sqlite3.Row`` objects.

    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_IMAGE_EDIT_PATH)
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
def get_chat_queue_db_connection():
    """Open a new connection to ``DB_CHAT_QUEUE_PATH`` whose rows are ``sqlite3.Row`` objects.

    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_CHAT_QUEUE_PATH)
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
# --- Helper functions ---
def delete_github_run(gh_run_id):
    """Delete a GitHub Actions run after a five-minute delay.

    The run is not tied to a known account here, so every configured account
    is tried in order until one DELETE returns 200 or 204. A network error on
    one account is logged and the next account is tried, so a single failed
    request does not abort the whole attempt.

    Args:
        gh_run_id: GitHub's numeric id of the workflow run to delete.
    """
    # Blocks the calling thread for 5 minutes before deleting.
    time.sleep(300)
    for acc in GITHUB_ACCOUNTS:
        url = f"https://api.github.com/repos/{acc['repo']}/actions/runs/{gh_run_id}"
        headers = {"Accept": "application/vnd.github.v3+json", "Authorization": f"token {acc['token']}"}
        try:
            # A timeout keeps a stalled connection from hanging this thread forever.
            r = requests.delete(url, headers=headers, timeout=15)
        except requests.RequestException as e:
            print(f"Error deleting workflow run {gh_run_id} from {acc['repo']}: {e}", flush=True)
            continue
        if r.status_code in (204, 200):
            print(f"🧹 [حذف موفق اکشن] شناسه {gh_run_id} از روی مخزن {acc['repo']} پاک شد.", flush=True)
            break
def get_user_identifier(data):
    """Return the identifier used to track a caller's quota.

    Preference order: the ``fingerprint`` value in ``data`` (as a string),
    then the first address of the first ``X-Forwarded-For`` header, then the
    remote address of the current request.
    """
    client_fingerprint = data.get('fingerprint')
    if client_fingerprint:
        return str(client_fingerprint)
    forwarded_headers = request.headers.getlist("X-Forwarded-For")
    if not forwarded_headers:
        return request.remote_addr
    first_hop = forwarded_headers[0].split(',')[0]
    return first_hop.strip()
def has_forbidden_words(text):
    """Report whether ``text`` contains any entry of ``FORBIDDEN_KEYWORDS``.

    Matching is a case-insensitive substring test, so a longer word that
    merely contains a keyword also matches.

    Args:
        text: The user-supplied prompt. ``None``, an empty string, or a
            non-string value (e.g. a JSON number) is accepted.

    Returns:
        True if a forbidden keyword occurs in the text, otherwise False.
    """
    if not text:
        # Nothing to scan; also guards against a JSON ``null`` prompt.
        return False
    text_lower = str(text).lower()
    return any(keyword in text_lower for keyword in FORBIDDEN_KEYWORDS)
def get_github_runs():
    """Fetch up to 100 recent workflow runs from every configured GitHub account.

    Each returned run dict is tagged with an extra ``_account`` key holding
    the account entry (repo + token) it came from, so callers can address
    follow-up requests to the right repository.

    Returns:
        A single list of run dicts from all accounts, newest first by
        ``created_at``. Accounts that fail or answer non-200 contribute nothing.
    """
    all_runs = []
    for acc in GITHUB_ACCOUNTS:
        url = f"https://api.github.com/repos/{acc['repo']}/actions/runs"
        headers = {"Accept": "application/vnd.github.v3+json", "Authorization": f"token {acc['token']}"}
        try:
            # Bounded wait: this runs inside request handlers, so a stalled
            # GitHub connection must not block the worker indefinitely.
            r = requests.get(f"{url}?per_page=100", headers=headers, timeout=20)
            if r.status_code == 200:
                runs = r.json().get('workflow_runs', [])
                for run in runs:
                    run['_account'] = acc
                all_runs.extend(runs)
        except Exception as e:
            print("Error fetching workflow runs:", e, flush=True)
    # `or ''` keeps the sort total even if a run carries created_at = null.
    all_runs.sort(key=lambda x: x.get('created_at') or '', reverse=True)
    return all_runs
def is_today(created_at_str):
    """Report whether an ISO-8601 timestamp string falls on the current UTC date.

    Only the part before the first ``'T'`` is compared, as ``YYYY-MM-DD``.

    Args:
        created_at_str: Timestamp such as ``"2024-05-01T12:00:00Z"``.

    Returns:
        True when the date part equals today's UTC date; False otherwise,
        including when the argument is not a string.
    """
    # Local import: the file only imports `datetime` from the datetime module.
    from datetime import timezone
    try:
        run_date_str = created_at_str.split('T')[0]
    except (AttributeError, TypeError):
        return False
    # datetime.utcnow() is deprecated; an aware "now" in UTC yields the same date string.
    today_str = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    return run_date_str == today_str
# سیستم یکپارچه نجات و اجرای مجدد رانرهای موازی با استفاده از دیتابیس
def trigger_retry(run_id, github_run_id, error_msg):
    """Re-dispatch a failed workflow using the payload stored in the ``retries`` table.

    The attempt counter for ``run_id`` is incremented and the failing GitHub
    run id is recorded; a second report for the same GitHub run id is ignored.
    Up to 20 attempts are made. When the error text looks like a
    duration/quota/limit problem, ``steps``/``width``/``height`` inside the
    payload's ``data`` dict are capped progressively before re-dispatching
    (payloads without a ``data`` dict are re-sent unchanged). Once the limit
    is exceeded an error image is produced instead.

    Args:
        run_id: Application-level id of the job (key of the ``retries`` table).
        github_run_id: GitHub Actions run id that failed; may be None.
        error_msg: Error text reported by the workflow; may be None.

    Returns:
        One of "No retry record", "Already processed", "Retrying",
        "Max retries reached".
    """
    error_msg = "" if error_msg is None else str(error_msg)
    # The id arrives from a webhook body, so a non-numeric value must not crash the handler.
    try:
        failed_gh_id = int(github_run_id) if github_run_id else None
    except (TypeError, ValueError):
        failed_gh_id = None

    max_retries = 20
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        c = conn.cursor()
        c.execute("SELECT * FROM retries WHERE run_id = ?", (run_id,))
        row = c.fetchone()
        if not row:
            return "No retry record"
        attempt = row['attempt'] if row['attempt'] is not None else 0
        last_failed = row['last_failed_github_run_id']
        event_type = row['event_type']
        client_payload_raw = row['client_payload']
        # Avoid a duplicate retry when the same failed run is reported more than once.
        if failed_gh_id is not None and last_failed == failed_gh_id:
            return "Already processed"
        new_attempt = attempt + 1
        # Persist the new attempt count and the run id that just failed.
        c.execute("UPDATE retries SET attempt = ?, last_failed_github_run_id = ? WHERE run_id = ?",
                  (new_attempt, github_run_id, run_id))
        conn.commit()
    finally:
        # Always release the connection, including on early returns and SQL errors.
        conn.close()

    if new_attempt > max_retries:
        print(f"❌ [اتمام سقف تلاش مجدد] فرآیند {run_id} به سقف تلاش رسید ({max_retries}). تولید کارت تصویری خطا...", flush=True)
        create_error_image(run_id, f"Error: {error_msg}")
        return "Max retries reached"

    try:
        client_payload = json.loads(client_payload_raw)
    except (TypeError, ValueError):
        client_payload = {}
    if not isinstance(client_payload, dict):
        client_payload = {}

    # Shrink the job when the failure looks like a hardware time/quota limit.
    lowered_error = error_msg.lower()
    is_duration_error = any(term in lowered_error for term in ["duration", "larger", "max", "quota", "capacity", "limit"])
    if is_duration_error:
        print(f"📉 [تنظیم تطبیقی پارامترها] تعدیل پارامترها برای تلاش شماره {new_attempt}...", flush=True)
        job_data = client_payload.get("data")
        if isinstance(job_data, dict):
            original_steps = int(job_data.get("steps", 30))
            original_width = int(job_data.get("width", 1024))
            original_height = int(job_data.get("height", 1024))
            if new_attempt == 1:
                step_cap, side_cap = 20, 896
            elif new_attempt == 2:
                step_cap, side_cap = 15, 768
            else:
                step_cap, side_cap = 12, 512
            # min() only ever lowers a value, so a request that is already small is left alone.
            job_data["steps"] = min(step_cap, original_steps)
            job_data["width"] = min(side_cap, original_width)
            job_data["height"] = min(side_cap, original_height)

    print(f"🔄 [ نجات خودکار - تلاش مجدد ] اجرای تلاش {new_attempt}/{max_retries} برای فرآیند {run_id} به دلیل خطا: {error_msg}", flush=True)
    dispatch_github_workflow(event_type, client_payload)
    return "Retrying"
# تابع ارسال رویداد داینامیک به گیت‌هاب با پشتیبانی از دیتابیس
def dispatch_github_workflow(event_type, client_payload):
    """Send a ``repository_dispatch`` event to the next GitHub account in rotation.

    Before sending, the payload is stored in the ``retries`` table (keyed by
    its ``run_id``) so a failed run can be re-dispatched later, and the
    contents of ``yml/<event_type>.py`` (if that file exists) are attached to
    the payload under ``script``.

    Args:
        event_type: Name of the dispatch event; also selects the script file.
        client_payload: Dict sent as ``client_payload``. It is mutated: a
            ``script`` key is added (inside ``data`` when that key exists).

    Returns:
        The ``requests.Response`` from GitHub, or a synthetic response with
        status 500 (no account configured) or 502 (request failed).
    """
    acc = get_next_account()
    if not acc:
        print("❌ [خطای دیسپچ] هیچ حساب گیت‌هابی پیکربندی نشده یا فعال نیست!", flush=True)
        dummy_response = requests.models.Response()
        dummy_response.status_code = 500
        dummy_response._content = b'No GitHub accounts configured'
        return dummy_response
    run_id = client_payload.get("run_id") or client_payload.get("data", {}).get("run_id")
    if run_id:
        RUN_ACCOUNT_MAPPING[run_id] = acc
        # Store (or refresh) the job metadata used by the retry system.
        payload_json = json.dumps(client_payload)
        conn = sqlite3.connect(DB_PATH)
        try:
            c = conn.cursor()
            c.execute("INSERT OR IGNORE INTO retries (run_id, attempt, event_type, client_payload) VALUES (?, 0, ?, ?)",
                      (run_id, event_type, payload_json))
            c.execute("UPDATE retries SET event_type = ?, client_payload = ? WHERE run_id = ?",
                      (event_type, payload_json, run_id))
            conn.commit()
        finally:
            # Release the connection even if one of the statements fails.
            conn.close()
    script_content = ""
    event_name = str(event_type)
    # event_type can come from request data (the `action_name` field of /api/generate),
    # so never let it address a file outside the yml/ directory.
    path_separators = {"/", "\\", os.sep}
    if os.altsep:
        path_separators.add(os.altsep)
    if event_name and not any(sep in event_name for sep in path_separators):
        script_path = os.path.join("yml", f"{event_name}.py")
        if os.path.exists(script_path):
            with open(script_path, "r", encoding="utf-8") as f:
                script_content = f.read()
    if event_type == "generate-audio":
        data_dict = client_payload.get("data", client_payload)
        audio_type = data_dict.get('audio_type', 'text')
        negative_prompt = data_dict.get('negative_prompt', 'music')
        script_content = script_content.replace("___AUDIO_TYPE___", str(audio_type))
        script_content = script_content.replace("___NEGATIVE_PROMPT___", str(negative_prompt))
    if "data" in client_payload:
        client_payload["data"]["script"] = script_content
    else:
        client_payload["script"] = script_content
    url = f"https://api.github.com/repos/{acc['repo']}/dispatches"
    headers = {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": f"token {acc['token']}"
    }
    print(f"🚀 [ارسال دیسپچ] در حال ارسال درخواست رویداد '{event_type}' به مخزن '{acc['repo']}'...", flush=True)
    try:
        r = requests.post(url, headers=headers, json={
            "event_type": event_type,
            "client_payload": client_payload
        }, timeout=20)
        print(f"📩 [پاسخ دیسپچ گیت‌هاب] مخزن: {acc['repo']} | کد وضعیت: {r.status_code} | متن پاسخ: {r.text[:500]}", flush=True)
        return r
    except Exception as e:
        # Callers only inspect status_code/text, so hand back a response-shaped object.
        print(f"❌ [خطای ارتباطی با گیت‌هاب] ارسال دیسپچ به {acc['repo']} ناموفق بود: {e}", flush=True)
        dummy = requests.models.Response()
        dummy.status_code = 502
        dummy._content = str(e).encode('utf-8')
        return dummy
# --- مسیرهای صفحات HTML ---
@app.route('/')
def index():
    """Serve the site root by rendering the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/<page_name>')
def serve_html(page_name):
    """Render the template named by the URL segment, appending ``.html`` when absent.

    Returns:
        The rendered page, or ``("Page not found", 404)`` when rendering fails.
    """
    if not page_name.endswith('.html'):
        page_name += '.html'
    try:
        return render_template(page_name)
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt are no longer swallowed.
        # NOTE(review): this still maps template *rendering* errors to 404, not only missing templates.
        return "Page not found", 404
# --- API مخصوص مدیریت گیت‌هاب اکشنز در داشبورد ---
@app.route('/api/actions/active', methods=['GET'])
def get_active_actions():
    """List workflow runs that are still queued, in progress or waiting, across all accounts."""
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    pending_states = ["queued", "in_progress", "waiting"]
    active = []
    for run in get_github_runs():
        if run["status"] not in pending_states:
            continue
        # Expose only the dashboard fields; the attached account/token stays server-side.
        active.append({
            "id": run["id"],
            "name": run["name"],
            "status": run["status"],
            "html_url": run["html_url"],
            "created_at": run["created_at"]
        })
    return jsonify({"status": "success", "actions": active})
@app.route('/api/actions/inactive-today', methods=['GET'])
def get_inactive_today():
    """List workflow runs that completed (any conclusion) and were created on today's UTC date."""
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    inactive_today = []
    for run in get_github_runs():
        finished_today = run["status"] == "completed" and is_today(run["created_at"])
        if not finished_today:
            continue
        inactive_today.append({
            "id": run["id"],
            "name": run["name"],
            "status": run["status"],
            "conclusion": run.get("conclusion"),
            "html_url": run["html_url"],
            "created_at": run["created_at"]
        })
    return jsonify({"status": "success", "actions": inactive_today})
@app.route('/api/actions/completed-today-count', methods=['GET'])
def get_completed_today_count():
    """Count workflow runs created today (UTC) that completed with conclusion ``success``."""
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    successful_today = 0
    for run in get_github_runs():
        if run["status"] == "completed" and run.get("conclusion") == "success" and is_today(run["created_at"]):
            successful_today += 1
    return jsonify({"status": "success", "count": successful_today})
@app.route('/api/actions/delete-inactive', methods=['POST'])
def delete_inactive_actions():
    """Delete every completed workflow run returned by ``get_github_runs``.

    Returns:
        JSON with ``deleted_count``: the number of deletions GitHub confirmed
        with 200/204. Runs whose deletion fails are skipped, not retried.
    """
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    runs = get_github_runs()
    deleted_count = 0
    for run in runs:
        if run["status"] != "completed":
            continue
        acc = run['_account']
        headers = {"Accept": "application/vnd.github.v3+json", "Authorization": f"token {acc['token']}"}
        delete_url = f"https://api.github.com/repos/{acc['repo']}/actions/runs/{run['id']}"
        try:
            # Bounded wait so one stalled request cannot hang the whole bulk delete.
            r = requests.delete(delete_url, headers=headers, timeout=20)
        except requests.RequestException as e:
            # A single network failure should not abort the remaining deletions with a 500.
            print(f"⚠️ [خطای حذف اکشن] حذف اجرای {run['id']} از مخزن {acc['repo']} ناموفق بود: {e}", flush=True)
            continue
        if r.status_code in (204, 200):
            deleted_count += 1
    return jsonify({"status": "success", "deleted_count": deleted_count})
@app.route('/api/actions/delete-failed-completed', methods=['POST'])
def delete_failed_completed():
    """Delete completed workflow runs whose conclusion is success, failure or cancelled.

    Returns:
        JSON with ``deleted_count``: the number of deletions GitHub confirmed
        with 200/204. Runs whose deletion fails are skipped, not retried.
    """
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    runs = get_github_runs()
    deleted_count = 0
    removable_conclusions = ("success", "failure", "cancelled")
    for run in runs:
        if run["status"] != "completed" or run.get("conclusion") not in removable_conclusions:
            continue
        acc = run['_account']
        headers = {"Accept": "application/vnd.github.v3+json", "Authorization": f"token {acc['token']}"}
        delete_url = f"https://api.github.com/repos/{acc['repo']}/actions/runs/{run['id']}"
        try:
            # Bounded wait so one stalled request cannot hang the whole bulk delete.
            r = requests.delete(delete_url, headers=headers, timeout=20)
        except requests.RequestException as e:
            # A single network failure should not abort the remaining deletions with a 500.
            print(f"⚠️ [خطای حذف اکشن] حذف اجرای {run['id']} از مخزن {acc['repo']} ناموفق بود: {e}", flush=True)
            continue
        if r.status_code in (204, 200):
            deleted_count += 1
    return jsonify({"status": "success", "deleted_count": deleted_count})
@app.route('/api/actions/cancel-delete-active', methods=['POST'])
def cancel_delete_active_actions():
    """Cancel every queued / in-progress / waiting run, then try to delete it.

    For each such run a cancel request is sent, followed by a 0.5 s pause and
    a delete request.

    Returns:
        JSON with ``cancelled_count`` (cancel requests GitHub accepted) and
        ``deleted_count`` (deletions confirmed with 200/204).
    """
    if not GITHUB_ACCOUNTS:
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    runs = get_github_runs()
    cancelled_count = 0
    deleted_count = 0
    for run in runs:
        if run["status"] not in ("queued", "in_progress", "waiting"):
            continue
        acc = run['_account']
        headers = {"Accept": "application/vnd.github.v3+json", "Authorization": f"token {acc['token']}"}
        cancel_url = f"https://api.github.com/repos/{acc['repo']}/actions/runs/{run['id']}/cancel"
        delete_url = f"https://api.github.com/repos/{acc['repo']}/actions/runs/{run['id']}"
        try:
            r_cancel = requests.post(cancel_url, headers=headers, timeout=20)
            # Count only cancellations GitHub actually accepted, not every attempt.
            if r_cancel.ok:
                cancelled_count += 1
            # Short pause between the cancel and the delete request.
            time.sleep(0.5)
            r_del = requests.delete(delete_url, headers=headers, timeout=20)
        except requests.RequestException as e:
            # Keep processing the remaining runs if one request fails at the network level.
            print(f"⚠️ [خطای لغو/حذف اکشن] پردازش اجرای {run['id']} در مخزن {acc['repo']} ناموفق بود: {e}", flush=True)
            continue
        if r_del.status_code in (204, 200):
            deleted_count += 1
    return jsonify({
        "status": "success",
        "cancelled_count": cancelled_count,
        "deleted_count": deleted_count
    })
# وب‌هوک خطای ورک‌فلوها با یکپارچه‌سازی سیستم نجات
@app.route('/api/webhook/fail', methods=['POST'])
def webhook_fail():
    """Receive a failure report from a workflow and hand it to the retry system.

    Expected JSON fields: ``run_id`` (application job id), ``error`` (message)
    and ``github_run_id`` (the Actions run that failed). When a GitHub run id
    is present, its deletion is scheduled on a background thread regardless of
    whether ``run_id`` is supplied.

    Returns:
        400 for a missing/invalid body or missing ``run_id``; otherwise the
        status string from ``trigger_retry`` with 200.
    """
    # silent=True routes malformed or non-JSON bodies to the explicit 400 below;
    # the dict check rejects JSON arrays/scalars that have no .get().
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        print("⚠️ [وب‌هوک خطا] پارامترهای ناقص یا خالی ارسال شده است.", flush=True)
        return "Invalid request", 400
    run_id = data.get('run_id')
    error_msg = data.get('error', '')
    gh_run_id = data.get('github_run_id')
    print(f"⚠️ [گزارش شکست فرآیند در اکشنز] شناسه: {run_id} | خطا: {error_msg}", flush=True)
    if gh_run_id:
        print(f"🧹 [پاک‌سازی خودکار] ثبت فرآیند حذف اجرای ناموفق {gh_run_id} در پس‌زمینه...", flush=True)
        threading.Thread(target=delete_github_run, args=(gh_run_id,)).start()
    if not run_id:
        return "Missing run_id", 400
    res = trigger_retry(run_id, gh_run_id, error_msg)
    return res, 200
# استریم چت زنده (SSE) برای تحلیل پاسخ‌های متنی گما
@app.route('/api/stream_chat/<run_id>')
def stream_chat(run_id):
    """Stream the chat answer for ``run_id`` as server-sent events.

    The generator polls once per second, for at most 120 polls, for
    ``static/images/<run_id>.txt``. When the file can be read its stripped
    content is sent as one JSON event followed by a ``DONE`` marker; if it
    never appears, a timeout message and ``DONE`` are sent instead.
    """
    def generate_stream():
        import json
        answer_path = f"static/images/{run_id}.txt"
        for _ in range(120):
            if os.path.exists(answer_path):
                try:
                    with open(answer_path, "r", encoding="utf-8") as answer_file:
                        answer_text = answer_file.read().strip()
                    yield f"data: {json.dumps({'text': answer_text})}\n\n"
                    yield "data: DONE\n\n"
                    return
                except Exception:
                    # Reading failed; fall through and poll again.
                    pass
            time.sleep(1)
        yield f"data: {json.dumps({'text': '⚠️ خطای زمان انتظار: سرور در زمان مقرر پاسخ نداد.'})}\n\n"
        yield "data: DONE\n\n"

    event_stream = generate_stream()
    return Response(event_stream, mimetype='text/event-stream')
# --- API برای ساخت عکس و شبیه‌سازی صدا ---
@app.route('/api/generate', methods=['POST'])
def generate():
    """Queue an image-generation (or voice-config) job on GitHub Actions.

    JSON body fields read: ``prompt``, ``width``, ``height``, ``action_name``
    (dispatch event name, default ``generate-flux``). Prompts starting with
    ``VOICECONFIG_`` skip both the forbidden-word filter and translation;
    everything else is filtered and translated to English, falling back to
    the raw prompt when translation fails.

    Returns:
        JSON with ``run_id`` and the translated prompt on success; a 400/500
        JSON error otherwise.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای امنیتی] درخواست متوقف شد: هیچ توکنی متصل نیست.", flush=True)
        return jsonify({"status": "error", "message": "Secret GITHUB_TOKENs are missing!"}), 500
    # A missing, malformed or non-object JSON body becomes a clean 400 instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"status": "error", "message": "Invalid request"}), 400
    # `prompt` may be JSON null or a non-string; normalise before calling str methods on it.
    persian_prompt = data.get('prompt') or ''
    if not isinstance(persian_prompt, str):
        persian_prompt = str(persian_prompt)
    # Detect voice-simulator requests, which carry a config string rather than a prompt.
    is_voice_config = persian_prompt.startswith("VOICECONFIG_")
    # The forbidden-word filter applies to ordinary requests only.
    if not is_voice_config and has_forbidden_words(persian_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت فیلتر شد: {persian_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    print(f"📝 [ثبت درخواست جدید] پرامپت خام: {persian_prompt[:100]}...", flush=True)
    if is_voice_config:
        # Voice config codes must not be translated, so their characters stay intact.
        english_prompt = persian_prompt
        print("🎙️ [شبیه‌ساز صدا] دور زدن مترجم گوگل جهت جلوگیری از تغییر کاراکترهای پیکربندی صوتی.", flush=True)
    else:
        try:
            english_prompt = GoogleTranslator(source='auto', target='en').translate(persian_prompt)
            print(f"🔄 [ترجمه با موفقیت انجام شد] فارسی: {persian_prompt} -> انگلیسی: {english_prompt}", flush=True)
        except Exception as e:
            print(f"⚠️ [ترجمه ناموفق] مترجم گوگل با خطا مواجه شد: {e}. استفاده از پرامپت خام.", flush=True)
            english_prompt = persian_prompt
    run_id = str(uuid.uuid4())
    client_payload = {
        "prompt": english_prompt,
        "width": data.get('width', 1024),
        "height": data.get('height', 1024),
        "run_id": run_id,
        "space_url": SPACE_URL
    }
    r = dispatch_github_workflow(data.get('action_name', 'generate-flux'), client_payload)
    if r.status_code == 204:
        print(f"✅ [درخواست دیسپچ شد] فرآیند با شناسه {run_id} با موفقیت در صف گیت‌هاب قرار گرفت.", flush=True)
        return jsonify({
            "status": "success",
            "run_id": run_id,
            "translated_prompt": english_prompt,
            "translated": english_prompt
        })
    else:
        print(f"❌ [توقف درخواست] گیت‌هاب پاسخ غیرمنتظره داد (کد {r.status_code}): {r.text}", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {r.status_code}"}), 400
# --- API مخصوص ویرایش عکس (AI Photoshop) ---
@app.route('/api/edit', methods=['POST'])
def edit_image():
    """Queue an image-editing job: save the uploaded image, translate the instruction, dispatch.

    Form fields read: file ``image`` (required) and ``prompt``. The upload is
    stored as ``static/images/<run_id>_input.png`` and its public URL is sent
    to the ``generate-editor`` workflow.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای ویرایش] توکنی برای ارتباط وجود ندارد.", flush=True)
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    if 'image' not in request.files:
        print("❌ [خطای ویرایش] فایلی ارسال نشده است.", flush=True)
        return jsonify({"status": "error", "message": "Image required"}), 400

    uploaded_image = request.files['image']
    persian_prompt = request.form.get('prompt', '')
    if has_forbidden_words(persian_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت ویرایش فیلتر شد: {persian_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    print(f"📝 [درخواست ویرایش عکس] دستور فارسی: {persian_prompt}", flush=True)

    # Translate the instruction; on any translator failure the raw text is used.
    try:
        english_prompt = GoogleTranslator(source='auto', target='en').translate(persian_prompt)
        print(f"🔄 [ترجمه ویرایش با موفقیت انجام شد] فارسی: {persian_prompt} -> انگلیسی: {english_prompt}", flush=True)
    except Exception as e:
        print(f"⚠️ [ترجمه ناموفق ویرایش] استفاده از پرامپت خام: {e}", flush=True)
        english_prompt = persian_prompt

    run_id = str(uuid.uuid4())
    stored_name = f"{run_id}_input.png"
    uploaded_image.save(f"static/images/{stored_name}")
    job_payload = {
        "prompt": english_prompt,
        "image_url": f"{SPACE_URL}/static/images/{stored_name}",
        "run_id": run_id,
        "space_url": SPACE_URL
    }
    dispatch_response = dispatch_github_workflow("generate-editor", job_payload)
    if dispatch_response.status_code != 204:
        print(f"❌ [توقف ویرایش عکس] گیت‌هاب پاسخ خطا داد (کد {dispatch_response.status_code}): {dispatch_response.text}", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {dispatch_response.status_code}"}), 400
    print(f"✅ [درخواست ویرایش عکس دیسپچ شد] فرآیند با شناسه {run_id} با موفقیت در صف گیت‌هاب قرار گرفت.", flush=True)
    return jsonify({"status": "success", "run_id": run_id, "translated": english_prompt})
# --- API مخصوص تولید تصویر FLUX.2 ---
@app.route('/api/generate-flux2', methods=['POST'])
def generate_flux2():
    """Queue a FLUX.2 image-generation job with optional reference images.

    Form fields read: ``prompt``, files ``images`` (zero or more), ``width``,
    ``height``, ``steps``, ``guidance``, ``seed``, ``randomizeSeed``,
    ``promptUpsampling``, ``hfToken``. Numeric fields that fail to parse fall
    back to 1024 / 1024 / 30 / 4.0 / 0. Uploaded images are stored under
    ``static/images`` and passed to the workflow as public URLs.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای FLUX2] حسابی یافت نشد.", flush=True)
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    persian_prompt = request.form.get('prompt', '')
    if has_forbidden_words(persian_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت Flux2 فیلتر شد: {persian_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    print(f"📝 [ثبت درخواست جدید Flux2] پرامپت فارسی: {persian_prompt}", flush=True)
    try:
        english_prompt = GoogleTranslator(source='auto', target='en').translate(persian_prompt)
        print(f"🔄 [ترجمه Flux2 انجام شد] فارسی: {persian_prompt} -> انگلیسی: {english_prompt}", flush=True)
    except Exception as e:
        print(f"⚠️ [ترجمه ناموفق Flux2] استفاده از پرامپت خام: {e}", flush=True)
        english_prompt = persian_prompt
    run_id = str(uuid.uuid4())
    image_urls = []
    files = request.files.getlist('images')
    # The extension comes from the client-supplied filename: keep only ASCII
    # letters/digits so it cannot inject path separators, spaces or URL
    # metacharacters into the stored path and the public URL.
    safe_ext_chars = "abcdefghijklmnopqrstuvwxyz0123456789"
    for idx, file in enumerate(files):
        if file and file.filename:
            raw_ext = file.filename.split('.')[-1] if '.' in file.filename else 'png'
            ext = "".join(ch for ch in raw_ext.lower() if ch in safe_ext_chars)[:10] or 'png'
            input_filename = f"{run_id}_input_{idx}.{ext}"
            file.save(f"static/images/{input_filename}")
            image_urls.append(f"{SPACE_URL}/static/images/{input_filename}")

    def _form_number(field, cast, default):
        """Parse a numeric form field, returning ``default`` when it is not a valid number."""
        try:
            return cast(request.form.get(field, default))
        except ValueError:
            return default

    width = _form_number('width', int, 1024)
    height = _form_number('height', int, 1024)
    steps = _form_number('steps', int, 30)
    guidance = _form_number('guidance', float, 4.0)
    try:
        seed_raw = request.form.get('seed', '0')
        # A blank seed field means "no seed given".
        seed = int(seed_raw) if seed_raw.strip() else 0
    except ValueError:
        seed = 0
    client_payload = {
        "data": {
            "prompt": english_prompt,
            "image_urls": image_urls,
            "width": width,
            "height": height,
            "steps": steps,
            "guidance": guidance,
            "seed": seed,
            "randomize_seed": request.form.get('randomizeSeed') == 'true',
            "prompt_upsampling": request.form.get('promptUpsampling') == 'true',
            "run_id": run_id,
            "space_url": SPACE_URL,
            "hf_token": request.form.get('hfToken', '')
        }
    }
    r = dispatch_github_workflow("generate-flux2", client_payload)
    if r.status_code == 204:
        print(f"✅ [درخواست Flux2 دیسپچ شد] فرآیند با شناسه {run_id} با موفقیت در صف گیت‌هاب قرار گرفت.", flush=True)
        return jsonify({"status": "success", "run_id": run_id, "translated": english_prompt})
    else:
        print(f"❌ [توقف درخواست Flux2] خطا در ارسال درخواست دیسپچ: {r.status_code}", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {r.status_code}"}), 400
# --- API مخصوص تبدیل عکس به ویدیو ---
@app.route('/api/generate-video', methods=['POST'])
def generate_video():
    """Queue an image-to-video job, first asking the Gemma workflow for an animation prompt.

    Form fields read: file ``image`` (required), ``prompt``, ``duration``.
    Steps:
      1. Save the image under ``static/images`` and build its public URL.
      2. Dispatch a ``chat-gemma`` job with a planner prompt and poll (every
         3 s, up to about 50 s) for ``static/images/gemma_<run_id>_<n>.txt``.
      3. If no usable planner output arrives, fall back to a plain translation
         of the user prompt plus fixed quality keywords.
      4. Dispatch the ``generate-video`` job with the final prompt.

    NOTE: the handler blocks while polling, so a request can take close to a
    minute before it returns.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای ویدیو] حسابی جهت دیسپچ متصل نیست.", flush=True)
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    if 'image' not in request.files:
        print("❌ [خطای ویدیو] تصویری به سرور ارسال نشده است.", flush=True)
        return jsonify({"status": "error", "message": "Image required"}), 400
    file = request.files['image']
    user_prompt = request.form.get('prompt', '').strip()
    duration = request.form.get('duration', 5.0)
    if has_forbidden_words(user_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت ویدیو فیلتر شد: {user_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    run_id = str(uuid.uuid4())
    input_filename = f"{run_id}_video_input.png"
    local_image_path = os.path.join("static/images", input_filename)
    file.save(local_image_path)
    image_public_url = f"{SPACE_URL}/static/images/{input_filename}"
    master_prompt = """You are an expert AI Animation Planner. Your absolute highest priority is to faithfully and creatively execute the user's specific request based on the provided image.
1. If the user prompt is empty or generic (like "animate this"), add subtle, high-quality, believable cinematic motion (e.g., slow zoom, water flowing, wind in hair).
2. If the user gives specific directions, focus ENTIRELY on executing that command perfectly. If the action is not visible in-frame, use cinematic camera movements to reveal it.
3. You must output ONLY a highly detailed, descriptive animation prompt in ENGLISH. Do not translate literally; ENHANCE the prompt for a text-to-video AI model.
4. MUST Include keywords at the end: cinematic, photorealistic, high detail, smooth motion, 8k.
CRITICAL RULE: DO NOT say "Here is the prompt" or give any conversational explanations. DO NOT output JSON. Output ONLY the raw English animation prompt text and NOTHING ELSE."""
    english_prompt = ""
    gemma_success = False
    prompt_for_gemma = user_prompt if user_prompt else "لطفاً این تصویر را به یک ویدیوی سینمایی بسیار جذاب و واقع‌گرایانه متحرک کن."
    combined_prompt = f"{master_prompt}\n\nUser request: {prompt_for_gemma}"
    print(f"🎬 [متحرک‌سازی تصویر] آغاز هماهنگی سناریو با پلانر تصویری Gemma...", flush=True)
    gemma_attempts = 1
    for attempt in range(gemma_attempts):
        try:
            gemma_run_id = f"gemma_{run_id}_{attempt}"
            gemma_txt_path = os.path.join("static/images", f"{gemma_run_id}.txt")
            client_payload_gemma = {
                "prompt": combined_prompt,
                "file_url": image_public_url,
                "file_mime": "image/png",
                "run_id": gemma_run_id,
                "space_url": SPACE_URL
            }
            r_gemma = dispatch_github_workflow("chat-gemma", client_payload_gemma)
            if r_gemma.status_code == 204:
                waited = 0
                while waited < 50:
                    if os.path.exists(gemma_txt_path):
                        with open(gemma_txt_path, "r", encoding="utf-8") as f:
                            english_prompt = f.read().strip()
                        gemma_success = True
                        break
                    time.sleep(3)
                    waited += 3
                if gemma_success:
                    if "```" in english_prompt:
                        # Strip the language-tagged fences before the bare fence;
                        # otherwise "```json" is reduced to a stray "json" left in the prompt.
                        for fence in ("```json", "```text", "```"):
                            english_prompt = english_prompt.replace(fence, "")
                        english_prompt = english_prompt.strip()
                    # These phrases are checked as markers of a failed planner run rather than a prompt.
                    if "پردازش متوقف شد" in english_prompt or "سهمیه سرور موقتاً پر شده" in english_prompt:
                        print(f"⚠️ پلانر تصویری Gemma با محدودیت سخت‌افزاری روبرو شد. تلاش مجدد...", flush=True)
                        gemma_success = False
                    else:
                        break
        except Exception as e:
            print(f"⚠️ خطای تلاش متحرک‌سازی در پلانر Gemma: {e}", flush=True)
        # Back off only when another attempt will follow; sleeping after the last one just delays the response.
        if not gemma_success and attempt + 1 < gemma_attempts:
            time.sleep(4)
    if not gemma_success:
        print("🔄 [پلانر تصویر با خطا متوقف شد] ناتوانی در دریافت پرامپت بهینه. بازگشت به مترجم ساده...", flush=True)
        try:
            if user_prompt:
                english_prompt = GoogleTranslator(source='auto', target='en').translate(user_prompt)
                english_prompt += ", cinematic motion, photorealistic, high detail, smooth animation, 8k"
            else:
                english_prompt = "cinematic motion, photorealistic, high detail, smooth animation, 8k"
        except Exception:
            # Translator unavailable (or returned a non-string): use the generic prompt.
            english_prompt = "cinematic motion, photorealistic, high detail, smooth animation, 8k"
    print(f"🎥 [پرامپت ساخت نهایی ویدیو] {english_prompt}", flush=True)
    client_payload_video = {
        "prompt": english_prompt,
        "duration": duration,
        "image_url": image_public_url,
        "run_id": run_id,
        "space_url": SPACE_URL
    }
    r_vid = dispatch_github_workflow("generate-video", client_payload_video)
    if r_vid.status_code == 204:
        print(f"✅ [درخواست ساخت ویدیو دیسپچ شد] فرآیند با شناسه {run_id} با موفقیت در صف گیت‌هاب قرار گرفت.", flush=True)
        return jsonify({"status": "success", "run_id": run_id, "translated": english_prompt})
    else:
        print(f"❌ [توقف متحرک‌سازی عکس] دیسپچ با کد {r_vid.status_code} رد شد.", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {r_vid.status_code}"}), 400
# ==========================================
# API اختصاصی جهت پردازش فایل صوتی در گیت هاب
# ==========================================
@app.route('/api/generate-audio', methods=['POST'])
def generate_audio():
    """Queue an audio-generation job, optionally driven by an uploaded video.

    Form fields read: ``type`` (default ``text``), ``prompt``,
    ``negative_prompt`` (default ``music``), ``duration``, ``seed`` and, when
    ``type`` is ``video``, the required file ``file``. Prompts are translated
    to English; if translation fails the raw texts are used.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای صوتی] توکن‌های گیت‌هاب یافت نشدند.", flush=True)
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    audio_type = request.form.get('type', 'text')
    persian_prompt = request.form.get('prompt', '')
    persian_negative = request.form.get('negative_prompt', 'music')
    duration = request.form.get('duration', '8')
    seed = request.form.get('seed', '-1')
    if has_forbidden_words(persian_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت صوتی فیلتر شد: {persian_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    print(f"📝 [ثبت درخواست صوتی جدید] نوع: {audio_type} | پرامپت فارسی: {persian_prompt}", flush=True)
    try:
        english_prompt = GoogleTranslator(source='auto', target='en').translate(persian_prompt) if persian_prompt else ""
        english_negative = GoogleTranslator(source='auto', target='en').translate(persian_negative) if persian_negative else "music"
        print(f"🔄 [ترجمه با موفقیت انجام شد] فارسی: {persian_prompt} -> انگلیسی: {english_prompt}", flush=True)
    except Exception as e:
        print(f"⚠️ [ترجمه ناموفق صوتی] استفاده از پرامپت صوتی خام: {e}", flush=True)
        english_prompt = persian_prompt
        english_negative = persian_negative
    run_id = str(uuid.uuid4())
    file_url = ""
    if audio_type == 'video':
        file = request.files.get('file')
        if not file:
            print("❌ [خطای صوتی] فایل ویدیویی دریافت نشد.", flush=True)
            return jsonify({"status": "error", "message": "Video file required"}), 400
        # The extension comes from the client-supplied filename: keep only ASCII
        # letters/digits so it cannot inject path separators, spaces or URL
        # metacharacters into the stored path and the public URL.
        safe_ext_chars = "abcdefghijklmnopqrstuvwxyz0123456789"
        raw_ext = file.filename.split('.')[-1] if '.' in file.filename else 'mp4'
        ext = "".join(ch for ch in raw_ext.lower() if ch in safe_ext_chars)[:10] or 'mp4'
        filename = f"{run_id}_input.{ext}"
        file.save(f"static/images/{filename}")
        file_url = f"{SPACE_URL}/static/images/{filename}"
    client_payload = {
        "data": {
            "prompt": english_prompt,
            "negative_prompt": english_negative,
            "duration": duration,
            "seed": seed,
            "audio_type": audio_type,
            "file_url": file_url,
            "run_id": run_id,
            "space_url": SPACE_URL
        }
    }
    r = dispatch_github_workflow("generate-audio", client_payload)
    if r.status_code == 204:
        print(f"✅ [درخواست صوتی دیسپچ شد] فرآیند با شناسه {run_id} با موفقیت در صف گیت‌هاب قرار گرفت.", flush=True)
        return jsonify({"status": "success", "run_id": run_id, "translated": english_prompt})
    else:
        print(f"❌ [توقف درخواست صوتی] دیسپچ با خطا مواجه شد: {r.status_code}", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {r.status_code}"}), 400
# ==========================================
# سیستم جدید چت بات (ارسال مستقیم و بدون تاخیر به گیت‌هاب)
# ==========================================
@app.route('/api/chat', methods=['POST'])
def chat_api():
    """Queue a chat request (text plus optional attachment) on the ``chat-gemma`` workflow.

    Form fields read: ``prompt`` and optional file ``file``. The attachment is
    stored under ``static/images`` and passed on as a public URL together with
    its MIME type.

    Returns:
        JSON with ``run_id`` on success; a 400/500 JSON error otherwise.
    """
    if not GITHUB_ACCOUNTS:
        print("❌ [خطای چت] توکن گیت‌هاب جهت چت یافت نشد.", flush=True)
        return jsonify({"status": "error", "message": "Tokens missing"}), 500
    text_prompt = request.form.get('prompt', '')
    if has_forbidden_words(text_prompt):
        print(f"⚠️ [محتوای غیرمجاز] پرامپت چت فیلتر شد: {text_prompt}", flush=True)
        return jsonify({"status": "error", "message": "متن ورودی شامل کلمات نامناسب است."}), 400
    file_obj = request.files.get('file')
    run_id = str(uuid.uuid4())
    file_url = ""
    mime_type = ""
    print(f"💬 [ثبت چت] پرامپت کاربر: {text_prompt[:100]}...", flush=True)
    if file_obj:
        # The extension comes from the client-supplied filename: keep only ASCII
        # letters/digits so it cannot inject path separators, spaces or URL
        # metacharacters into the stored path and the public URL.
        safe_ext_chars = "abcdefghijklmnopqrstuvwxyz0123456789"
        raw_ext = file_obj.filename.split('.')[-1] if '.' in file_obj.filename else 'bin'
        ext = "".join(ch for ch in raw_ext.lower() if ch in safe_ext_chars)[:10] or 'bin'
        mime_type = file_obj.mimetype
        filename = f"{run_id}_chat_attach.{ext}"
        file_obj.save(f"static/images/{filename}")
        file_url = f"{SPACE_URL}/static/images/{filename}"
        print(f" 📎 پیوست چت ثبت شد در: {file_url}", flush=True)
    client_payload = {
        "prompt": text_prompt,
        "file_url": file_url,
        "file_mime": mime_type,
        "run_id": run_id,
        "space_url": SPACE_URL
    }
    r = dispatch_github_workflow("chat-gemma", client_payload)
    if r.status_code == 204:
        print(f"✅ [درخواست چت دیسپچ شد] چت با شناسه {run_id} ارسال شد.", flush=True)
        return jsonify({"status": "success", "run_id": run_id})
    else:
        print(f"❌ [توقف چت] گیت‌هاب به دیسپچ چت پاسخ خطا داد: {r.status_code}", flush=True)
        return jsonify({"status": "error", "message": f"GitHub dispatch error: {r.status_code}"}), 400
@app.route('/api/worker/poll', methods=['POST'])
def worker_poll():
    """Answer every worker poll with a fixed ``{"status": "empty"}`` body."""
    empty_reply = {"status": "empty"}
    return jsonify(empty_reply)
# --- API میکس ویدیوها سمت سرور ---
@app.route('/api/merge-videos', methods=['POST'])
def merge_videos():
    """Concatenate two uploaded videos server-side and return the result as a download.

    Form files read: ``base_video`` and ``new_clip`` (both required). The
    second clip is scaled to the first clip's width/height before the two are
    concatenated. Only the video stream is kept (``a=0``), so the output has
    no audio track. All temporary files are removed after the response has
    been produced or when an error occurs.

    Returns:
        The merged MP4 as an attachment, a 400 JSON error when a file is
        missing, or a 500 JSON error when processing fails.
    """
    if 'base_video' not in request.files or 'new_clip' not in request.files:
        return jsonify({"error": "هر دو فایل ویدیویی برای میکس لازم هستند."}), 400
    base_video_file = request.files['base_video']
    new_clip_file = request.files['new_clip']
    temp_files_to_clean = []

    def _remove_temp_files():
        """Best-effort removal of every temp file; one failure does not stop the rest."""
        for f_path in temp_files_to_clean:
            try:
                if os.path.exists(f_path):
                    os.remove(f_path)
            except OSError:
                pass

    try:
        base_video_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}.mp4")
        new_clip_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}.mp4")
        output_path = os.path.join(TEMP_DIR, f"{uuid.uuid4().hex}.mp4")
        temp_files_to_clean.extend([base_video_path, new_clip_path, output_path])
        base_video_file.save(base_video_path)
        new_clip_file.save(new_clip_path)
        probe = ffmpeg.probe(base_video_path)
        video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
        if video_stream is None:
            # Make the failure explicit; it is reported through the generic 500 handler below.
            raise ValueError("base video contains no video stream")
        width = int(video_stream['width'])
        height = int(video_stream['height'])
        input1 = ffmpeg.input(base_video_path)
        # Match the base clip's frame size and force square pixels so concat accepts both inputs.
        input2 = ffmpeg.input(new_clip_path).filter('scale', width, height).filter('setsar', '1')
        merged_video = ffmpeg.concat(input1, input2, v=1, a=0).output(
            output_path,
            crf=18,
            preset='slow',
            pix_fmt='yuv420p'
        )
        merged_video.run(overwrite_output=True, quiet=True)

        @after_this_request
        def cleanup(response):
            _remove_temp_files()
            return response

        return send_file(output_path, mimetype='video/mp4', as_attachment=True, download_name='merged_video.mp4')
    except ffmpeg.Error:
        _remove_temp_files()
        return jsonify({"error": "خطا در پردازش و میکس ویدیو در سرور."}), 500
    except Exception:
        _remove_temp_files()
        return jsonify({"error": "خطای پیش‌بینی نشده در سرور."}), 500
# --- مدیریت اعتبارها ---
@app.route('/api/check-credit', methods=['POST'])
def check_credit():
    """Report how many credits the caller has left in the current one-week window.

    A user without a record has the full ``USAGE_LIMIT``. A record older than
    ``ONE_WEEK_SECONDS`` is reset (count 0, window restarted now).

    Returns:
        JSON with ``credits_remaining``, ``limit_reached`` and
        ``reset_timestamp`` (0 unless the limit is reached).
    """
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(payload)
    db = get_db_connection()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    record = cursor.fetchone()
    current_time = time.time()
    summary = {
        "credits_remaining": USAGE_LIMIT,
        "limit_reached": False,
        "reset_timestamp": 0
    }
    if record:
        window_expired = record['week_start'] < (current_time - ONE_WEEK_SECONDS)
        if window_expired:
            # Start a fresh window; the full allowance in `summary` already applies.
            cursor.execute("UPDATE users SET count = 0, week_start = ? WHERE id = ?", (current_time, user_id))
            db.commit()
        else:
            summary["credits_remaining"] = max(0, USAGE_LIMIT - record['count'])
            if summary["credits_remaining"] == 0:
                summary["limit_reached"] = True
                summary["reset_timestamp"] = record['week_start'] + ONE_WEEK_SECONDS
    db.close()
    return jsonify(summary)
@app.route('/api/use-credit', methods=['POST'])
def use_credit():
    """Consume one credit for the caller within the one-week window.

    New users get a record with count 1; an expired window restarts with
    count 1; otherwise the count is incremented unless ``USAGE_LIMIT`` has
    been reached, in which case a 429 with the reset time is returned.
    """
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(payload)
    db = get_db_connection()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    record = cursor.fetchone()
    current_time = time.time()
    if not record:
        cursor.execute("INSERT INTO users (id, count, week_start) VALUES (?, 1, ?)", (user_id, current_time))
    elif record['week_start'] < (current_time - ONE_WEEK_SECONDS):
        cursor.execute("UPDATE users SET count = 1, week_start = ? WHERE id = ?", (current_time, user_id))
    elif record['count'] >= USAGE_LIMIT:
        db.close()
        return jsonify({
            "status": "limit_reached",
            "credits_remaining": 0,
            "reset_timestamp": record['week_start'] + ONE_WEEK_SECONDS
        }), 429
    else:
        cursor.execute("UPDATE users SET count = count + 1 WHERE id = ?", (user_id,))
    db.commit()
    # Re-read the stored counter so the reply reflects what is in the table.
    cursor.execute("SELECT count FROM users WHERE id = ?", (user_id,))
    used = cursor.fetchone()['count']
    db.close()
    return jsonify({"status": "success", "credits_remaining": USAGE_LIMIT - used})
@app.route('/api/check-image-credit', methods=['POST'])
def check_image_credit():
    """Report how many image credits the caller has left in the current one-day window.

    A user without a record has the full ``IMAGE_USAGE_LIMIT``. A record older
    than ``ONE_DAY_SECONDS`` is reset (count 0, window restarted now). The
    window start is stored in the column named ``week_start``.

    Returns:
        JSON with ``credits_remaining``, ``limit_reached`` and
        ``reset_timestamp`` (0 unless the limit is reached).
    """
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(payload)
    db = get_image_db_connection()
    cursor = db.cursor()
    cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    record = cursor.fetchone()
    current_time = time.time()
    summary = {
        "credits_remaining": IMAGE_USAGE_LIMIT,
        "limit_reached": False,
        "reset_timestamp": 0
    }
    if record:
        window_expired = record['week_start'] < (current_time - ONE_DAY_SECONDS)
        if window_expired:
            # Start a fresh window; the full allowance in `summary` already applies.
            cursor.execute("UPDATE users SET count = 0, week_start = ? WHERE id = ?", (current_time, user_id))
            db.commit()
        else:
            summary["credits_remaining"] = max(0, IMAGE_USAGE_LIMIT - record['count'])
            if summary["credits_remaining"] == 0:
                summary["limit_reached"] = True
                summary["reset_timestamp"] = record['week_start'] + ONE_DAY_SECONDS
    db.close()
    return jsonify(summary)
@app.route('/api/use-image-credit', methods=['POST'])
def use_image_credit():
    """Consume one image credit for the requesting user.

    The user is resolved from the JSON body via ``get_user_identifier``.
    Three cases are handled:

    * no row yet: a row is inserted with ``count = 1``;
    * the stored window start (column ``week_start``) is older than
      ``ONE_DAY_SECONDS``: the window restarts now with ``count = 1``;
    * otherwise the counter is incremented, but only while it is below
      ``IMAGE_USAGE_LIMIT``.

    Returns:
        ``{"status": "success", "credits_remaining": ...}`` on success,
        ``{"status": "limit_reached", ...}`` with status 429 when no credit
        is left, or a JSON error with status 400 when the body is missing or
        empty.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(data)
    now = time.time()

    conn = get_image_db_connection()
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        user_record = c.fetchone()
        if not user_record:
            c.execute("INSERT INTO users (id, count, week_start) VALUES (?, 1, ?)", (user_id, now))
        elif user_record['week_start'] < (now - ONE_DAY_SECONDS):
            c.execute("UPDATE users SET count = 1, week_start = ? WHERE id = ?", (now, user_id))
        else:
            # The limit test and the increment happen in one statement, so
            # concurrent requests that read the same count cannot both push
            # the counter past the limit.
            c.execute(
                "UPDATE users SET count = count + 1 WHERE id = ? AND count < ?",
                (user_id, IMAGE_USAGE_LIMIT),
            )
            if c.rowcount == 0:
                reset_timestamp = user_record['week_start'] + ONE_DAY_SECONDS
                return jsonify({"status": "limit_reached", "credits_remaining": 0, "reset_timestamp": reset_timestamp}), 429
        conn.commit()
        c.execute("SELECT count FROM users WHERE id = ?", (user_id,))
        new_count = c.fetchone()['count']
    finally:
        # Always release the connection, including on the 429 path and when
        # a query raises.
        conn.close()

    credits_remaining = max(0, IMAGE_USAGE_LIMIT - new_count)
    return jsonify({"status": "success", "credits_remaining": credits_remaining})
@app.route('/api/check-edit-credit', methods=['POST'])
def check_edit_credit():
    """Report how many image-edit credits the requesting user still has.

    The user is resolved from the JSON body via ``get_user_identifier`` and
    looked up in the image-edit credit database. A user without a row is
    reported as having the full ``EDIT_USAGE_LIMIT``. When the stored window
    start (column ``week_start``) is older than ``ONE_WEEK_SECONDS``, the
    counter is reset to 0 and the window restarts at the current time.

    Returns:
        A JSON object with ``credits_remaining``, ``limit_reached`` and
        ``reset_timestamp`` (0 unless the limit is currently reached), or a
        JSON error with status 400 when the body is missing or empty.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(data)
    now = time.time()

    # Defaults describe a user who has not consumed anything yet.
    credits_remaining = EDIT_USAGE_LIMIT
    limit_reached = False
    reset_timestamp = 0

    conn = get_image_edit_db_connection()
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        user_record = c.fetchone()
        if user_record:
            if user_record['week_start'] < (now - ONE_WEEK_SECONDS):
                # Window expired: start a fresh one with a zeroed counter.
                c.execute("UPDATE users SET count = 0, week_start = ? WHERE id = ?", (now, user_id))
                conn.commit()
            else:
                credits_remaining = max(0, EDIT_USAGE_LIMIT - user_record['count'])
                if credits_remaining == 0:
                    limit_reached = True
                    reset_timestamp = ONE_WEEK_SECONDS + user_record['week_start']
    finally:
        # Always release the connection, even when a query raises.
        conn.close()

    return jsonify({
        "credits_remaining": credits_remaining,
        "limit_reached": limit_reached,
        "reset_timestamp": reset_timestamp
    })
@app.route('/api/use-edit-credit', methods=['POST'])
def use_edit_credit():
    """Consume one image-edit credit for the requesting user.

    The user is resolved from the JSON body via ``get_user_identifier``.
    Three cases are handled:

    * no row yet: a row is inserted with ``count = 1``;
    * the stored window start (column ``week_start``) is older than
      ``ONE_WEEK_SECONDS``: the window restarts now with ``count = 1``;
    * otherwise the counter is incremented, but only while it is below
      ``EDIT_USAGE_LIMIT``.

    Returns:
        ``{"status": "success", "credits_remaining": ...}`` on success,
        ``{"status": "limit_reached", ...}`` with status 429 when no credit
        is left, or a JSON error with status 400 when the body is missing or
        empty.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid request"}), 400
    user_id = get_user_identifier(data)
    now = time.time()

    conn = get_image_edit_db_connection()
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        user_record = c.fetchone()
        if not user_record:
            c.execute("INSERT INTO users (id, count, week_start) VALUES (?, 1, ?)", (user_id, now))
        elif user_record['week_start'] < (now - ONE_WEEK_SECONDS):
            c.execute("UPDATE users SET count = 1, week_start = ? WHERE id = ?", (now, user_id))
        else:
            # The limit test and the increment happen in one statement, so
            # concurrent requests that read the same count cannot both push
            # the counter past the limit.
            c.execute(
                "UPDATE users SET count = count + 1 WHERE id = ? AND count < ?",
                (user_id, EDIT_USAGE_LIMIT),
            )
            if c.rowcount == 0:
                reset_timestamp = user_record['week_start'] + ONE_WEEK_SECONDS
                return jsonify({"status": "limit_reached", "credits_remaining": 0, "reset_timestamp": reset_timestamp}), 429
        conn.commit()
        c.execute("SELECT count FROM users WHERE id = ?", (user_id,))
        new_count = c.fetchone()['count']
    finally:
        # Always release the connection, including on the 429 path and when
        # a query raises.
        conn.close()

    credits_remaining = max(0, EDIT_USAGE_LIMIT - new_count)
    return jsonify({"status": "success", "credits_remaining": credits_remaining})
# --- دریافت و ارسال فایل ---
@app.route('/api/webhook/upload', methods=['POST'])
def webhook_upload():
    """Receive a result file posted by a worker and store it on disk.

    Form fields:
        run_id: identifier used as the stored file's base name (required).
        ext: file extension without a dot (defaults to ``webp``).
        github_run_id: optional; when present, ``delete_github_run`` is
            started for it in a background thread after the file is saved.
        file: the uploaded payload (multipart file part, required).

    Returns:
        ``("OK", 200)`` once the file is saved, or ``("Invalid request", 400)``
        when a required part is missing or ``run_id``/``ext`` contain
        characters that are not safe to embed in a file name.
    """
    # Local import so the block does not rely on a module-level `re` import.
    import re

    run_id = request.form.get('run_id')
    gh_run_id = request.form.get('github_run_id')
    ext = request.form.get('ext', 'webp')
    if 'file' not in request.files or not run_id:
        print("⚠️ [وب‌هوک آپلود] آپلود ناموفق به دلیل کسری پارامتر.", flush=True)
        return "Invalid request", 400

    # Both values are client-supplied and end up in the destination path;
    # a strict whitelist keeps separators and ".." out so the file cannot be
    # written outside static/images.
    # NOTE(review): assumes run ids only use letters, digits, "_" and "-"
    # (e.g. UUIDs) -- confirm against whatever generates them.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", run_id) or not re.fullmatch(r"[A-Za-z0-9]+", ext):
        print("⚠️ [webhook upload] rejected: run_id/ext contain unsafe characters.", flush=True)
        return "Invalid request", 400

    upload = request.files['file']
    file_path = f"static/images/{run_id}.{ext}"
    upload.save(file_path)
    print(f"📥 [فایل دریافت شد] ذخیره‌سازی موفق خروجی در مسیر: {file_path}", flush=True)

    if gh_run_id:
        # Clean up the finished GitHub run without delaying the response.
        print(f"🧹 [پاک‌سازی خودکار] ثبت فرآیند حذف اجرای {gh_run_id} در پس‌زمینه...", flush=True)
        threading.Thread(target=delete_github_run, args=(gh_run_id,)).start()
    return "OK", 200
@app.route('/api/status/<run_id>')
def check_status(run_id):
    """Return the current state of a run as JSON.

    Possible answers:
        * ``failed`` with the text of ``static/images/<run_id>.txt`` when
          that error report exists and can be read;
        * ``ready`` with the URL of the first output file found among the
          known extensions (plus ``seed`` when a non-empty
          ``<run_id>_seed.txt`` is readable);
        * ``processing`` otherwise. In that case a background status check
          is started, at most once per 12 seconds per run.
    """
    # A final error report takes precedence over everything else.
    failure_report = f"static/images/{run_id}.txt"
    if os.path.exists(failure_report):
        try:
            with open(failure_report, "r", encoding="utf-8") as report:
                failure_text = report.read().strip()
            return jsonify({"status": "failed", "message": failure_text})
        except Exception:
            # Best effort: an unreadable report falls through to the
            # output-file lookup below.
            pass

    known_extensions = (
        'png', 'jpg', 'jpeg', 'webp', 'gif',
        'mp4', 'webm', 'mov', 'mkv', 'avi',
        'mp3', 'wav', 'ogg', 'm4a',
    )
    for ext in known_extensions:
        if not os.path.exists(f"static/images/{run_id}.{ext}"):
            continue

        seed = None
        seed_file = f"static/images/{run_id}_seed.txt"
        if os.path.exists(seed_file):
            try:
                with open(seed_file, 'r') as handle:
                    seed = handle.read().strip()
            except Exception:
                # Best effort: the seed is optional extra information.
                pass

        payload = {"status": "ready", "url": f"/static/images/{run_id}.{ext}"}
        if seed:
            payload["seed"] = seed
        return jsonify(payload)

    # No output yet: monitor the run on GitHub, throttled per run id.
    current_time = time.time()
    if current_time - LAST_RUN_CHECK.get(run_id, 0) > 12:
        LAST_RUN_CHECK[run_id] = current_time
        threading.Thread(target=check_github_run_status_and_log, args=(run_id,)).start()
    return jsonify({"status": "processing"})
@app.route('/static/images/<filename>')
def serve_file(filename):
    """Serve the requested file from the ``static/images`` directory."""
    images_dir = 'static/images'
    return send_from_directory(images_dir, filename)
if __name__ == '__main__':
    # Script entry point: listen on every interface, port 7860.
    app.run(port=7860, host='0.0.0.0')