# videoupload / main.py
# tachibanaa710's picture
# Update main.py
# b8c7707 verified
import os
import base64
import hashlib
import hmac
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
from Crypto.Cipher import AES
import requests
from tempfile import NamedTemporaryFile
# FastAPI application instance: the @app.get routes below register their
# handlers on it, and the __main__ guard serves it through uvicorn as "main:app".
app = FastAPI()
# Labels handed to HKDF as its ``appInfo`` argument, looked up by the
# ``media_type`` query value (either a media category or a MIME type).
_IMAGE_KEYS = b"WhatsApp Image Keys"
_VIDEO_KEYS = b"WhatsApp Video Keys"
_AUDIO_KEYS = b"WhatsApp Audio Keys"
_DOCUMENT_KEYS = b"WhatsApp Document Keys"

appInfo = {
    # Media categories
    "image": _IMAGE_KEYS,
    "video": _VIDEO_KEYS,
    "audio": _AUDIO_KEYS,
    "document": _DOCUMENT_KEYS,
    # MIME types
    "image/webp": _IMAGE_KEYS,
    "image/jpeg": _IMAGE_KEYS,
    "image/png": _IMAGE_KEYS,
    "video/mp4": _VIDEO_KEYS,
    "audio/aac": _AUDIO_KEYS,
    "audio/ogg": _AUDIO_KEYS,
    "audio/wav": _AUDIO_KEYS,
}
# File-name suffix (without the dot) used for the decrypted temporary file,
# looked up by the ``media_type`` query value. Unknown types fall back to
# "bin" at the call site.
extension = {
    # Media categories
    "image": "jpg",
    "video": "mp4",
    "audio": "ogg",
    "document": "bin",
    # MIME types: appInfo accepts these as media_type values too, so they
    # need a suffix here or the decrypted file is saved as ".bin".
    "image/webp": "webp",
    "image/jpeg": "jpg",
    "image/png": "png",
    "video/mp4": "mp4",
    "audio/aac": "aac",
    "audio/ogg": "ogg",
    "audio/wav": "wav",
}
# HKDF Key Derivation Function
def HKDF(key, length, appInfo=b""):
    """Derive ``length`` bytes of key material from ``key`` with HKDF-SHA256.

    Implements the extract-then-expand construction of RFC 5869 using an
    all-zero salt of one hash length.

    Args:
        key: Input keying material, as bytes.
        length: Number of output bytes wanted; at most 255 * 32.
        appInfo: Context bytes mixed into every expansion block.

    Returns:
        Exactly ``length`` bytes of derived key material (``b""`` when
        ``length`` is zero or negative).

    Raises:
        ValueError: If ``length`` exceeds 255 * 32 bytes, the most the
            single-byte block counter can address.
    """
    hash_len = hashlib.sha256().digest_size
    max_length = 255 * hash_len
    if length > max_length:
        raise ValueError(
            f"HKDF-SHA256 can produce at most {max_length} bytes, got {length}"
        )

    # Extract: condense the input key into a fixed-size pseudorandom key.
    prk = hmac.new(b"\0" * hash_len, key, hashlib.sha256).digest()

    # Expand: chain HMAC blocks until enough output has been produced.
    blocks = []
    keyBlock = b""
    blockIndex = 1
    while len(blocks) * hash_len < length:
        # The counter must be one raw byte; encoding chr(i) as UTF-8 would
        # emit two bytes for every counter value from 128 upwards.
        keyBlock = hmac.new(
            prk,
            msg=keyBlock + appInfo + bytes([blockIndex]),
            digestmod=hashlib.sha256,
        ).digest()
        blocks.append(keyBlock)
        blockIndex += 1
    return b"".join(blocks)[:length]
# AES Decryption Function
def AESUnpad(s, block_size=16):
    """Remove PKCS#7 padding from the decrypted bytes ``s``.

    The last byte gives the number of padding bytes, and every padding byte
    must carry that same value.

    Args:
        s: Decrypted, still padded data (bytes or bytearray).
        block_size: Cipher block size in bytes; the largest valid pad length.

    Returns:
        ``s`` without its trailing padding bytes.

    Raises:
        ValueError: If ``s`` is empty or its padding is malformed. This is
            what a wrong key or corrupted input typically looks like.
    """
    if not s:
        raise ValueError("Cannot unpad empty data.")
    pad_len = s[-1]
    # A pad length of 0 would make s[:-0] return an empty result, and one
    # larger than the data or the block size cannot be genuine padding.
    if pad_len < 1 or pad_len > block_size or pad_len > len(s):
        raise ValueError("Invalid padding length.")
    if s[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid padding bytes.")
    return s[:-pad_len]
def AESDecrypt(key, ciphertext, iv):
    """Decrypt ``ciphertext`` with AES in CBC mode and strip its padding.

    Args:
        key: AES key bytes.
        ciphertext: Encrypted bytes to decrypt.
        iv: Initialisation vector for CBC mode.

    Returns:
        The decrypted bytes after AESUnpad has removed the trailing padding.
    """
    padded = AES.new(key, AES.MODE_CBC, iv).decrypt(ciphertext)
    return AESUnpad(padded)
# Decrypt and Upload Endpoint
# (connect, read) timeouts in seconds, so a stalled remote host cannot hold a
# worker forever.
_HTTP_TIMEOUT = (10, 120)


def _error_response(message, status_code):
    """Build the ``{"error": ...}`` JSON payload used by every failure path."""
    return JSONResponse(content={"error": message}, status_code=status_code)


def _decrypt_and_upload_blocking(url, key, media_type):
    """Download, decrypt and re-upload one media file using blocking I/O.

    Returns the JSONResponse for the request. Unexpected exceptions propagate
    to the caller.
    """
    # Validate the cheap inputs before spending time on a download.
    if media_type not in appInfo:
        return _error_response(f"Unsupported media type: {media_type}", 400)
    try:
        # An unescaped "+" in a query string is decoded as a space, and
        # Base64 never contains spaces, so restoring it is always safe.
        mediaKey = base64.b64decode(key.replace(" ", "+"))
    except ValueError:  # binascii.Error is a ValueError subclass
        return _error_response("Key is not valid Base64.", 400)

    # Download file data from the URL.
    # NOTE(security): ``url`` is caller-supplied and fetched from this server;
    # restrict the reachable hosts if the service can see internal networks.
    with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as response:
        if response.status_code != 200:
            return _error_response("Failed to download the file.", 400)
        mediaData = response.content

    # NOTE(review): the trailing 10 bytes are discarded without being checked;
    # presumably a truncated MAC - consider verifying it before decrypting.
    ciphertext = mediaData[:-10]
    if not ciphertext or len(ciphertext) % 16:
        return _error_response(
            "Downloaded data is not a valid encrypted media file.", 400
        )

    # Derive decryption keys: bytes 0-15 are the IV, bytes 16-47 the AES key.
    mediaKeyExpanded = HKDF(mediaKey, 112, appInfo[media_type])
    decrypted_data = AESDecrypt(
        mediaKeyExpanded[16:48], ciphertext, mediaKeyExpanded[:16]
    )

    # Save decrypted data to a temporary file and upload it to Uguu.se. The
    # finally clause removes the file even when writing or uploading fails.
    suffix = f".{extension.get(media_type, 'bin')}"
    temp_file = NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        with temp_file:
            temp_file.write(decrypted_data)
        with open(temp_file.name, 'rb') as file:
            upload_response = requests.post(
                "https://uguu.se/upload",
                files={'files[]': file},
                timeout=_HTTP_TIMEOUT,
            )
    finally:
        os.remove(temp_file.name)

    if upload_response.status_code != 200:
        return _error_response("Failed to upload the file to Uguu.se.", 500)

    # Extract direct link, tolerating a missing or empty "files" list.
    upload_result = upload_response.json()
    files = upload_result.get("files") if isinstance(upload_result, dict) else None
    first_file = files[0] if isinstance(files, list) and files else {}
    direct_link = first_file.get("url") if isinstance(first_file, dict) else None
    if not direct_link:
        return _error_response("Direct link not found in Uguu.se response.", 500)

    # Return direct download link and response
    return JSONResponse(
        content={"direct_file_url": direct_link, "full_response": upload_result}
    )


@app.get("/decrypt_and_upload")
async def decrypt_and_upload(
    url: str = Query(..., description="URL of the encrypted file"),
    key: str = Query(..., description="Base64-encoded decryption key"),
    media_type: str = Query("image", description="Media type (image, video, audio, document)")
):
    """Decrypt the encrypted media at ``url`` and upload the result to Uguu.se.

    Args:
        url: Location of the encrypted file to download.
        key: Base64-encoded media key that the decryption keys are derived from.
        media_type: Key into ``appInfo``, selecting the HKDF info label.

    Returns:
        A JSONResponse holding ``direct_file_url`` and ``full_response`` on
        success, or ``{"error": ...}`` with status 400 for bad input or a
        failed download and 500 for upload or unexpected failures.
    """
    # Imported locally so this block does not depend on the file's import
    # section being edited.
    from fastapi.concurrency import run_in_threadpool

    try:
        # The pipeline uses blocking HTTP and file I/O; running it directly in
        # this coroutine would stall the event loop for every other request.
        return await run_in_threadpool(
            _decrypt_and_upload_blocking, url, key, media_type
        )
    except Exception as e:
        # Top-level boundary: report any unexpected failure as a JSON 500.
        return JSONResponse(content={"error": str(e)}, status_code=500)
# Root endpoint
@app.get("/")
def root():
    """Return the welcome message served at the root path."""
    greeting = "Welcome to the WhatsApp Media Decryption API with Uguu.se Upload"
    return greeting
# Script entry point: serve the app with uvicorn when run directly.
if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces, port 7861.
    server_options = {"host": "0.0.0.0", "port": 7861}
    uvicorn.run("main:app", **server_options)