"""
Module with methods to create a quiz with a telegram bot
"""
import asyncio
import logging
import pickle
from argon2 import PasswordHasher
from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove
from telegram.constants import ChatAction
from telegram.ext import ConversationHandler
from quizbot.quiz.question_factory import QuestionBool, QuestionChoice,\
QuestionChoiceSingle, QuestionNumber, QuestionString
from quizbot.quiz.quiz import Quiz
from quizbot.bot.models import QuizModel
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# Dict with string and associated question class
dict_question_types = {
'Ask for a number': QuestionNumber,
'Ask for a string': QuestionString,
'Ask for a boolean value': QuestionBool,
'Ask a multiple choice question': QuestionChoice,
'Ask a multiple choice question with one correct answer': QuestionChoiceSingle
}
[docs]
async def start(update, context):
"""
Starts a conversation about quiz creation.
Welcomes the user and asks for the type of the first question.
"""
logger.info('[%s] Creation initialized', update.message.from_user.username)
if context.user_data.get('quiz') is not None:
# user is in the middle of a quiz and cant attempt to a second one
logger.info('[%s] Creation canceled, because the user is in the middle of a creation.',
update.message.from_user.username)
await update.message.reply_text(
"You're in the middle of a creation 😉 "
"You can't create a second one at the same time 😁\n"
'If you want to cancel your creation, enter /cancelCreate.',
reply_markup=ReplyKeyboardRemove()
)
return ConversationHandler.END
# Init Quiz for user
context.user_data['quiz'] = Quiz(update.message.from_user.username)
# Asks for type of first question
list_question = [[el] for el in list(dict_question_types.keys())]
await update.message.reply_text(
"Hi 😃 Let's create a new quiz!\n"
"What type of question should the first one be?\n"
'If you want to cancel your creation, enter /cancelCreate.',
reply_markup=ReplyKeyboardMarkup(
list_question, one_time_keyboard=True)
)
return 'ENTER_TYPE'
[docs]
async def cancel(update, context):
"""
Cancels a creation ofa quiz by deleting the users' entries.
"""
logger.info('[%s] Creation canceled by user',
update.message.from_user.username)
# Delete user data
context.user_data.clear()
await update.message.reply_text(
"I canceled the creation process. See you next time. 🙋♂️",
reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
[docs]
async def enter_type(update, context):
"""
After entering the new question type, it asks for the question itself,
if the entered string isn't 'Enter'.
Otherwise, it asks if the question should be displayed in random order.
"""
if update.message.text == "Enter":
# User dont want to add more questions
# Asks for randomness
await update.message.reply_text(
"Should the questions be displayed in random order? 🤔",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
logger.info('[%s] Completed question creation',
update.message.from_user.username)
return 'ENTER_RANDOMNESS_QUIZ'
# TODO What if type doesnt exisit
# Save question type
context.user_data['questtype'] = dict_question_types[update.message.text]
await update.message.reply_text("What is the question? 🤔")
return 'ENTER_QUESTION'
[docs]
async def enter_question(update, context):
"""
Asks for the correct answer to the question after entering the question itself.
"""
# Save question in user_data
context.user_data['question'] = update.message.text
logger.info('[%s] Entered new question type "%s"',
update.message.from_user.username, update.message.text)
# Ask for correct answer in different ways
if context.user_data['questtype'] == QuestionChoiceSingle:
reply_text = "Please enter ONE correct answer ☝️"
elif context.user_data['questtype'] == QuestionChoice:
reply_text = "Please enter the correct answers separated by ', ' 🙆♂️"
else:
reply_text = "Please enter the correct answer 🙆♂️"
await update.message.reply_text(reply_text)
return 'ENTER_ANSWER'
[docs]
async def enter_answer(update, context):
"""
After entering the correct answer it tries to process it.
If it fails, it asks for the correct answer again.
Otherwise, it asks for additional possible answers,
if the question is an instance of QuestionChoice.
Otherwise, it adds the question to the quiz and asks for the type of the next question.
"""
# Save correct answer in user_data
context.user_data['answer'] = update.message.text
# Try to init question instance
QuestionType = context.user_data['questtype']
try:
context.user_data['questionInstance'] = QuestionType(context.user_data['question'],
context.user_data['answer'])
except AssertionError:
# TODO specify exceptions
# Error because it isnt a number, no entry, not True/False,...
await update.message.reply_text(
"Sorry. Something went wrong by entering your answer. Please try again. 😕")
logger.info('[%s] Entering correct answer "%s" failed',
update.message.from_user.username, update.message.text)
return 'ENTER_ANSWER'
logger.info('[%s] Entering correct answer "%s" accepted',
update.message.from_user.username, update.message.text)
if isinstance(context.user_data['questionInstance'], QuestionChoice):
# If QuestionChoice instance, ask for additional possible answers
await update.message.reply_text(
"Please enter additional possible answers separated by ', ' 😁")
return 'ENTER_POSSIBLE_ANSWER'
# Add question to quiz
context.user_data['quiz'].add_question(
context.user_data['questionInstance'])
# Asks for type of next question
list_question = [[el] for el in list(dict_question_types.keys())]
await update.message.reply_text(
"What type of question should the next one be? "
"If you don't have more questions, press 'Enter'.",
reply_markup=ReplyKeyboardMarkup(
list_question + [['Enter']], one_time_keyboard=True)
)
return 'ENTER_TYPE'
[docs]
async def enter_possible_answer(update, context):
"""
After entering additional possible answers, it asks whether the order of the answers
should be random.
"""
list_possible_answers = update.message.text.split(', ')
# Add possible answers to question
for answer in list_possible_answers:
context.user_data['questionInstance'].add_possible_answer(answer)
logger.info('[%s] Entered additional possible answers',
update.message.from_user.username)
# Ask for
await update.message.reply_text(
"Should the answers be displayed in random order? 🤔",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RANDOMNESS_QUESTION'
[docs]
async def enter_randomness_question(update, context):
"""
After entering whether the order if the answers should be random,
it adds the question to the quiz.
After that, it asks for the type of next question.
"""
# Check for correct input
if update.message.text not in ('Yes', 'No'):
await update.message.reply_text(
"Thats not a 'Yes' or a 'No' 😕"
"Should the answers be displayed in random order?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RANDOMNESS_QUESTION'
context.user_data['questionInstance'].is_random = update.message.text == 'Yes'
logger.info('[%s] Entered randomness of the order of possible answers',
update.message.from_user.username)
# Add question to quiz
context.user_data['quiz'].add_question(
context.user_data['questionInstance'])
logger.info('[%s] Added the question to the quiz',
update.message.from_user.username)
# Asks for type of next question
list_question = [[el] for el in list(dict_question_types.keys())]
await update.message.reply_text(
"What type of question should the next one be? "
"If you don't have more questions, press 'Enter'.",
reply_markup=ReplyKeyboardMarkup(
list_question + [['Enter']], one_time_keyboard=True)
)
return 'ENTER_TYPE'
[docs]
async def enter_randomness_quiz(update, context):
"""
After entering whether the order if the questions should be random,
it asks if the result of the question be displayed after the question itself.
"""
# Check for correct input
if update.message.text not in ('Yes', 'No'):
await update.message.reply_text(
"Thats not a 'Yes' or a 'No' 😕"
"Should the questions be displayed in random order?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RANDOMNESS_QUIZ'
# Process input
context.user_data['quiz'].is_random = update.message.text == 'Yes'
# Ask for displaying result after question
await update.message.reply_text(
"Should the result of the question be displayed after the question?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RESULT_AFTER_QUESTION'
[docs]
async def enter_result_after_question(update, context):
"""
After entering whether the result of the question should be displayed after the question itself,
it asks if the result of every question be displayed after the quiz.
"""
# Check for correct input
if update.message.text not in ('Yes', 'No'):
await update.message.reply_text(
"Thats not a 'Yes' or a 'No' 😕"
"Should the result of the question be displayed after the question?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RESULT_AFTER_QUESTION'
# Process input
context.user_data['quiz'].show_results_after_question = update.message.text == 'Yes'
# Ask for displaying result of every question after quiz
await update.message.reply_text(
"Should the result of every question be displayed after the quiz?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RESULT_AFTER_QUIZ'
[docs]
async def enter_result_after_quiz(update, context):
"""
After entering whether the result of every question should be displayed after the quiz,
it asks for the name of the quiz?
"""
# Check for correct input
if update.message.text not in ('Yes', 'No'):
await update.message.reply_text(
"Thats not a 'Yes' or a 'No' 😕"
"Should the result of every question be displayed after the quiz?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_RESULT_AFTER_QUIZ'
# Process input
context.user_data['quiz'].show_results_after_quiz = update.message.text == 'Yes'
# Ask for name of quiz
await update.message.reply_text(
"Great! 😃 I created a new quiz!\nHow should I name it? ✏️"
)
return 'ENTER_QUIZ_NAME'
[docs]
async def enter_quiz_name(update, context):
"""
After entering the name of the quiz, it looks up if the quiz name is occupied.
If unique, asks whether the user wants to set a password.
"""
logger.info('[%s] Completed quiz creation',
update.message.from_user.username)
quizname = update.message.text
# Bot is typing during database query
await context.bot.send_chat_action(
chat_id=update.effective_message.chat_id, action=ChatAction.TYPING)
# Query for question with input name
username = update.message.from_user.username
Session = context.bot_data['Session']
session = Session()
try:
result = await asyncio.to_thread(
session.query(QuizModel).filter_by(username=username, quizname=quizname).first
)
finally:
session.close()
if result is not None:
# Quiz with quizname already exists
await update.message.reply_text(
"Sorry. You already have a quiz named {} 😕\nPlease try something else".format(
quizname)
)
logger.info('[%s] Quiz with name "%s" already exists',
update.message.from_user.username, update.message.text)
return 'ENTER_QUIZ_NAME'
# Store quizname for later save
context.user_data['quizname'] = quizname
# Ask if the user wants to set a password
await update.message.reply_text(
"Do you want to set a password for this quiz? 🔒\n"
"Other users will need it to attempt the quiz. You can always attempt without it.",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_PASSWORD_CHOICE'
[docs]
async def enter_password_choice(update, context):
"""
After choosing whether to set a password, either asks for the password
or saves the quiz without one.
"""
if update.message.text == 'No':
return await _save_quiz(update, context, password=None)
if update.message.text == 'Yes':
await update.message.reply_text("Please enter the password 🔑")
return 'ENTER_PASSWORD'
# Invalid input
await update.message.reply_text(
"Thats not a 'Yes' or a 'No' 😕"
"Do you want to set a password for this quiz?",
reply_markup=ReplyKeyboardMarkup(
[['Yes', 'No']], one_time_keyboard=True)
)
return 'ENTER_PASSWORD_CHOICE'
[docs]
async def enter_password(update, context):
"""
Hashes the password and saves the quiz to the database.
"""
ph = PasswordHasher()
hashed = ph.hash(update.message.text)
return await _save_quiz(update, context, password=hashed)
async def _save_quiz(update, context, password=None):
"""
Saves the quiz to the database with an optional password hash.
"""
username = update.message.from_user.username
quizname = context.user_data['quizname']
Session = context.bot_data['Session']
session = Session()
try:
quiz_row = QuizModel(
username=username,
quizname=quizname,
quizinstance=pickle.dumps(context.user_data['quiz']),
password=password,
)
session.add(quiz_row)
await asyncio.to_thread(session.commit)
finally:
session.close()
await update.message.reply_text(
"Great! 🥳 I saved your new quiz."
"You can attempt to it by the name {}.".format(quizname),
reply_markup=ReplyKeyboardRemove()
)
logger.info('[%s] Quiz saved as "%s"',
update.message.from_user.username, quizname)
# Delete user data
context.user_data.clear()
return ConversationHandler.END