Spaces:
Running
Running
import os | |
import base64 | |
import hashlib | |
import hmac | |
from fastapi import FastAPI, Query | |
from fastapi.responses import JSONResponse | |
from Crypto.Cipher import AES | |
import requests | |
from tempfile import NamedTemporaryFile | |
app = FastAPI() | |
appInfo = { | |
"image": b"WhatsApp Image Keys", | |
"video": b"WhatsApp Video Keys", | |
"audio": b"WhatsApp Audio Keys", | |
"document": b"WhatsApp Document Keys", | |
"image/webp": b"WhatsApp Image Keys", | |
"image/jpeg": b"WhatsApp Image Keys", | |
"image/png": b"WhatsApp Image Keys", | |
"video/mp4": b"WhatsApp Video Keys", | |
"audio/aac": b"WhatsApp Audio Keys", | |
"audio/ogg": b"WhatsApp Audio Keys", | |
"audio/wav": b"WhatsApp Audio Keys", | |
} | |
extension = { | |
"image": "jpg", | |
"video": "mp4", | |
"audio": "ogg", | |
"document": "bin", | |
} | |
# HKDF Key Derivation Function | |
def HKDF(key, length, appInfo=b""): | |
key = hmac.new(b"\0"*32, key, hashlib.sha256).digest() | |
keyStream = b"" | |
keyBlock = b"" | |
blockIndex = 1 | |
while len(keyStream) < length: | |
keyBlock = hmac.new( | |
key, | |
msg=keyBlock+appInfo + (chr(blockIndex).encode("utf-8")), | |
digestmod=hashlib.sha256).digest() | |
blockIndex += 1 | |
keyStream += keyBlock | |
return keyStream[:length] | |
# AES Decryption Function | |
def AESUnpad(s): | |
return s[:-ord(s[len(s)-1:])] | |
def AESDecrypt(key, ciphertext, iv): | |
cipher = AES.new(key, AES.MODE_CBC, iv) | |
plaintext = cipher.decrypt(ciphertext) | |
return AESUnpad(plaintext) | |
# Decrypt and Upload Endpoint | |
async def decrypt_and_upload( | |
url: str = Query(..., description="URL of the encrypted file"), | |
key: str = Query(..., description="Base64-encoded decryption key"), | |
media_type: str = Query("image", description="Media type (image, video, audio, document)") | |
): | |
try: | |
# Download file data from the URL | |
response = requests.get(url, stream=True) | |
if response.status_code != 200: | |
return JSONResponse(content={"error": "Failed to download the file."}, status_code=400) | |
mediaData = response.content | |
mediaKey = base64.b64decode(key) | |
# Derive decryption keys | |
mediaKeyExpanded = HKDF(mediaKey, 112, appInfo[media_type]) | |
decrypted_data = AESDecrypt( | |
mediaKeyExpanded[16:48], mediaData[:-10], mediaKeyExpanded[:16] | |
) | |
# Save decrypted data to a temporary file | |
with NamedTemporaryFile(delete=False, suffix=f".{extension.get(media_type, 'bin')}") as temp_file: | |
temp_file.write(decrypted_data) | |
temp_file_path = temp_file.name | |
# Upload decrypted data to Uguu.se | |
with open(temp_file_path, 'rb') as file: | |
upload_response = requests.post("https://uguu.se/upload", files={'files[]': file}) | |
# Clean up temporary file | |
os.remove(temp_file_path) | |
if upload_response.status_code != 200: | |
return JSONResponse(content={"error": "Failed to upload the file to Uguu.se."}, status_code=500) | |
# Extract direct link | |
upload_result = upload_response.json() | |
direct_link = upload_result.get("files", [{}])[0].get("url") | |
if not direct_link: | |
return JSONResponse(content={"error": "Direct link not found in Uguu.se response."}, status_code=500) | |
# Return direct download link and response | |
return JSONResponse(content={"direct_file_url": direct_link, "full_response": upload_result}) | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# Root endpoint | |
def root(): | |
return "Welcome to the WhatsApp Media Decryption API with Uguu.se Upload" | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run("main:app", host="0.0.0.0", port=7861) | |