videoupload / main.py
Akane710's picture
Update main.py
3371607 verified
raw
history blame
4.85 kB
import asyncio
import os
from enkacard import encbanner
import concurrent.futures
import requests
from fastapi import FastAPI
from io import BytesIO
from fastapi.responses import JSONResponse
import enkacard
import starrailcard
import enkanetwork
import uvicorn
import cloudinary
import cloudinary.uploader
from cloudinary.utils import cloudinary_url
app = FastAPI()
# Cloudinary configuration
cloudinary.config(
cloud_name="dob6buqxd",
api_key="397945971979515",
api_secret="nRMIOSr10xiLo443LLMNDxJBB-Q",
secure=True
)
# Genshin Impact card creation
async def genshin_card(id, designtype):
async with encbanner.ENC(uid=str(id)) as encard:
return await encard.creat(akasha=True, template=(2 if str(designtype) == "2" else 1))
# Star Rail card creation
async def starrail_card(id, designtype):
async with starrailcard.Card(seeleland=True, remove_logo=True) as card:
return await card.create(id, style=(2 if str(designtype) == "2" else 1))
# Star Rail profile creation
async def starrail_profile(id):
async with starrailcard.Card(remove_logo=True, seeleland=True) as card:
return await card.create_profile(id, style=2)
# Route for Genshin Impact
@app.get("/genshin/{id}")
async def genshin_characters(id: int, design: str = "1"):
try:
result = await genshin_card(id, design)
characters = process_images(result, id)
return JSONResponse(content={'response': characters})
except enkanetwork.exception.VaildateUIDError:
return JSONResponse(content={'error': 'Invalid UID. Please check your UID.'}, status_code=400)
except enkacard.enc_error.ENCardError:
return JSONResponse(content={'error': 'Enable display of the showcase in the game or add characters there.'}, status_code=400)
except Exception as e:
return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# Route for Star Rail
@app.get("/starrail/{id}")
async def starrail_characters(id: int, design: str = "1"):
try:
result = await starrail_card(id, design)
characters = process_images(result, id)
return JSONResponse(content={'response': characters})
except Exception as e:
return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
# Route for Star Rail profile
@app.get("/starrail/profile/{id}")
async def starrail_profile_route(id: int):
try:
result = await starrail_profile(id)
profile_data = process_profile(result, id)
return JSONResponse(content={'response': profile_data})
except Exception as e:
return JSONResponse(content={'error': 'UNKNOWN ERR: ' + str(e)}, status_code=500)
@app.get("/")
def root():
return "Welcome to the API!"
# Helper function to upload the image to Cloudinary
def upload_image(data, character_id, player_id):
try:
# Set the public_id to include the character ID and the given player ID
public_id = f"{character_id}_{player_id}"
# Upload image to Cloudinary with the specified public_id
upload_result = cloudinary.uploader.upload(data, folder="card_images", public_id=public_id,invalidate = True)
# Get the secure URL of the uploaded image
return upload_result["secure_url"]
except Exception as e:
raise Exception(f"Cloudinary upload error: {str(e)}")
# Process individual image card
def process_image(dt, player_id):
with BytesIO() as byte_io:
dt.card.save(byte_io, "PNG")
byte_io.seek(0)
# Upload the image using the character's ID and player ID
image_url = upload_image(byte_io, dt.id, player_id)
return {
"name": dt.name,
"id": dt.id,
"url": image_url
}
# Process the profile image
def process_profile(profile_card, player_id):
with BytesIO() as byte_io:
profile_card.card.save(byte_io, "PNG")
byte_io.seek(0)
# Use the character ID and player ID for the profile card
image_url = upload_image(byte_io, profile_card.character_id, player_id)
return {
"name": profile_card.character_name,
"url": image_url,
"character_ids": profile_card.character_id
}
# Process all the images returned
def process_images(result, player_id):
characters = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(process_image, dt, player_id) for dt in result.card]
for future in concurrent.futures.as_completed(futures):
try:
characters.append(future.result())
except Exception as e:
print(f"Error processing image: {e}")
return characters
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=7860, workers=8, timeout_keep_alive=60000)