add add_contacts, delete_contacts, get_contacts

This commit is contained in:
RussFP 2018-02-20 14:20:34 +03:00
parent 027f843047
commit 7d72738153

View File

@ -45,14 +45,17 @@ from pyrogram.api.errors import (
from pyrogram.api.types import (
User, Chat, Channel,
PeerUser, PeerChannel,
InputUser,
InputPeerEmpty, InputPeerSelf,
InputPeerUser, InputPeerChat, InputPeerChannel
InputPeerUser, InputPeerChat, InputPeerChannel,
InputPhoneContact
)
from pyrogram.crypto import AES
from pyrogram.session import Auth, Session
from pyrogram.session.internals import MsgId
from .input_media import InputMedia
from .style import Markdown, HTML
from typing import List, Union
log = logging.getLogger(__name__)
@ -2306,3 +2309,100 @@ class Client:
self.download_queue.put((media, file_name, done))
done.wait()
def add_contacts(self,
phone: Union[int, str] = None,
first_name: str = None,
last_name: str = None,
input_phone_contact_list: List[InputPhoneContact] = None):
if (phone is None or first_name is None) and \
input_phone_contact_list is None:
log.warning("(phone and first_name) or input_phone_contact_list "
"must be not None")
return None
if phone is not None and first_name is not None:
if str(phone)[0] != '+':
phone = '+' + str(phone)
input_phone_contact_list = []
input_phone_contact = InputPhoneContact(client_id=0,
phone=phone,
first_name=first_name,
last_name=last_name or '')
input_phone_contact_list.append(input_phone_contact)
# make sure that we send only InputPhoneContact
inner_input_phone_contact_list = []
for contact in input_phone_contact_list:
if isinstance(contact, InputPhoneContact):
inner_input_phone_contact_list.append(contact)
imported_contacts = self.send(
functions.contacts.ImportContacts(inner_input_phone_contact_list))
for user in imported_contacts.users:
if isinstance(user, User):
if user.id in self.peers_by_id:
continue
if user.access_hash is None:
continue
input_peer = InputPeerUser(
user_id=user.id,
access_hash=user.access_hash
)
self.peers_by_id[user.id] = input_peer
if user.username is not None:
self.peers_by_username[user.username] = input_peer
return imported_contacts
def delete_contacts(self, _id: int = None,
ids_list: Union[
List[int], List[InputUser]] = None):
if _id is None and ids_list is None:
log.warning('id or ids_list must be not None')
return False
contacts = self.get_contacts()
if _id is not None:
if not isinstance(_id, int):
log.warning('id is not int')
return False
input_user = None
for user in contacts.users:
if isinstance(user, User):
if _id == user.id:
input_user = InputUser(user_id=user.id,
access_hash=user.access_hash)
break
ids_list = [input_user]
inner_ids_list = []
for _id in ids_list:
if isinstance(_id, InputUser):
inner_ids_list.append(_id)
if isinstance(_id, int):
input_user = None
for user in contacts.users:
if isinstance(user, User):
if _id == user.id:
input_user = InputUser(
user_id=user.id,
access_hash=user.access_hash)
break
inner_ids_list.append(input_user)
res = self.send(functions.contacts.DeleteContacts(inner_ids_list))
return res
def get_contacts(self, _hash: int = 0):
return self.send(functions.contacts.GetContacts(_hash))