Program documentation

* Comments the code

* Includes a README file
master
Alexandre Racine 2023-06-16 22:51:47 +02:00
parent 442a89b81b
commit f4fe6e717e
8 changed files with 659 additions and 304 deletions

1
.gitignore vendored
View File

@ -1,2 +1 @@
tokens.txt
__pycache__/ __pycache__/

62
README.md Normal file
View File

@ -0,0 +1,62 @@
# Inopy
Inopy is a Python application that retrieves unread articles from the [Inoreader](https://www.inoreader.com) API and sends a notification if there are any.
## Table of Contents
- [Overview](#overview)
- [Installation](#installation)
- [Usage](#usage)
- [License](#license)
## Overview
It uses OAuth authentication for accessing the [API](https://www.inoreader.com/developers) and stores the authentication data in a JSON configuration file located at `/HOME/USER/.config/inopy/config.json`. The program is divided into multiple modules and functions for making API requests, parsing response data, refreshing tokens, and sending notifications.
The code is organized into the following modules:
- `ino.py`: The main module that retrieves unread articles, handles token refreshing, and sends notifications.
- `config.py`: Contains configuration settings used by other modules.
- `oauth.py`: Implements the OAuth authentication flow using Flask.
- `refresh.py`: Contains a `refresh` function for refreshing OAuth access and refresh tokens and updating the configuration file.
- `notif.py`: Provides a function for sending notifications using D-Bus (only tested with Cinnamon desktop environment).
## Installation
To use Inopy, you need to follow these steps:
1. Install the required dependencies by running the following command:
```bash
pip install requests pydbus flask waitress
```
2. Clone or download this repository.
3. Run the `ino.py` module the first time and set up the configuration file by providing the necessary OAuth, API endpoint and notifications details.
4. Run the `ino.py` module to retrieve unread articles and receive notifications.
## Usage
The first time `ino.py` module is run, it will check if `config.json` exists in `/HOME/USER/.config/inopy/`. If not it will prompt user for configuration details and create the `config.json` file. The file should contain the OAuth endpoint, client ID, client secret, callback URL, scope, CSRF value and home URL.
It should also contain the Inoreader API endpoints, notification labels, production status, browser path, host and port.
Once the configuration is set up, you can adapt some of the default values. Typically, check and if necessary adapt the `prod` section of the file. It defines whether the program is run in production or development mode.
To set a cron in Linux triggering the program for a notification, create a bash script containing the following code
```bash
#!/bin/bash
export DISPLAY=:0.0
export XAUTHORITY=/home/user/.Xauthority # adapt with your username
export DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/1000/bus
python path_to_your_ino.py # adapt with the path to your program directory
```
and define the cron job pointing to the bash script created.
## License
Inopy is released under the GNU General Public License version 3 or later. You can redistribute it and/or modify it under the terms of the license. For more details, please refer to the [GNU General Public License](https://www.gnu.org/licenses/).

View File

@ -1,6 +1,53 @@
"""
=========================================================================================
Copyright © 2023 Alexandre Racine <https://alex-racine.ch>
This file is part of Inopy.
Inopy is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Inopy is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>.
=========================================================================================
DISCLAIMER: parts of this code and comments blocks were created
with the help of ChatGPT developped by OpenAI <https://openai.com/>
Followed by human reviewing, refactoring and fine-tuning.
=========================================================================================
This code consists of two main functions: get_config and create_file.
The get_config function check if the config directory or the config file exist. If not it creates config directory and/or config file and then gets the configuration data.
The create_file function is called if the configuration file should be created. It prompts the user to enter configuration details and save them in the config file.
Finally the config function serves as a wrapper function that sets the paths and calls get_config to retrieve the configuration data.
=========================================================================================
"""
import json import json
import os import os
'''
================================================================
Create a function to check if the config directory
and the config file exist.
If the config file or the config directory don't exist
create them.
Read the config file and load its contents into a dictionary.
Extract the necessary values from the config dictionary,
create a dictionary of local variables and return it.
================================================================
'''
def get_config(config_path, config_file_path): def get_config(config_path, config_file_path):
if os.path.exists(config_path): if os.path.exists(config_path):
@ -9,16 +56,22 @@ def get_config(config_path, config_file_path):
pass pass
else: else:
# If the config file doesn't exist, create it.
create_file(config_file_path) create_file(config_file_path)
else: else:
# If the config directory doesn't exist, create it
os.mkdir(config_path) os.mkdir(config_path)
print(f'{config_path} created!') print(f'{config_path} created!')
create_file(config_file_path) create_file(config_file_path)
# Load the config file # load config content
with open(config_file_path) as config_file: with open(config_file_path) as config_file:
config = json.load(config_file) config = json.load(config_file)
# Extract the necessary values from the config dictionary
bearer = config['oauth']['bearer'] bearer = config['oauth']['bearer']
refresh_token = config['oauth']['refresh_token'] refresh_token = config['oauth']['refresh_token']
endpoint = config['oauth']['endpoint'] endpoint = config['oauth']['endpoint']
@ -42,14 +95,24 @@ def get_config(config_path, config_file_path):
port = config['prod']['port'] port = config['prod']['port']
variables = locals() variables = locals()
return variables return variables
'''
==========================================================
Create a function to prompt the user to enter details
for the configuration file.
Create the configuration dictionary.
Write the configuration dictionary to the config file.
==========================================================
'''
def create_file(config_file_path): def create_file(config_file_path):
config = {} config = {}
# Ask user for input # prompt user for configuration data
print("\nEnter details about OAuth authentication: \n") print("\nEnter details about OAuth authentication: \n")
endpoint = input("Enter OAuth endpoint: ") endpoint = input("Enter OAuth endpoint: ")
@ -69,8 +132,7 @@ def create_file(config_file_path):
singular_article = input("Enter singular label if there is only one unread article (e.g. new article in feed): ") singular_article = input("Enter singular label if there is only one unread article (e.g. new article in feed): ")
plural_articles = input("Enter plural label if there are many unread articles (e.g. new articles in feed): ") plural_articles = input("Enter plural label if there are many unread articles (e.g. new articles in feed): ")
# Create the configuration dictionary
# Create nested JSON structure
config["oauth"] = { config["oauth"] = {
"bearer": "", "bearer": "",
"refresh_token": "", "refresh_token": "",
@ -101,15 +163,32 @@ def create_file(config_file_path):
"port": "5000" "port": "5000"
} }
# Save config to a file # Write the config data to the config file
with open(config_file_path, "w") as file: with open(config_file_path, "w") as file:
json.dump(config, file, indent=4) json.dump(config, file, indent=4)
print(f"{config_file_path} created successfully!") print(f"{config_file_path} created successfully!")
'''
==============================================
Create a function to set the paths for the
config directory and file.
Get the configuration data and return it.
==============================================
'''
def config(): def config():
# Set the path of config file to
# /HOME/USER/.config/inopy/config.json
config_path = os.path.join(os.environ['HOME'], '.config/inopy') config_path = os.path.join(os.environ['HOME'], '.config/inopy')
config_file = 'config.json' config_file = 'config.json'
config_file_path = os.path.join(config_path, config_file) config_file_path = os.path.join(config_path, config_file)
# Get and return the configuration data
data = get_config(config_path, config_file_path) data = get_config(config_path, config_file_path)
return data return data
if __name__ == '__main__':
config = config()

BIN
icons/inoreader.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.9 KiB

198
ino.py
View File

@ -1,4 +1,6 @@
""" """
=========================================================================================
Copyright © 2023 Alexandre Racine <https://alex-racine.ch> Copyright © 2023 Alexandre Racine <https://alex-racine.ch>
This file is part of Inopy. This file is part of Inopy.
@ -9,13 +11,13 @@
You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>. You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------- =========================================================================================
DISCLAIMER: parts of this code and comments blocks were created DISCLAIMER: parts of this code and comments blocks were created
with the help of ChatGPT developped by OpenAI <https://openai.com/> with the help of ChatGPT developped by OpenAI <https://openai.com/>
Followed by human reviewing, refactoring and fine-tuning. Followed by human reviewing, refactoring and fine-tuning.
------------------------------------------------------------------------------------- =========================================================================================
Inopy retrieves unread articles from the Inoreader API <https://www.inoreader.com/developers> and sends a notification if there are any unread articles. Inopy retrieves unread articles from the Inoreader API <https://www.inoreader.com/developers> and sends a notification if there are any unread articles.
@ -26,48 +28,51 @@
Inopy is structured into functions and modules for making API requests, parsing response data, refreshing tokens and sending notifications. Inopy is structured into functions and modules for making API requests, parsing response data, refreshing tokens and sending notifications.
For more information about OAuth authentication, plase see <https://www.inoreader.com/developers/oauth> For more information about OAuth authentication, plase see <https://www.inoreader.com/developers/oauth>
=========================================================================================
""" """
import requests import requests
import json import json
import notif import notif
import time
import sys
import os
from config import config from config import config
from oauth import app, run_app from oauth import app, run_app
from refresh import refresh from refresh import refresh
""" '''
Read the configuration file. ===========================================
Define functions to make an API request
Get the bearer token from the config with bearer token and parse response
and set it accordingly to Inoreader API data as JSON
specifications. ===========================================
'''
Get the API endpoint URL from the config
and send a GET request to the API.
"""
def APIrequest(url, bearer): def APIrequest(url, bearer):
bearer_string = 'Bearer {}'.format(bearer) bearer_string = 'Bearer {}'.format(bearer)
headers = {'Authorization': bearer_string} headers = {'Authorization': bearer_string}
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
print(response.status_code)
return response return response
# Parse the response as JSON
def getData(response): def getData(response):
data = json.loads(response.text) data = json.loads(response.text)
return data return data
""" '''
Refresh the bearer token if it expired. ===================================
Update the bearer and refresh token Initialize an empty message
in the config string for the notification
""" ===================================
'''
message = "" message = ""
'''
===========================================
Load configuration settings and
retrieve necessary configuration values
===========================================
'''
config = config() config = config()
bearer = config['bearer'] bearer = config['bearer']
@ -81,102 +86,177 @@ refresh_token = config['refresh_token']
summary = config['summary'] summary = config['summary']
singular_article = config['singular_article'] singular_article = config['singular_article']
plural_articles = config['plural_articles'] plural_articles = config['plural_articles']
singular_article = config['singular_article']
plural_articles = config['plural_articles'] '''
=========================================
Create dictionaries and list to store
unread counts, subscriptions and feed
categories (folders)
=========================================
'''
unreadcounts = {} unreadcounts = {}
subscriptions = {} subscriptions = {}
categories = [] categories = []
# Make a request to get unread counts # Make API request to get unread counts
unread_response = APIrequest(unread_counts_url, bearer) unread_response = APIrequest(unread_counts_url, bearer)
""" '''
If unauthorized (401) status code ===================================================
is received, refresh the bearer token If the response status code is 403 (Forbidden):
and make a new request with the updated token.
"""
* Run the Flask app to get a bearer token
* Load the updated configuration file
* Update the bearer token with the new value
from the updated config and make a new API
request with the updated bearer token
===================================================
'''
'''
======================================================
If the response status code is 401 (Unauthorized):
* Refresh the bearer token
* Load the updated configuration file
* Update the bearer token with the new value
from the updated config and make a new API
request with the updated bearer token
======================================================
'''
'''
============================================
If the response status code is 200 (OK):
* proceed with the code execution
============================================
'''
# Check for 403 error case
if unread_response.status_code == 403: if unread_response.status_code == 403:
print(unread_response.status_code)
run_app() run_app()
#new_config = config()
#bearer = new_config['bearer']
with open(config_path) as config_file: with open(config_path) as config_file:
new_config = json.load(config_file) new_config = json.load(config_file)
bearer = new_config['oauth']['bearer']
print(bearer)
unread_response = APIrequest(unread_counts_url, bearer)
print(unread_response.text)
bearer = new_config['oauth']['bearer']
unread_response = APIrequest(unread_counts_url, bearer)
# Check for 401 error case
elif unread_response.status_code == 401: elif unread_response.status_code == 401:
refresh(config_path, endpoint, client_id, client_secret, refresh_token) refresh(config_path, endpoint, client_id, client_secret, refresh_token)
#new_config = config()
#bearer = new_config['bearer']
with open(config_path) as config_file: with open(config_path) as config_file:
new_config = json.load(config_file) new_config = json.load(config_file)
bearer = new_config['oauth']['bearer'] bearer = new_config['oauth']['bearer']
print(bearer)
unread_response = APIrequest(unread_counts_url, bearer) unread_response = APIrequest(unread_counts_url, bearer)
# Proceed with the code execution
elif unread_response.status_code == 200: elif unread_response.status_code == 200:
pass pass
""" # Make API request to get feeds list
Get the list of feeds
Parse the response data
Parse the unread counts data
"""
feeds_list_response = APIrequest(feeds_list_url, bearer) feeds_list_response = APIrequest(feeds_list_url, bearer)
print(feeds_list_response)
'''
=======================================
Parse the responses data as JSON
Iterate over the unread counts and
subscriptions and store them in the
respective dictionaries
If the subscription has categories
(is part of a folder) append the
category in the categories list
=======================================
'''
feeds_list_data = getData(feeds_list_response) feeds_list_data = getData(feeds_list_response)
unread_data = getData(unread_response) unread_data = getData(unread_response)
print(feeds_list_data)
print('\n\n')
print(unread_data)
for unread in unread_data['unreadcounts']: for unread in unread_data['unreadcounts']:
unread['count'] = int(unread['count']) unread['count'] = int(unread['count'])
if unread['count'] > 0: if unread['count'] > 0:
unreadcounts[unread['id']] = unread['count'] unreadcounts[unread['id']] = unread['count']
for subscribed in feeds_list_data['subscriptions']: for subscribed in feeds_list_data['subscriptions']:
if subscribed['categories']: if subscribed['categories']:
if subscribed['categories'][0]['id'] not in categories: if subscribed['categories'][0]['id'] not in categories:
categories.append(subscribed['categories'][0]['id']) categories.append(subscribed['categories'][0]['id'])
subscriptions[subscribed['id']] = subscribed['title'] subscriptions[subscribed['id']] = subscribed['title']
'''
==================================================
Iterate over the unreadcounts dictionary
Determine the appropriate singular or
plural notification label based on the
count (e.g. new article or new articles)
Include the unread feed in the notification
only if it is not in the categories list.
This is to avoid duplicates notifications
for the unread feed and the folder in which
the feed is.
Do not include the reading-list in the
notification
If the unread_id exists in the subscriptions
dictionary, get the title associated with it.
Else extract the title from the unread_id.
Finally append the count, new_articles label
and title to the message string
==================================================
'''
for unread_id, count in unreadcounts.items(): for unread_id, count in unreadcounts.items():
if count == 1: # Determine singular or plural notification label
new_articles = singular_article new_articles = singular_article if count == 1 else plural_articles
else:
new_articles = plural_articles
count = str(count) count = str(count)
# Do not include the categories and the reading-list in the notification
if not unread_id in categories: if not unread_id in categories:
if unread_id.split("/")[-1] == "reading-list": if unread_id.split("/")[-1] == "reading-list":
pass pass
else: else:
# Get the clean feed title
if unread_id in (k for k,v in subscriptions.items()): if unread_id in (k for k,v in subscriptions.items()):
title = next(v for k, v in subscriptions.items() if k == unread_id) title = next(v for k, v in subscriptions.items() if k == unread_id)
else: else:
title = unread_id.split("/")[-1] title = unread_id.split("/")[-1]
# Build the final notification message
message = message + count + " " + new_articles + " " + title + "\n" message = message + count + " " + new_articles + " " + title + "\n"
else: else:
pass pass
# Send notification if message is not empty. '''
====================================
Send the notification for unread
feeds only if the message string
is set
====================================
'''
if message != "": if message != "":
notif.send_notification(summary, message) notif.send_notification(summary, message)
else: else:
pass pass

View File

@ -1,4 +1,6 @@
""" """
=========================================================================================
Copyright © 2023 Alexandre Racine <https://alex-racine.ch> Copyright © 2023 Alexandre Racine <https://alex-racine.ch>
This file is part of Inopy. This file is part of Inopy.
@ -9,26 +11,34 @@
You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>. You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------- =========================================================================================
DISCLAIMER: parts of this code and comments blocks were created DISCLAIMER: parts of this code and comments blocks were created
with the help of ChatGPT developped by OpenAI <https://openai.com/> with the help of ChatGPT developped by OpenAI <https://openai.com/>
Followed by human reviewing, refactoring and fine-tuning. Followed by human reviewing, refactoring and fine-tuning.
------------------------------------------------------------------------------------- =========================================================================================
This code uses the pydbus library to send notifications. It defines a function send_notification() that takes summary and body as parameters. The aim of this module is to send a notification using the Inopy application. It takes a summary and body as input and utilizes D-Bus (Desktop Bus) to establish a session bus connection.
Then, it calls the Notify method to send a notification with the specified parameters. =========================================================================================
Note that this code assumes that the necessary dependencies are installed and that the D-Bus service for notifications is available on the system.
""" """
import os
from pydbus import SessionBus from pydbus import SessionBus
''' '''
Create a new session bus instance =========================================
Get the .Notifications interface object from the bus Define a function to send the
notification when a new unread
article is present in the feed
* Create a new session
bus instance
* Get the .Notifications interface
object from the bus
=========================================
''' '''
def send_notification(summary, body): def send_notification(summary, body):
@ -36,15 +46,21 @@ def send_notification(summary, body):
notifications = bus.get('.Notifications') notifications = bus.get('.Notifications')
''' '''
Call the Notify method on the notifications object to send a notification ==================================================================================
Call the Notify method on the notifications object to send a notification with
Parameters: Parameters:
- 'MyApp': The name of the application sending the notification
- 0: The ID of the notification (0 means a new notification) * 'MyApp': The name of the application sending the notification
- '': An optional icon name or path for the notification * 0: The ID of the notification (0 means a new notification)
- summary: The summary text of the notification * '': An optional icon name or path for the notification
- body: The body text of the notification * summary: The summary text of the notification
- []: A list of actions associated with the notification (empty in this case) * body: The body text of the notification
- {}: A dictionary of hints for the notification (empty in this case) * []: A list of actions associated with the notification
- 5000: The timeout duration in milliseconds for the notification (5000 ms = 5 seconds) (empty in this case)
* {}: A dictionary of hints for the notification (empty in this case)
* 5000: The timeout duration in milliseconds for the notification
(5000 ms = 5 seconds)
==================================================================================
''' '''
notifications.Notify('Inopy', 0, '/opt/chrome-apps-icons/inoreader.png', summary, body, [], {}, 5000) icon = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'icons/inoreader.png')
notifications.Notify('Inopy', 0, icon, summary, body, [], {}, 5000)

213
oauth.py
View File

@ -1,14 +1,53 @@
from flask import Flask, request, redirect, render_template """
from config import config =========================================================================================
from waitress import serve
Copyright © 2023 Alexandre Racine <https://alex-racine.ch>
This file is part of Inopy.
Inopy is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Inopy is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>.
=========================================================================================
DISCLAIMER: parts of this code and comments blocks were created
with the help of ChatGPT developped by OpenAI <https://openai.com/>
Followed by human reviewing, refactoring and fine-tuning.
=========================================================================================
This module aims to provide an OAuth authentication flow using Flask.
It sets up a web server that handles the authentication process, retrieves the access token and refresh token, and stores them in the configuration file.
The module can be run in either production or development mode, and it opens a web browser to complete the authentication process.
=========================================================================================
"""
import requests import requests
import os
import signal
import webbrowser import webbrowser
import time import time
import json import json
import subprocess import subprocess
import threading import threading
from flask import Flask, request, redirect, render_template
from config import config
from waitress import serve
'''
===================================
Load configuration values
Extract values from the config
dictionary
Build the URL for authorization
===================================
'''
config = config() config = config()
@ -19,39 +58,79 @@ callback = config['callback']
scope = config['scope'] scope = config['scope']
CSRF = config['csrf'] CSRF = config['csrf']
home_url = config['home_url'] home_url = config['home_url']
prod_status = config['prod_status'] prod_status = config['prod_status']
browser_path = config['browser_path'] browser_path = config['browser_path']
host = config['host'] host = config['host']
port = config['port'] port = config['port']
config_file_path = config['config_file_path'] config_file_path = config['config_file_path']
url = 'https://www.inoreader.com/oauth2/auth?client_id={}&redirect_uri={}&response_type=code&scope={}&state={}'.format(client_id, callback, scope, CSRF) url = 'https://www.inoreader.com/oauth2/auth?client_id={}&redirect_uri={}&response_type=code&scope={}&state={}'.format(client_id, callback, scope, CSRF)
'''
==========================
Initiate the Flask app
==========================
'''
app = Flask(__name__) app = Flask(__name__)
'''
==================================
When the home url is accessed:
* Redirect the user to the
authorization URL
==================================
'''
@app.route('/') @app.route('/')
def index(): def index():
return redirect(url) return redirect(url)
'''
=====================================================
When the oauth-callback url is accessed:
* Get the authorization code and other
parameters from the callback URL
* Check CSRF validation token and if there
is an error parameter in the URL
* Request bearer token and refresh token
* Save the bearer token and refresh token
to the config file
* If process is successfull:
* render the success template with the
response
* If CSRF failed:
* render the CSRF failure template
* If an error parameter is present in the URL:
* Render the OAuth error template
=====================================================
'''
@app.route('/oauth-callback') @app.route('/oauth-callback')
def oauth_callback(): def oauth_callback():
# Get the authorization code from the request URL
authorization_code = request.args.get('code') authorization_code = request.args.get('code')
csrf_check = request.args.get('state') csrf_check = request.args.get('state')
error_param = request.args.get('error') error_param = request.args.get('error')
csrf = True if csrf_check == CSRF else False csrf = True if csrf_check == CSRF else False
error = True if error_param != None else False error = True if error_param is not None else False
if csrf == True and error != True: if csrf and not error:
# Exchange the authorization code for an access token
access_token_url = endpoint access_token_url = endpoint
# Prepare data to request bearer token
payload = { payload = {
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
'code': authorization_code, 'code': authorization_code,
@ -60,108 +139,128 @@ def oauth_callback():
'redirect_uri': callback 'redirect_uri': callback
} }
# Request bearer token and refresh token
response = requests.post(access_token_url, data=payload) response = requests.post(access_token_url, data=payload)
# Parse the response to get the access token
if response.status_code == 200: if response.status_code == 200:
access_token = response.json()['access_token'] access_token = response.json()['access_token']
refresh_token = response.json()['refresh_token'] refresh_token = response.json()['refresh_token']
with open(config_file_path, 'r+') as config_file: with open(config_file_path, 'r+') as config_file:
# Load the JSON data from the file
config = json.load(config_file) config = json.load(config_file)
# Update the token value in the config data # Save the bearer token and refresh token to the config file
config['oauth']['bearer'] = access_token config['oauth']['bearer'] = access_token
config['oauth']['refresh_token'] = refresh_token config['oauth']['refresh_token'] = refresh_token
# Move the file pointer back to the beginning of the file
config_file.seek(0) config_file.seek(0)
# Write the updated config data to the file
json.dump(config, config_file, indent=4) json.dump(config, config_file, indent=4)
config_file.truncate() config_file.truncate()
return render_template('success.html', response=(access_token, refresh_token)) return render_template('success.html', response=(access_token, refresh_token))
else: else:
if not csrf:
# Redirect the user to a desired URL
if csrf != True:
return render_template('csrf-failed.html', response=(CSRF, csrf_check)) return render_template('csrf-failed.html', response=(CSRF, csrf_check))
elif error == True: elif error:
error_content = request.args.get('error_description') error_content = request.args.get('error_description')
return render_template('oauth-error.html', response=(error_param, error_content)) return render_template('oauth-error.html', response=(error_param, error_content))
else: else:
pass pass
'''
======================================
When the shutdown url is accessed:
* Shut down the Flask server
gracefully
======================================
'''
@app.route('/shutdown') @app.route('/shutdown')
def shutdown(): def shutdown():
# Shutting down the Flask app gracefully
#return ('proccess ended', time.sleep(5), os.kill(os.getpid(), signal.SIGINT))
request.environ.get('werkzeug.server.shutdown') request.environ.get('werkzeug.server.shutdown')
return 'Close this browser to terminate the process!' return 'Close this browser to terminate the process!'
'''
======================================
Define a function to start the
Flask server using Waitress in
production mode
======================================
'''
'''def run_prod():
# Create a new Firefox profile
subprocess.run([browser_path, "-CreateProfile", "new_profile", "-no-remote"])
# Launch Firefox with the new profile and open the URL
subprocess.run([browser_path, "-P", "new_profile", "-no-remote", home_url])
serve(app, host=host, port=port)'''
# Function to start the Flask server
def start_server(): def start_server():
serve(app, host=host, port=port) serve(app, host=host, port=port)
'''
======================================
Define a function to start the
production server inside a new
thread.
This is to ensure the server is
actually already running before
opening the web browser
======================================
'''
def run_prod(): def run_prod():
# Create a new thread for the Flask server
server_thread = threading.Thread(target=start_server) server_thread = threading.Thread(target=start_server)
# Start the Flask server thread
server_thread.start() server_thread.start()
# Wait for the Flask server to start (adjust the delay as needed)
time.sleep(2) time.sleep(2)
# Create a new Firefox profile # Launch a separate browser process with a new profile
subprocess.run([browser_path, "-CreateProfile", "new_profile", "-no-remote"]) subprocess.run([browser_path, "-CreateProfile", "new_profile", "-no-remote"])
# Launch Firefox with the new profile and open the URL
subprocess.run([browser_path, "-P", "new_profile", "-no-remote", home_url]) subprocess.run([browser_path, "-P", "new_profile", "-no-remote", home_url])
'''
======================================
Define a function to start the
development server inside a new
thread.
This is to ensure the server is
actually already running before
opening the web browser
======================================
'''
def run_dev(): def run_dev():
# Create a new thread for the Flask server
server_thread = threading.Thread(target=start_server) server_thread = threading.Thread(target=start_server)
# Start the Flask server thread
server_thread.start() server_thread.start()
# Wait for the Flask server to start (adjust the delay as needed)
time.sleep(2) time.sleep(2)
# Open the home URL in the default web browser
webbrowser.open(home_url) webbrowser.open(home_url)
app.run() app.run()
'''
======================================
Determine whether to run the
production or development server
based on the config
======================================
'''
def run_app(): def run_app():
if prod_status == "true": if prod_status == "true":
print(prod_status)
run_prod() run_prod()
else: else:
print(prod_status)
run_dev() run_dev()
'''
======================================
Run the application standalone
if the script is executed but not
imported
======================================
'''
if __name__ == '__main__': if __name__ == '__main__':
#app.run()
run_app() run_app()

View File

@ -1,4 +1,6 @@
""" """
=========================================================================================
Copyright © 2023 Alexandre Racine <https://alex-racine.ch> Copyright © 2023 Alexandre Racine <https://alex-racine.ch>
This file is part of Inopy. This file is part of Inopy.
@ -9,31 +11,57 @@
You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>. You should have received a copy of the GNU General Public License along with Inopy. If not, see <https://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------- =========================================================================================
DISCLAIMER: parts of this code and comments blocks were created DISCLAIMER: parts of this code and comments blocks were created
with the help of ChatGPT developped by OpenAI <https://openai.com/> with the help of ChatGPT developped by OpenAI <https://openai.com/>
Followed by human reviewing, refactoring and fine-tuning. Followed by human reviewing, refactoring and fine-tuning.
------------------------------------------------------------------------------------- =========================================================================================
This code uses the values from the configuration file to construct a request payload. The aim of this module is to refresh both OAuth 2.0 access and refresh tokens and update the configuration file with the refreshed tokens.
The refresh function sends a POST request to the Inoreader OAuth endpoint <https://www.inoreader.com/developers/oauth> using the payload and headers to handle the refresh token process. =========================================================================================
Then it extracts the refreshed bearer token and the new refresh token in order to use it in the replace() function of the main ino.py module.
""" """
'''
=====================================================
Create a refresh function to refresh the bearer
and refresh token in the config data
* Set the headers and prepare the payload
data for the HTTP request
* Send a POST request to the specified
endpoint with the payload and headers
* Check the response status code to determine
if the request was successful
* Parse the response data as JSON and extract
the refreshed bearer token and new refresh
token from the response data
* Open the config file. Load the existing
config data from the file and update the
bearer token and refresh token in the config
data
* Move the file pointer to the beginning of
the file, write the updated config data back
to the file, overwriting the existing content
* Truncate the file to remove any remaining
content after the updated data
=====================================================
'''
import requests import requests
import json import json
# Define a function named 'refresh' that handles the token refresh logic
def refresh(config_path, endpoint, client_id, client_secret, refresh_token): def refresh(config_path, endpoint, client_id, client_secret, refresh_token):
# Set the headers for the request
headers = {"Content-type": "application/x-www-form-urlencoded"} headers = {"Content-type": "application/x-www-form-urlencoded"}
# Prepare the payload for the request
payload = { payload = {
"client_id": client_id, "client_id": client_id,
"client_secret": client_secret, "client_secret": client_secret,
@ -41,33 +69,25 @@ def refresh(config_path, endpoint, client_id, client_secret, refresh_token):
"refresh_token": refresh_token "refresh_token": refresh_token
} }
# Send a POST request to the specified URL with the payload and headers
response = requests.post(endpoint, data=payload, headers=headers) response = requests.post(endpoint, data=payload, headers=headers)
# Check the response status code
if response.status_code == 200: if response.status_code == 200:
print("Request was successful.") print("Request was successful.")
else: else:
print("Request failed with status code:", response.status_code) print("Request failed with status code:", response.status_code)
# Parse the response data as JSON
data = json.loads(response.text) data = json.loads(response.text)
# Extract the refreshed bearer token and new refresh token from the response data
refreshed_bearer = data['access_token'] refreshed_bearer = data['access_token']
new_refresh_token = data['refresh_token'] new_refresh_token = data['refresh_token']
with open(config_path, 'r+') as config_file: with open(config_path, 'r+') as config_file:
# Load the JSON data from the file
config = json.load(config_file) config = json.load(config_file)
# Update the token value in the config data
config['oauth']['bearer'] = refreshed_bearer config['oauth']['bearer'] = refreshed_bearer
config['oauth']['refresh_token'] = new_refresh_token config['oauth']['refresh_token'] = new_refresh_token
# Move the file pointer back to the beginning of the file
config_file.seek(0) config_file.seek(0)
# Write the updated config data to the file
json.dump(config, config_file, indent=4) json.dump(config, config_file, indent=4)
config_file.truncate() config_file.truncate()