Spaces:
Running
Running
File size: 3,794 Bytes
3371607 ce6ec74 b8c7707 ce6ec74 58bb8b6 697c68e ce6ec74 145bc51 ce6ec74 58bb8b6 ce6ec74 b8c7707 d2434e0 b8c7707 145bc51 ce6ec74 145bc51 ce6ec74 3371607 b8c7707 3371607 697c68e ce6ec74 77c2a8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
import os
import base64
import hashlib
import hmac
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
from Crypto.Cipher import AES
import requests
from tempfile import NamedTemporaryFile
# FastAPI application instance; the @app.get(...) decorators in this module
# register their handlers on it.
app = FastAPI()
# Byte labels passed to HKDF as its info argument, one per media family.
_IMAGE_KEYS = b"WhatsApp Image Keys"
_VIDEO_KEYS = b"WhatsApp Video Keys"
_AUDIO_KEYS = b"WhatsApp Audio Keys"
_DOCUMENT_KEYS = b"WhatsApp Document Keys"

# Lookup from a media category name or a MIME type to its HKDF info label.
appInfo = {
    # Generic category names
    "image": _IMAGE_KEYS,
    "video": _VIDEO_KEYS,
    "audio": _AUDIO_KEYS,
    "document": _DOCUMENT_KEYS,
    # Specific MIME types, mapped to the label of their family
    "image/webp": _IMAGE_KEYS,
    "image/jpeg": _IMAGE_KEYS,
    "image/png": _IMAGE_KEYS,
    "video/mp4": _VIDEO_KEYS,
    "audio/aac": _AUDIO_KEYS,
    "audio/ogg": _AUDIO_KEYS,
    "audio/wav": _AUDIO_KEYS,
}
# File-name extension (without the dot) used for each media category.
extension = dict(
    image="jpg",
    video="mp4",
    audio="ogg",
    document="bin",
)
# HKDF Key Derivation Function
def HKDF(key, length, appInfo=b""):
    """Derive ``length`` bytes of key material with HKDF-SHA256 (RFC 5869).

    Extract step: HMAC-SHA256 of ``key`` under a fixed salt of 32 zero bytes.
    Expand step: chained HMAC-SHA256 blocks over the previous block,
    ``appInfo`` and a single-byte block counter starting at 1.

    Args:
        key: Input keying material (bytes).
        length: Number of output bytes wanted.
        appInfo: Context ("info") bytes mixed into every expand block.

    Returns:
        The first ``length`` bytes of the expanded key stream.

    Raises:
        ValueError: If ``length`` exceeds 255 hash blocks, the most a
            one-byte counter can address.
    """
    hash_len = hashlib.sha256().digest_size
    max_length = 255 * hash_len
    if length > max_length:
        raise ValueError(
            f"HKDF-SHA256 can produce at most {max_length} bytes, got {length}"
        )

    pseudo_random_key = hmac.new(b"\0" * 32, key, hashlib.sha256).digest()

    blocks = []
    produced = 0
    previous = b""
    counter = 1
    while produced < length:
        # The counter must be exactly one raw byte. Encoding chr(counter) as
        # UTF-8 yields two bytes from 128 upwards and corrupts long outputs.
        previous = hmac.new(
            pseudo_random_key,
            msg=previous + appInfo + bytes([counter]),
            digestmod=hashlib.sha256,
        ).digest()
        blocks.append(previous)
        produced += len(previous)
        counter += 1
    return b"".join(blocks)[:length]
# AES Decryption Function
def AESUnpad(s):
    """Strip trailing padding whose length is stored in the final byte.

    This is the PKCS#7 layout: the last byte says how many padding bytes
    (itself included) were appended.

    Args:
        s: Padded plaintext.

    Returns:
        ``s`` without its padding. Empty input is returned unchanged.

    Raises:
        ValueError: If the final byte is 0 or larger than ``len(s)``.
            Neither can come from valid padding, and slicing with such a
            value would silently return empty data.
    """
    if not s:
        return s
    pad_len = ord(s[-1:])
    if pad_len < 1 or pad_len > len(s):
        raise ValueError(f"Invalid padding length: {pad_len}")
    return s[:-pad_len]
def AESDecrypt(key, ciphertext, iv):
    """Decrypt ``ciphertext`` with AES in CBC mode and remove its padding.

    Args:
        key: AES key bytes.
        ciphertext: Encrypted bytes to decrypt.
        iv: Initialisation vector for CBC mode.

    Returns:
        The decrypted bytes after ``AESUnpad`` has stripped the padding.
    """
    padded = AES.new(key, AES.MODE_CBC, iv).decrypt(ciphertext)
    return AESUnpad(padded)
# Decrypt and Upload Endpoint
# Seconds to wait on each outbound HTTP request before giving up.
_HTTP_TIMEOUT = 60


def _decrypt_and_upload_blocking(url, media_key, media_type):
    """Download, decrypt and re-upload one media file using blocking I/O."""
    # NOTE(security): `url` is caller-supplied and fetched from this server,
    # so the endpoint can be pointed at internal addresses (SSRF). Restrict
    # the allowed hosts before exposing this publicly.
    response = requests.get(url, timeout=_HTTP_TIMEOUT)
    if response.status_code != 200:
        return JSONResponse(content={"error": "Failed to download the file."}, status_code=400)
    mediaData = response.content

    # The trailing 10 bytes are not part of the ciphertext. What remains must
    # be a whole number of AES blocks, or CBC decryption cannot succeed.
    ciphertext = mediaData[:-10]
    if not ciphertext or len(ciphertext) % AES.block_size:
        return JSONResponse(
            content={"error": "Downloaded data is not a valid encrypted media file."},
            status_code=400,
        )

    # Derive decryption keys: bytes 0-15 are the IV, bytes 16-47 the AES key.
    mediaKeyExpanded = HKDF(media_key, 112, appInfo[media_type])
    decrypted_data = AESDecrypt(
        mediaKeyExpanded[16:48], ciphertext, mediaKeyExpanded[:16]
    )

    # Save decrypted data to a temporary file and upload it to Uguu.se. The
    # file is removed in `finally` so a failed write or upload cannot leak it.
    temp_file = NamedTemporaryFile(delete=False, suffix=f".{extension.get(media_type, 'bin')}")
    temp_file_path = temp_file.name
    try:
        with temp_file:
            temp_file.write(decrypted_data)
        with open(temp_file_path, 'rb') as upload_fh:
            upload_response = requests.post(
                "https://uguu.se/upload",
                files={'files[]': upload_fh},
                timeout=_HTTP_TIMEOUT,
            )
    finally:
        os.remove(temp_file_path)

    if upload_response.status_code != 200:
        return JSONResponse(content={"error": "Failed to upload the file to Uguu.se."}, status_code=500)

    # Extract direct link; a missing or empty "files" list counts as no link.
    upload_result = upload_response.json()
    uploaded = upload_result.get("files") or [{}]
    direct_link = uploaded[0].get("url")
    if not direct_link:
        return JSONResponse(content={"error": "Direct link not found in Uguu.se response."}, status_code=500)

    # Return direct download link and response
    return JSONResponse(content={"direct_file_url": direct_link, "full_response": upload_result})


@app.get("/decrypt_and_upload")
async def decrypt_and_upload(
    url: str = Query(..., description="URL of the encrypted file"),
    key: str = Query(..., description="Base64-encoded decryption key"),
    media_type: str = Query("image", description="Media type (image, video, audio, document)")
):
    """Download an encrypted media file, decrypt it and upload it to Uguu.se.

    Args:
        url: Location of the encrypted file.
        key: Base64-encoded media key from which the AES key and IV are derived.
        media_type: Key of ``appInfo`` selecting the HKDF info label.

    Returns:
        A JSONResponse. On success it holds ``direct_file_url`` and
        ``full_response``. On failure it holds ``error`` with status 400
        for bad input or a failed download, and 500 for anything else.
    """
    # Imported here so this block brings its own dependency into scope.
    from fastapi.concurrency import run_in_threadpool

    if media_type not in appInfo:
        return JSONResponse(content={"error": f"Unsupported media type: {media_type}"}, status_code=400)

    try:
        # A '+' sent unescaped in a query string arrives as a space. Space is
        # never valid Base64, so restoring '+' is safe.
        mediaKey = base64.b64decode(key.replace(" ", "+"))
    except ValueError:
        return JSONResponse(content={"error": "Key is not valid Base64."}, status_code=400)

    try:
        # `requests` is blocking; run it in the threadpool so the event loop
        # stays free to serve other requests.
        return await run_in_threadpool(
            _decrypt_and_upload_blocking, url, mediaKey, media_type
        )
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
# Root endpoint
@app.get("/")
def root():
    """Return the welcome message served at the API root."""
    greeting = "Welcome to the WhatsApp Media Decryption API with Uguu.se Upload"
    return greeting
if __name__ == "__main__":
    # Start the server only when this file is executed as a script.
    # NOTE(review): the "main:app" import string assumes this file is
    # importable as module `main`. Confirm against the actual file name.
    from uvicorn import run as serve

    serve("main:app", host="0.0.0.0", port=7861)
|