Replace config process & improve oauth

* Replaces config.ini with config.json
  and initiates a workflow to check for
  presence of config file. If not ask
  user to create it and stores it in
  HOME/USER/.config/inopy directory

* Improves oauth access token obtention
  with Flask framework depending on
  production or development status

* Improves also oauth refresh workflow
  when access token has expired
master
Alexandre Racine 2023-06-10 17:55:50 +02:00
parent b3d77eb3ab
commit f647a4fdee
5 changed files with 339 additions and 153 deletions

115
config.py Normal file
View File

@ -0,0 +1,115 @@
import json
import os
def get_config(config_path, config_file_path):
if os.path.exists(config_path):
if os.path.exists(config_file_path):
pass
else:
create_file(config_file_path)
else:
os.mkdir(config_path)
print(f'{config_path} created!')
create_file(config_file_path)
# Load the config file
with open(config_file_path) as config_file:
config = json.load(config_file)
bearer = config['oauth']['bearer']
refresh_token = config['oauth']['refresh_token']
endpoint = config['oauth']['endpoint']
client_id = config['oauth']['client_id']
client_secret = config['oauth']['client_secret']
callback = config['oauth']['callback']
scope = config['oauth']['scope']
csrf = config['oauth']['csrf']
home_url = config['oauth']['home_url']
unread_counts_url = config['inoapi']['unread_counts_url']
feeds_list_url = config['inoapi']['feeds_list_url']
summary = config['notification']['summary']
singular_article = config['notification']['singular_article']
plural_articles = config['notification']['plural_articles']
prod_status = config['prod']['status']
browser_path = config['prod']['browser_path']
host = config['prod']['host']
port = config['prod']['port']
variables = locals()
return variables
def create_file(config_file_path):
config = {}
# Ask user for input
print("\nEnter details about OAuth authentication: \n")
endpoint = input("Enter OAuth endpoint: ")
client_id = input("Enter your client id: ")
client_secret = input("Enter your client secret: ")
callback = input("Enter your callback URL: ")
scope = input("Enter the API scope (e.g. read OR read write): ")
print("\nEnter details about Inoreader API: \n")
unread_counts_url = input("Enter URL for unread articles: ")
feeds_list_url = input("Enter URL for feeds lists: ")
print("\nEnter details about notification message: \n")
summary = input("Enter summary (title) for notification: ")
singular_article = input("Enter singular label if there is only one unread article (e.g. new article in feed): ")
plural_articles = input("Enter plural label if there are many unread articles (e.g. new articles in feed): ")
# Create nested JSON structure
config["oauth"] = {
"bearer": "",
"refresh_token": "",
"endpoint": endpoint,
"client_id": client_id,
"client_secret": client_secret,
"callback": callback,
"scope": scope,
"csrf": "4902358490258",
"home_url": "http://localhost:5000"
}
config["inoapi"] = {
"unread_counts_url": unread_counts_url,
"feeds_list_url": feeds_list_url
}
config["notification"] = {
"summary": summary,
"singular_article": singular_article,
"plural_articles": plural_articles
}
config["prod"] = {
"status": "true",
"browser_path": "/usr/bin/firefox",
"host": "0.0.0.0",
"port": "5000"
}
# Save config to a file
with open(config_file_path, "w") as file:
json.dump(config, file, indent=4)
print(f"{config_file_path} created successfully!")
def config():
config_path = os.path.join(os.environ['HOME'], '.config/inopy')
config_file = 'config.json'
config_file_path = os.path.join(config_path, config_file)
data = get_config(config_path, config_file_path)
return data

72
ino.py
View File

@ -30,20 +30,13 @@
import requests
import json
import configparser
import refresh
import notif
import time
import sys
import os
from config import config
from test2 import app, run_app
# Configuration parser for reading the config file
config = configparser.ConfigParser()
config.read('config.ini')
# Initiate Summary and message for the notification
summary = config.get('Notification', 'summary')
message = ""
from refresh import refresh
"""
Read the configuration file.
@ -56,12 +49,11 @@ message = ""
and send a GET request to the API.
"""
def APIrequest(endpoint):
bearer = config.get('Oauth', 'bearer')
def APIrequest(url, bearer):
bearer_string = 'Bearer {}'.format(bearer)
headers = {'Authorization': bearer_string}
url = config.get('InoAPI', endpoint)
response = requests.get(url, headers=headers)
print(response.status_code)
return response
# Parse the response as JSON
@ -75,15 +67,22 @@ def getData(response):
in the config
"""
def replace():
(refreshed_bearer, new_refresh_token) = refresh.refresh()
config.set('Oauth', 'bearer', refreshed_bearer)
config.set('Oauth', 'refresh_token', new_refresh_token)
with open('config.ini', 'w') as config_file:
config.write(config_file)
message = ""
config = config()
bearer = config['bearer']
unread_counts_url = config['unread_counts_url']
feeds_list_url = config['feeds_list_url']
config_path = config['config_file_path']
endpoint = config['endpoint']
client_id = config['client_id']
client_secret = config['client_secret']
refresh_token = config['refresh_token']
summary = config['summary']
# Make a request to get unread counts
unread_response = APIrequest('unread_counts_url')
unread_response = APIrequest(unread_counts_url, bearer)
"""
If unauthorized (401) status code
@ -91,14 +90,27 @@ unread_response = APIrequest('unread_counts_url')
and make a new request with the updated token.
"""
if unread_response.status_code == 401:
replace()
unread_response = APIrequest('unread_counts_url')
elif unread_response.status_code == 403:
if unread_response.status_code == 403:
print(unread_response.status_code)
run_app()
config.read('config.ini')
unread_response = APIrequest('unread_counts_url')
#new_config = config()
#bearer = new_config['bearer']
with open(config_path) as config_file:
new_config = json.load(config_file)
bearer = new_config['oauth']['bearer']
print(bearer)
unread_response = APIrequest(unread_counts_url, bearer)
print(unread_response.text)
elif unread_response.status_code == 401:
refresh(config_path, endpoint, client_id, client_secret, refresh_token)
#new_config = config()
#bearer = new_config['bearer']
with open(config_path) as config_file:
new_config = json.load(config_file)
bearer = new_config['oauth']['bearer']
print(bearer)
unread_response = APIrequest(unread_counts_url, bearer)
elif unread_response.status_code == 200:
pass
@ -109,7 +121,7 @@ elif unread_response.status_code == 200:
Parse the unread counts data
"""
feeds_list_response = APIrequest('feeds_list_url')
feeds_list_response = APIrequest(feeds_list_url, bearer)
print(feeds_list_response)
feeds_list_data = getData(feeds_list_response)
unread_data = getData(unread_response)
@ -155,9 +167,9 @@ for item in unread_data['unreadcounts']:
"""
if count == 1:
new_articles = config.get('Notification', 'singular_article')
new_articles = config['singular_article']
else:
new_articles = config.get('Notification', 'plural_articles')
new_articles = config['plural_articles']
count = str(count)
message = message + count + " " + new_articles + " " + ID + "\n"
else:

155
oauth.py Normal file
View File

@ -0,0 +1,155 @@
from flask import Flask, request, redirect, render_template
from config import config
from waitress import serve
import requests
import os
import signal
import webbrowser
import time
import json
import subprocess
import threading
config = config()
endpoint = config['endpoint']
client_id = config['client_id']
client_secret = config['client_secret']
callback = config['callback']
scope = config['scope']
CSRF = config['csrf']
home_url = config['home_url']
prod_status = config['prod_status']
browser_path = config['browser_path']
host = config['host']
port = config['port']
config_file_path = config['config_file_path']
url = 'https://www.inoreader.com/oauth2/auth?client_id={}&redirect_uri={}&response_type=code&scope={}&state={}'.format(client_id, callback, scope, CSRF)
app = Flask(__name__)
@app.route('/')
def index():
return redirect(url)
@app.route('/oauth-callback')
def oauth_callback():
# Get the authorization code from the request URL
authorization_code = request.args.get('code')
csrf_check = request.args.get('state')
error_param = request.args.get('error')
csrf = True if csrf_check == CSRF else False
error = True if error_param != None else False
if csrf == True and error != True:
# Exchange the authorization code for an access token
access_token_url = endpoint
payload = {
'grant_type': 'authorization_code',
'code': authorization_code,
'client_id': client_id,
'client_secret': client_secret,
'redirect_uri': callback
}
response = requests.post(access_token_url, data=payload)
# Parse the response to get the access token
if response.status_code == 200:
access_token = response.json()['access_token']
refresh_token = response.json()['refresh_token']
with open(config_file_path, 'r+') as config_file:
# Load the JSON data from the file
config = json.load(config_file)
# Update the token value in the config data
config['oauth']['bearer'] = access_token
config['oauth']['refresh_token'] = refresh_token
# Move the file pointer back to the beginning of the file
config_file.seek(0)
# Write the updated config data to the file
json.dump(config, config_file, indent=4)
config_file.truncate()
return render_template('success.html', response=(access_token, refresh_token))
else:
# Redirect the user to a desired URL
if csrf != True:
return render_template('csrf-failed.html', response=(CSRF, csrf_check))
elif error == True:
error_content = request.args.get('error_description')
return render_template('oauth-error.html', response=(error_param, error_content))
else:
pass
@app.route('/shutdown')
def shutdown():
# Shutting down the Flask app gracefully
#return ('proccess ended', time.sleep(5), os.kill(os.getpid(), signal.SIGINT))
request.environ.get('werkzeug.server.shutdown')
return 'Close this browser to terminate the process!'
'''def run_prod():
# Create a new Firefox profile
subprocess.run([browser_path, "-CreateProfile", "new_profile", "-no-remote"])
# Launch Firefox with the new profile and open the URL
subprocess.run([browser_path, "-P", "new_profile", "-no-remote", home_url])
serve(app, host=host, port=port)'''
def run_prod():
# Function to start the Flask server
def start_server():
serve(app, host=host, port=port)
# Create a new thread for the Flask server
server_thread = threading.Thread(target=start_server)
# Start the Flask server thread
server_thread.start()
# Wait for the Flask server to start (adjust the delay as needed)
time.sleep(2)
# Create a new Firefox profile
subprocess.run([browser_path, "-CreateProfile", "new_profile", "-no-remote"])
# Launch Firefox with the new profile and open the URL
subprocess.run([browser_path, "-P", "new_profile", "-no-remote", home_url])
def run_app():
if prod_status == "true":
print(prod_status)
run_prod()
else:
print(prod_status)
webbrowser.open(home_url)
app.run()
if __name__ == '__main__':
#app.run()
run_app()

View File

@ -26,33 +26,23 @@
import requests
import json
import configparser
# Initialize a ConfigParser object
config = configparser.ConfigParser()
# Read the configuration file 'config.ini'
config.read('config.ini')
# Get the 'endpoint' value from the 'Oauth' section in the configuration file
url = config.get('Oauth', 'endpoint')
# Prepare the payload for the request
payload = {
"client_id": config.get('Oauth', 'client_id'),
"client_secret": config.get('Oauth', 'client_secret'),
"grant_type": "refresh_token",
"refresh_token": config.get('Oauth', 'refresh_token')
}
# Set the headers for the request
headers = {"Content-type": "application/x-www-form-urlencoded"}
# Define a function named 'refresh' that handles the token refresh logic
def refresh():
def refresh(config_path, endpoint, client_id, client_secret, refresh_token):
# Set the headers for the request
headers = {"Content-type": "application/x-www-form-urlencoded"}
# Prepare the payload for the request
payload = {
"client_id": client_id,
"client_secret": client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token
}
# Send a POST request to the specified URL with the payload and headers
response = requests.post(url, data=payload, headers=headers)
response = requests.post(endpoint, data=payload, headers=headers)
# Check the response status code
if response.status_code == 200:
@ -67,9 +57,17 @@ def refresh():
refreshed_bearer = data['access_token']
new_refresh_token = data['refresh_token']
'''
Return the refreshed bearer token and the new refresh token
in order to use it in the replace() function of the main
ino.py module.
'''
return (refreshed_bearer, new_refresh_token)
with open(config_path, 'r+') as config_file:
# Load the JSON data from the file
config = json.load(config_file)
# Update the token value in the config data
config['oauth']['bearer'] = refreshed_bearer
config['oauth']['refresh_token'] = new_refresh_token
# Move the file pointer back to the beginning of the file
config_file.seek(0)
# Write the updated config data to the file
json.dump(config, config_file, indent=4)
config_file.truncate()

View File

@ -1,94 +0,0 @@
from flask import Flask, request, redirect, render_template
import requests
import os
import signal
import configparser
import webbrowser
import time
# Configuration parser for reading the config file
config = configparser.ConfigParser()
config.read('config.ini')
endpoint = config.get('Oauth', 'endpoint')
client_id = config.get('Oauth', 'client_id')
client_secret = config.get('Oauth', 'client_secret')
callback = config.get('Oauth', 'callback')
scope = config.get('Oauth', 'scope')
CSRF = config.get('Oauth', 'CSRF')
url = 'https://www.inoreader.com/oauth2/auth?client_id={}&redirect_uri={}&response_type=code&scope={}&state={}'.format(client_id, callback, scope, CSRF)
app = Flask(__name__)
@app.route('/')
def index():
return redirect(url)
@app.route('/oauth-callback')
def oauth_callback():
# Get the authorization code from the request URL
authorization_code = request.args.get('code')
csrf_check = request.args.get('state')
error_param = request.args.get('error')
csrf = True if csrf_check == CSRF else False
error = True if error_param != None else False
if csrf == True and error != True:
# Exchange the authorization code for an access token
access_token_url = endpoint
payload = {
'grant_type': 'authorization_code',
'code': authorization_code,
'client_id': client_id,
'client_secret': client_secret,
'redirect_uri': callback
}
response = requests.post(access_token_url, data=payload)
# Parse the response to get the access token
if response.status_code == 200:
access_token = response.json()['access_token']
refresh_token = response.json()['refresh_token']
config.set('Oauth', 'bearer', access_token)
config.set('Oauth', 'refresh_token', refresh_token)
with open('config.ini', 'w') as config_file:
config.write(config_file)
return render_template('success.html', response=(access_token, refresh_token))
else:
# Redirect the user to a desired URL
if csrf != True:
return render_template('csrf-failed.html', response=(CSRF, csrf_check))
elif error == True:
error_content = request.args.get('error_description')
return render_template('oauth-error.html', response=(error_param, error_content))
else:
pass
@app.route('/shutdown')
def shutdown():
# Shutting down the Flask app gracefully
return ('proccess ended', time.sleep(5), os.kill(os.getpid(), signal.SIGINT))
def run_app():
# Open the browser and start the Flask app
webbrowser.open('http://localhost:5000')
app.run()
if __name__ == '__main__':
#app.run()
run_app()