import asyncio
from mcp_rank import MCPRankClient
async def access_user_gmail(user_id: str):
client = MCPRankClient(
api_key="sk_mcp_rank_...",
user_id=user_id,
)
# Step 1: Initialize - gets MIT from server
await client.initialize()
# Step 2: Check if user has connected Google
try:
info = await client.get_proxy_info("google")
can_access = info.get("can_access", False)
except Exception:
can_access = False
if not can_access:
# Step 3: Create auth session
session = await client.create_auth_session("google")
# Step 4: Display auth URL to user (your UI handles this)
auth_url = session["auth_url"]
session_id = session["session_id"]
# In a web app, you might redirect:
# return RedirectResponse(auth_url)
# In CLI, display the URL:
print(f"Please authenticate: {auth_url}")
# Step 5: Poll for completion
while True:
status = await client.check_session_status(session_id)
if status["status"] == "complete":
print("Authentication successful!")
break
elif status["status"] == "error":
raise Exception(f"Auth failed: {status.get('error')}")
await asyncio.sleep(1)
# Step 6: Refresh client after auth
client._initialized = False
await client.initialize()
# Step 7: Access user data
profile = await client.proxy_get("google", "gmail/v1/users/me/profile")
return profile
# Run it
profile = asyncio.run(access_user_gmail("user_123"))
print(f"Email: {profile['emailAddress']}")