Why pymessenger here?

parent a02dc146
import json
import six
from .bot import Bot
class Element(dict):
__acceptable_keys = ['title', 'item_url', 'image_url', 'subtitle', 'buttons']
def __init__(self, *args, **kwargs):
if six.PY2:
kwargs = {k: v for k, v in kwargs.iteritems() if k in self.__acceptable_keys}
else:
kwargs = {k: v for k, v in kwargs.items() if k in self.__acceptable_keys}
super(Element, self).__init__(*args, **kwargs)
def to_json(self):
return json.dumps({k: v for k, v in self.iteritems() if k in self.__acceptable_keys})
class Button(dict):
# TODO: Decide if this should do more
pass
import json
import requests
from requests_toolbelt import MultipartEncoder
from pymessenger.graph_api import FacebookGraphApi
class Bot(FacebookGraphApi):
def __init__(self, *args, **kwargs):
super(Bot, self).__init__(*args, **kwargs)
def send_text_message(self, recipient_id, message):
"""Send text messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/text-message
Input:
recipient_id: recipient id to send to
message: message to send
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
'id': recipient_id
},
'message': {
'text': message
}
}
return self.send_raw(payload)
def send_message(self, recipient_id, message):
"""Send text messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/text-message
Input:
recipient_id: recipient id to send to
message: raw message to send
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
'id': recipient_id
},
'message': message
}
return self.send_raw(payload)
def send_generic_message(self, recipient_id, elements):
"""Send generic messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/generic-template
Input:
recipient_id: recipient id to send to
elements: generic message elements to send
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
'id': recipient_id
},
'message': {
"attachment": {
"type": "template",
"payload": {
"template_type": "generic",
"elements": elements
}
}
}
}
return self.send_raw(payload)
def send_button_message(self, recipient_id, text, buttons):
"""Send text messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/button-template
Input:
recipient_id: recipient id to send to
text: text of message to send
buttons: buttons to send
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
'id': recipient_id
},
'message': {
"attachment": {
"type": "template",
"payload": {
"template_type": "button",
"text": text,
"buttons": buttons
}
}
}
}
return self.send_raw(payload)
def send_image(self, recipient_id, image_path):
"""Send an image to the specified recipient.
Image must be PNG or JPEG or GIF (more might be supported).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
Input:
recipient_id: recipient id to send to
image_path: path to image to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'image',
'payload': {}
}
}
),
'filedata': (image_path, open(image_path, 'rb'))
}
multipart_data = MultipartEncoder(payload)
multipart_header = {
'Content-Type': multipart_data.content_type
}
return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
def send_image_url(self, recipient_id, image_url):
"""Send an image to specified recipient using URL.
Image must be PNG or JPEG or GIF (more might be supported).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
Input:
recipient_id: recipient id to send to
image_url: url of image to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'image',
'payload': {
'url': image_url
}
}
}
)
}
return self.send_raw(payload)
def send_action(self, recipient_id, action):
"""Send typing indicators or send read receipts to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/sender-actions
Input:
recipient_id: recipient id to send to
action: action type (mark_seen, typing_on, typing_off)
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
'id': recipient_id
},
'sender_action': action
}
return self.send_raw(payload)
def send_audio(self, recipient_id, audio_path):
"""Send audio to the specified recipient.
Audio must be MP3 or WAV
https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
Input:
recipient_id: recipient id to send to
audio_path: path to audio to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'audio',
'payload': {}
}
}
),
'filedata': (audio_path, open(audio_path, 'rb'))
}
multipart_data = MultipartEncoder(payload)
multipart_header = {
'Content-Type': multipart_data.content_type
}
return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
def send_audio_url(self, recipient_id, audio_url):
"""Send audio to specified recipient using URL.
Audio must be MP3 or WAV
https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
Input:
recipient_id: recipient id to send to
audio_url: url of audio to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'audio',
'payload': {
'url': audio_url
}
}
}
)
}
return self.send_raw(payload)
def send_video(self, recipient_id, video_path):
"""Send video to the specified recipient.
Video should be MP4 or MOV, but supports more (https://www.facebook.com/help/218673814818907).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/video-attachment
Input:
recipient_id: recipient id to send to
video_path: path to video to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'video',
'payload': {}
}
}
),
'filedata': (video_path, open(video_path, 'rb'))
}
multipart_data = MultipartEncoder(payload)
multipart_header = {
'Content-Type': multipart_data.content_type
}
return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
def send_video_url(self, recipient_id, video_url):
"""Send video to specified recipient using URL.
Video should be MP4 or MOV, but supports more (https://www.facebook.com/help/218673814818907).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/video-attachment
Input:
recipient_id: recipient id to send to
video_url: url of video to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'video',
'payload': {
'url': video_url
}
}
}
)
}
return self.send_raw(payload)
def send_file(self, recipient_id, file_path):
"""Send file to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
Input:
recipient_id: recipient id to send to
file_path: path to file to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'file',
'payload': {}
}
}
),
'filedata': (file_path, open(file_path, 'rb'))
}
multipart_data = MultipartEncoder(payload)
multipart_header = {
'Content-Type': multipart_data.content_type
}
return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
def send_file_url(self, recipient_id, file_url):
"""Send file to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
Input:
recipient_id: recipient id to send to
file_url: url of file to be sent
Output:
Response from API as <dict>
"""
payload = {
'recipient': json.dumps(
{
'id': recipient_id
}
),
'message': json.dumps(
{
'attachment': {
'type': 'file',
'payload': {
'url': file_url
}
}
}
)
}
return self.send_raw(payload)
def send_raw(self, payload):
request_endpoint = '{0}/me/messages'.format(self.graph_url)
response = requests.post(
request_endpoint,
params=self.auth_args,
json=payload
)
result = response.json()
return result
def _send_payload(self, payload):
""" Deprecated, use send_raw instead """
return self.send_raw(payload)
import pymessenger.utils as utils
DEFAULT_API_VERSION = 2.6
class FacebookGraphApi(object):
def __init__(self, access_token, **kwargs):
'''
@required:
access_token
@optional:
api_version
app_secret
'''
self.api_version = kwargs.get('api_version') or DEFAULT_API_VERSION
self.app_secret = kwargs.get('app_secret')
self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)
self.access_token = access_token
@property
def auth_args(self):
if not hasattr(self, '_auth_args'):
auth = {
'access_token': self.access_token
}
if self.app_secret is not None:
appsecret_proof = utils.generate_appsecret_proof(self.access_token, self.app_secret)
auth['appsecret_proof'] = appsecret_proof
self._auth_args = auth
return self._auth_args
import requests
from pymessenger.graph_api import FacebookGraphApi
class UserProfileApi(FacebookGraphApi):
def get(self, user_id, fields=None):
params = {}
if fields is not None and isinstance(fields, (list, tuple)):
params['fields'] = ",".join(fields)
params.update(self.auth_args)
request_endpoint = '{0}/{1}'.format(self.graph_url, user_id)
response = requests.get(request_endpoint, params=params)
if response.status_code == 200:
user_profile = response.json()
return user_profile
return None
import hashlib
import hmac
import six
def validate_hub_signature(app_secret, request_payload, hub_signature_header):
"""
@inputs:
app_secret: Secret Key for application
request_payload: request body
hub_signature_header: X-Hub-Signature header sent with request
@outputs:
boolean indicated that hub signature is validated
"""
try:
hash_method, hub_signature = hub_signature_header.split('=')
except:
pass
else:
digest_module = getattr(hashlib, hash_method)
hmac_object = hmac.new(str(app_secret), unicode(request_payload), digest_module)
generated_hash = hmac_object.hexdigest()
if hub_signature == generated_hash:
return True
return False
def generate_appsecret_proof(access_token, app_secret):
"""
@inputs:
access_token: page access token
app_secret_token: app secret key
@outputs:
appsecret_proof: HMAC-SHA256 hash of page access token
using app_secret as the key
"""
if six.PY2:
hmac_object = hmac.new(str(app_secret), unicode(access_token), hashlib.sha256)
else:
hmac_object = hmac.new(bytearray(app_secret, 'utf8'), str(access_token).encode('utf8'), hashlib.sha256)
generated_hash = hmac_object.hexdigest()
return generated_hash
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment