Posts

Showing posts with the label fast api

multiuser websocket fastapi

Image
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse app = APIRouter() html = """ <!DOCTYPE html> <html>     <head>         <title>Chat</title>     </head>     <body>         <h1>WebSocket Chat</h1>         <h2>Your ID: <span id="ws-id"></span></h2>         <form action="" onsubmit="sendMessage(event)">             <input type="text" id="messageText" autocomplete="off"/>             <button>Send</button>         </form>         <ul id='messages'>   ...

fastapi apis rate limiter

from typing import Union from fastapi import FastAPI, Request from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded limiter = Limiter(key_func=get_remote_address) app = FastAPI() app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) @app.get("/") @limiter.limit("5/minute") def read_root(request: Request):     return {"Hello": "World"}

upload files in fast api

Image
 main.py from typing import Union from fastapi import FastAPI # imprting required modules from fastapi import UploadFile, File, status import shutil, uuid, json from fastapi.staticfiles import StaticFiles from fastapi.exceptions import HTTPException app = FastAPI() # mounting the 'files' folder app.mount('/files', StaticFiles(directory='files'),'files') # upload file into 'files/' folder @app.post("/upload-file") def upload_files(uploaded_file: UploadFile = File(...)):     # will generate a unique file hashname     unique_filename = str(uuid.uuid4()) + "_" + uploaded_file.filename     path = f"files/{unique_filename}"     with open(path, 'w+b') as file:         shutil.copyfileobj(uploaded_file.file, file)     return {         "message": "file uploaded",         'file': unique_filename,         'content': uploaded_file.content_type,       ...

basic CRUD with FASTAPI + MONGODB + beanie

 main.py from typing import Union from fastapi import FastAPI from routes import crud from database.connection import init_db app = FastAPI() @app.on_event("startup") async def connect():     await init_db() app.include_router(crud.router) ----------------------------------------------- routes/crud.py from fastapi import APIRouter, status from database.models import Student from database.schemas import Students, UpdateStudent from bson.objectid import ObjectId router = APIRouter() @router.post("/student/", status_code=status.HTTP_200_OK, tags=['student']) async def create_student(users: Students):     try:         stu_obj = Student(name=users.name, phone=users.phone, email=users.email, address=users.address)         await stu_obj.insert()         return {"status":"record inserted"}     except Exception as e:         print(e)         return {"status":"err...