Samet AKIN

Samet AKIN

ALAYLI YAZILIMCI

RabbitMQ işlemleri ve zamanlanmış mesaj gönderme ( Kuyruk İşlemleri )

RabbitMq ile neler yapabileceğiniz kısa anlatmak gerekirse, kısaca asenkron işlemler yapmanızı sağlar. Php senkron çalışan bir dildir yani ilk sıradaki işlemi yapmadan 2. sıradaki işlemi yapmaz. Mesela bir kullanıcı kaydı yapıyorsunuz veritabanıza veriyi girdiniz ve mail, sms gondermeniz gerekiyor diyelim. Hem sms, hem mail kullanıcının bayağı bir beklemesini gerektirir. Bu hususta siz rabbitmq ye “hacı ben verileri girdim mail ile sms sende” diyip yolunuza hiç kullanıcıyı bekletmeden devam edebilirsiniz. Sms ile mail işlemini rabbitmqde kuyruğa attığınızda rabbitmq ram’in müsait olduğu anda bu işlemleri arkaplanda yapar. Sizde bekleme süresi uzun olan işlemlerde kullanıcılarınızı boşuna bekletip sıkmamış olursunuz ve bilgisayara fazla yük bindirmemiş olursunuz.

Rabitmq’nin mantığı şu: sizin sunucunuzun ram’inize diyorki, al sana 3mb sen bununla idare bu işi yap. al sana 100mb seni sevdim sende şu işi yap diyor. Yani ram’ini birden fazla işçiye çeviriyor. Bu şekilde belirlenen limitler çerçevesinde sizin istediğiniz işlemi yapıyor. Kullanıcı bekleme yapmıyor, bilgisayar zorlanmıyor ve aynı anda birden fazla işlem yapmanızı sağlıyor.

Küçük bir örnek daha : Mesela bir işlemi 10 saniye yada 1 dk sonra yapmasını istiyorsunuz diyelim. Bu zamanlanmış işlemi’de yapıyor. İşlemi ileri bir zamana atabiliyorsunuz.

Siz mesajınızı send.php göndereceksiniz. Arkaplanda çalışan consumer ( process.php ) da yaptıracaksınız. Mantık çok basit. Veriyi kuruğa gönder arkaplanda consumer bu işlemi yapsın.

 

Şimdi RabbitMq’ye bağlanıp send.php ile  bir mesaj gönderelimde başlangıçı yapalım. Mesajınımızı send.php ile göndereceğiz ve process.php de yaptığı işlemi görebileceğiz.

Bu oluşturduğumuz send.php ile sadace direkt gönderme işlemi yapabiliriz. zamanlanmış gönderi yapamayız. zamanlanmış gönderi en sonda.


<?php
//kütüphanemizi projemize dahil edelim
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//AMQP bağlantı
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); /* bunları sunucuya geçtiğiniz kesinlikle değiştirin */
$channel = $connection->channel();

$datetime = date("d-m-Y H:i:s");
$queueName = "test"; /* kuyruk adı */

//kanalımızı tanımlayalım
$channel->queue_declare($queueName, false, false, false, false);
$msg = new AMQPMessage("Sizin Mesajınız ve Gönderi Zamanı : ".$datetime);
$channel->basic_publish($msg, '', $queueName); /* mesajı gonderiyoruz */

echo "İçerik Gönderi Zamanı : ".$datetime." <br>";

//kanalımızı ve bağlantımızı kapatalım, kapatmasanızda ölümcül bir sonuç doğurmaz ( bende sıkıntı olmadı hiç )
$channel->close();
$connection->close();
?>

 

Not : Burada “queue_declare” dediğimiz anda eğerki o an kuyruk yok ise bu kuyruğu oluşturur

 

Sıra geldi consumer’a process.php dosyamız


<?php
//autoload ile amqplib kütüphanemizi ekleyelim ve kullanalım
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//AMQP bağlantı
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); /* bunları sunucuya geçtiğiniz kesinlikle değiştirin */
$channel = $connection->channel();

$queueName = "test"; /* kuyruk adı */
$channel->queue_declare($queueName, false, false, false, false);
echo 'Kuyruğa mesaj gönderildiğinde alt kısımda görünecektir. iptal icin CTRL+C yapın, linux sunucudalar durmadan çalışması için "nohup php process.php &" komutunu kullanın \r\n';

$callback = function($msg) {
echo " Gönderinin teslim alındığı tarih : ".$msg->body." \r\n";
}

$channel->basic_consume($queueName, '', false, true, false, false, $callback);

//kanalımızı dinleyelim
while(count($channel->callbacks)) {
$channel->wait();
}
?>

 

Bu dosyayıda oluşturduktan sonra yapmamız gereken işlem işlem. command manager ( windows’ta cmd ) üzerinde bunu çalıştırmak.

  • Dosyanın bulunduğu klasöre gidin “cd /path/way/project”
  • php process.php ile çalıştırın
  • Unix sunucularda durmadan çalışmasını istiyorsanız “nohup php process.php &” yaparak çalıştırın. konsolu kapatsanız bile çalışmaya devam eder

 

Konsol üzerinde dosyamızı çalıştıdığımıza göre. send.php yi çalıştırarak mesajımızı gönderebiliriz ve sonucu konsol üzerinde görebiliriz.

Buraya kadar olan işlemlerin hepsi mesajınızı direkt gönderme üzerineydi.  Şimdi birde ileri tarihli bir mesaj gönderelim yani geçikmeli mesaj.

Geçikmeli mesaj için sendDelayed.php dosyamızı oluşturalım


<?php
//kütüphanemizi projemize dahil edelim
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//AMQP bağlantı
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$datetime = date("d-m-Y H:i:s");
$delay = 5; /* 5 saniye geçikmeli olsun */
$queueName = "test"; /* kuyruk adı */

//kanalımızı tanımlayalım
$channel->queue_declare($queueName, false, false, false, false);
$delayed = $queueName.'_delayed_'.$delay;
$channel->queue_declare($delayed, false, true, false, true,false, array(
'x-expires' => array('I',$delay*1000+100), /* mesaj gönderildikten kaç milisaniye sonra silinmesi gerektiğini belirtir */
'x-message-ttl' => array('I',$delay*1000), /* kaç milisaniye sonra mesajın asıl kuyruğa aktarılması gerektiğini belirtir */
'x-dead-letter-routing-key' => array('S',$queueName), /* zamanı geldiğinde geçikmeli mesajınızın hangi kuyruğa aktarılacağını belirtir */
'x-dead-letter-exchange' => array('S','') /* bu parametreyi eklemezseniz çalışmaz. exchange işlemi yapmak istersenizde burada ikinci parametreye takas yapılacak olan kuyruk adını yazın */
));
$msg = new AMQPMessage("Sizin Mesajınız ve Gönderi Zamanı : ".$datetime,array("delivery_mode"=>"1"));
$channel->basic_publish($msg, '', $queueName); /* mesajı gonderiyoruz */

echo "İçerik Gönderi Zamanı : ".$datetime." <br>";

//kanalımızı ve bağlantımızı kapatalım, kapatmasanızda ölümcül bir sonuç doğurmaz ( bende sıkıntı olmadı hiç )
$channel->close();
$connection->close();
?>

 

Oluşturduğumuz projeyi ve rabbitmq kütüphanesini buradan indirebilirsiniz.

Burada dikkat etmeniz gereken husus şu;  queue_declare methodunda bazı parametreleri değiştirdim. Benim oluştuduğum kuyruk geçici. Yani sizin göndermek istediğiniz mesajı aklında istediğiniz süre boyunca tutuyor ve işlemini asıl veriyi işleyecek olan kuyruğa aktarıyor ve kendini sonlandırıyor. Dilerseniz 5. sıradaki true olan kısmı false yaparak ve x-expires parametresini kaldırarak bu kuyruğu kalıcı hale getirebilirsiniz.

Kuyruğun geçici olması size birşey kaybettirmez.  Mesela bir işleminiz var 1dk sonra yapılacak ancak 20. saniyesine geldiğinizde 1-2 tane daha fazladan 1dk ileri tarihli daha işlem eklemek istiyorsanız ekleyin sıkıntı olmaz. Yeni eklediğiniz işlemlerinizi bitirmeden kuyruk kendisini sonlandırmaz. Mantık olarak baktığınızda yeni eklenen işlemlerin süresi ilk eklediğinizdeki kendini sonlandırma süresini aşıyor ancak bunu düşünmüşler ve sonradan kuyruğa eklediğiniz mesajın sonlanma zamanı önemli. ( x-expires )

 

Öneride bulunmam gerekirse, farklı tipdeki işlemleri farklı kuyruklar altında toplayın. Performans açısından daha iyidir. Aynı kuyrukta birden fazla veri biriktirmemeye çalışın.

Kuyruktaki işlemleri localhost:15672 ‘den takip edebilirsiniz ve unacknowledged kısmında biriken mesaj sizi korkutmasın. Sizi asıl korkutması gereken Ready kısmında bekleyen mesajlar olsun. Bu consumar’ın çalışmadığı anlamına gelir.

3 Comments

    1. samet

      teşekkürler. Çalışan kodumdan örnek alarak yapmıştım kodu düzelttim gözümden kaçmış.

      Reply
  1. Erhan

    bir de “basic_publish” metodunun son parametresi “$delayed” olmalıdır. oluşturduğumuz mesaj “test_delated_5” kanalına yollanmalıdır.

    Reply

DROP A COMMENT

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir