Skip to content Skip to sidebar Skip to footer

How To Test If Email Was Sent After Executing Celery Task

I'm using Django 1.10 and Celery 4.1 I have a shared_task which sends an email to the user. # myapp/tasks.py @shared_task def notify_user(user_id): # TODO: send email and do ot

Solution 1:

I put it here for someone that will face the same problem.

I've solved it with

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

https://stackoverflow.com/a/46531472/7396169

I think this solution is suitable for unit testing.

Solution 2:

tasks.py

from django.core.mail import EmailMessage
from django.template.loader import render_to_string
from django.contrib.auth import get_user_model

from accounts.models import Token
from celery import shared_task

@shared_task(bind=True)defsend_login_email_task(self, email):
    try:
        uid = str(uuid.uuid4())
        Token.objects.create(email=email, uid=uid)
        current_site = 'localhost:8000'
        mail_subject = 'Activate your account.'
        message = render_to_string('accounts/login_activation_email.html', {
            'domain': current_site,
            'uid': uid
        })
        print('called')
        email = EmailMessage(mail_subject, message, to=[email])
        email.send()
    except Token.DoesNotExist:
        logging.warning(
            "Tried to send activation email to non-existing user '%s'", email)
    except smtplib.SMTPException as exc:
        raise self.retry(exc=exc)

test_tasks.py

from django.test import TestCase
from unittest.mock import patch
from django.contrib.auth import get_user_model
from celery.exceptions import Retry
from proj.celery import App
import smtplib
import uuid


import accounts.tasks
from accounts.models import Token
@patch('accounts.tasks.EmailMessage')deftest_send_login_email_task(self, mock_email_message):

    # call task
    token = Token.objects.get(email=self.email, uid=self.uid)
    print(token.email)
    accounts.tasks.send_login_email_task.apply_async((token.email,))
    self.assertEqual(mock_email_message.called, True)

    # patch EmailMessageprint(mock_email_message.call_args)
    args, kwargs = mock_email_message.call_args
    subject = args[0]
    self.assertEqual(subject, 'Activate your account.')
    self.assertEqual(kwargs, {'to': ['ama@example.com']})

Post a Comment for "How To Test If Email Was Sent After Executing Celery Task"