PagerMaid-Pyro/pagermaid/modules/mixpanel.py

234 lines
7.1 KiB
Python
Raw Permalink Normal View History

2022-09-07 08:16:37 +00:00
import contextlib
import datetime
import json
import random
2022-09-07 08:16:37 +00:00
import time
import uuid
from asyncio import sleep
from typing import Union
from pyrogram.raw.functions.messages import (
GetSponsoredMessages,
ViewSponsoredMessage,
ClickSponsoredMessage,
)
from pyrogram.raw.types import InputPeerChannel
from pyrogram.raw.types.messages import SponsoredMessages, SponsoredMessagesEmpty
2022-09-07 08:16:37 +00:00
from pagermaid.config import Config
2022-08-01 16:04:45 +00:00
from pagermaid.enums import Client, Message
from pagermaid.hook import Hook
from pagermaid.services import client as request, scheduler, bot as userbot
from pagermaid.utils import logs
2022-08-01 16:04:45 +00:00
2022-09-07 08:16:37 +00:00
class DatetimeSerializer(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
2023-03-12 03:56:01 +00:00
fmt = "%Y-%m-%dT%H:%M:%S"
2022-09-07 08:16:37 +00:00
return obj.strftime(fmt)
return json.JSONEncoder.default(self, obj)
class Mixpanel:
def __init__(self, token: str):
self._token = token
self._serializer = DatetimeSerializer
self._request = request
self.api_host = "api.mixpanel.com"
self.is_people_set = False
2022-09-07 08:16:37 +00:00
@staticmethod
def _now():
return time.time()
@staticmethod
def _make_insert_id():
return uuid.uuid4().hex
@staticmethod
def json_dumps(data, cls=None):
# Separators are specified to eliminate whitespace.
2023-03-12 03:56:01 +00:00
return json.dumps(data, separators=(",", ":"), cls=cls)
2022-09-07 08:16:37 +00:00
async def api_call(self, endpoint, json_message):
_endpoints = {
2023-03-12 03:56:01 +00:00
"events": f"https://{self.api_host}/track",
"people": f"https://{self.api_host}/engage",
2022-09-07 08:16:37 +00:00
}
request_url = _endpoints.get(endpoint)
if request_url is None:
return
params = {
2023-03-12 03:56:01 +00:00
"data": json_message,
"verbose": 1,
"ip": 0,
2022-09-07 08:16:37 +00:00
}
start = self._now()
with contextlib.suppress(Exception):
2023-03-12 03:56:01 +00:00
await self._request.post(request_url, data=params, timeout=10.0)
2022-09-07 08:16:37 +00:00
logs.debug(f"Mixpanel request took {self._now() - start} seconds")
2024-02-05 13:16:25 +00:00
async def people_set(
self, distinct_id: str, properties: dict, force_update: bool = False
):
if self.is_people_set and (not force_update):
return
2022-09-07 08:16:37 +00:00
message = {
2023-03-12 03:56:01 +00:00
"$distinct_id": distinct_id,
"$set": properties,
2022-09-07 08:16:37 +00:00
}
2023-03-12 03:56:01 +00:00
record = {"$token": self._token, "$time": self._now()}
2022-09-15 08:25:33 +00:00
# sourcery skip: dict-assign-update-to-union
record.update(message)
res = await self.api_call(
2023-03-12 03:56:01 +00:00
"people", self.json_dumps(record, cls=self._serializer)
)
self.is_people_set = True
return res
2022-09-07 08:16:37 +00:00
async def track(self, distinct_id: str, event_name: str, properties: dict):
all_properties = {
2023-03-12 03:56:01 +00:00
"token": self._token,
"distinct_id": distinct_id,
"time": self._now(),
"$insert_id": self._make_insert_id(),
"mp_lib": "python",
"$lib_version": "4.10.0",
2022-09-07 08:16:37 +00:00
}
if properties:
2022-09-15 08:25:33 +00:00
# sourcery skip: dict-assign-update-to-union
all_properties.update(properties)
2022-09-07 08:16:37 +00:00
event = {
2023-03-12 03:56:01 +00:00
"event": event_name,
"properties": all_properties,
2022-09-07 08:16:37 +00:00
}
2023-03-12 03:56:01 +00:00
return await self.api_call(
"events", self.json_dumps(event, cls=self._serializer)
)
2022-09-07 08:16:37 +00:00
mp = Mixpanel(Config.MIXPANEL_API)
2022-08-01 16:04:45 +00:00
async def set_people(bot: Client, force_update: bool = False):
if not Config.ALLOW_ANALYTIC:
return
if mp.is_people_set and (not force_update):
return
2022-09-07 08:16:37 +00:00
if not bot.me:
bot.me = await bot.get_me()
data = {"$first_name": bot.me.first_name, "is_premium": bot.me.is_premium}
2022-09-07 08:16:37 +00:00
if bot.me.username:
data["username"] = bot.me.username
bot.loop.create_task(mp.people_set(str(bot.me.id), data, force_update=force_update))
@Hook.on_startup()
async def mixpanel_init_id(bot: Client):
if not Config.ALLOW_ANALYTIC:
return
await set_people(bot)
await log_sponsored_clicked()
2022-08-01 16:04:45 +00:00
@Hook.command_postprocessor()
async def mixpanel_report(bot: Client, message: Message, command, sub_command):
2022-08-01 16:04:45 +00:00
if not Config.ALLOW_ANALYTIC:
return
await set_people(bot)
2022-09-07 08:16:37 +00:00
if not bot.me:
bot.me = await bot.get_me()
2022-08-01 16:04:45 +00:00
sender_id = message.from_user.id if message.from_user else ""
sender_id = message.sender_chat.id if message.sender_chat else sender_id
if sender_id < 0 and message.outgoing:
2022-09-07 08:16:37 +00:00
sender_id = bot.me.id
properties = {"command": command, "bot_id": bot.me.id}
if sub_command:
properties["sub_command"] = sub_command
2023-03-12 03:56:01 +00:00
bot.loop.create_task(
mp.track(
str(sender_id),
f"Function {command}",
properties,
2023-03-12 03:56:01 +00:00
)
)
async def get_sponsored(
bot: Client, channel: "InputPeerChannel"
) -> Union["SponsoredMessages", "SponsoredMessagesEmpty"]:
result = await bot.invoke(GetSponsoredMessages(peer=channel))
logs.debug(f"Get sponsored messages: {type(result)}")
return result
async def read_sponsored(
bot: Client, channel: "InputPeerChannel", random_id: bytes
) -> bool:
result = await bot.invoke(
ViewSponsoredMessage(peer=channel, random_id=random_id)
)
if result:
bot.loop.create_task(
mp.track(
str(bot.me.id),
"Sponsored Read",
{"channel_id": channel.channel_id, "bot_id": bot.me.id},
)
)
logs.debug(f"Read sponsored message {random_id}: {result}")
return result
async def click_sponsored(
bot: Client, channel: "InputPeerChannel", random_id: bytes
) -> bool:
result = await bot.invoke(
ClickSponsoredMessage(peer=channel, random_id=random_id)
)
if result:
bot.loop.create_task(
mp.track(
str(bot.me.id),
"Sponsored Click",
{"channel_id": channel.channel_id, "bot_id": bot.me.id},
)
)
logs.debug(f"Click sponsored message {random_id}: {result}")
return result
async def log_sponsored_clicked_one(username: str):
channel = await userbot.resolve_peer(username)
sponsored = await get_sponsored(userbot, channel)
if isinstance(sponsored, SponsoredMessagesEmpty):
return
for message in sponsored.messages:
await sleep(random.randint(1, 5))
if message.random_id:
with contextlib.suppress(Exception):
await read_sponsored(userbot, channel, message.random_id)
await sleep(random.randint(1, 5))
with contextlib.suppress(Exception):
await click_sponsored(userbot, channel, message.random_id)
async def log_sponsored_clicked():
add_log_sponsored_clicked_task()
if not Config.ALLOW_ANALYTIC:
return
await set_people(userbot)
if not userbot.me:
userbot.me = await userbot.get_me()
if (not userbot.me) or userbot.me.is_premium:
return
for username in ["PagerMaid_Modify"]:
await log_sponsored_clicked_one(username)
def add_log_sponsored_clicked_task():
# run random time between 1 and 5 hours
scheduler.add_job(log_sponsored_clicked, "interval", hours=random.randint(1, 5))