例題

例題を動かしてみよう。RedisPubSubパッケージにはC++版のPublisherとSubscriberの例題が含まれている。

publisherとsyncSubscriberの例

このpublisherはテキストデータとバイナリーデータを発信することができる。 それぞれ、チャネル名はmlftext、mlfdataである。 また、syncSubscriberはそれらのデータを受け取り表示する。 ソースコードを眺めながら、その使い方について解説する。 RedisPubSubパッケージを利用する場合は、下記の通りRedisPubSub.hファイルをincludeする。

#include "RedisPubSub.h"

クラスRedisPublisherの使い方

クラスRedisPublisherはRedisPubSubのpublish機能を提供する。 initメソッドで行うことは、Redisサーバへの接続であり、それが成功すると返り値として0を返す。 なんらかのエラーとなると、負の値を返すので、必ずチェックする必要がある。 Redisサーバのホスト名(IPアドレスでも良い)とポート番号はデフォルトで、それぞれlocalhost、6379となっている。 下記の例はデフォルト値を使った宣言である。

RedisPublisher pub;

status = pub.init();

Redisサーバのホスト名(IPアドレス)とポート番号の変更をしたい場合は、下記のように指定する。

RedisPublisher pub("192.168.100.10", 6666);

上記のような宣言の他に、スマートポインタを利用した方法がある。 スマートポインタは動的なメモリ確保、この場合はnew RedisPublisher("127.0.0.1")で生成されたRedisPublisher用のポインタを確保し、使用済みの時に、自動的に開放してくれる。 メモリーリークの心配から開放してくれるので有用である。

std::unique_ptr<RedisPublisher> pub;
pub.reset(new RedisPublisher("127.0.0.1"));
status = pub->init();

一度上記のように、接続が確立すると、RedisPublisherはテキストデータやバイナリーデータをpublishメソッドで発信できるようになる。 下記の例は、テキストデータをmlftextというチャネル名で発信する。

int32_t loop = 0;
string s = "test data";
stringstream ss;
ss << s << loop;
status = pub.publish("mlftext", (char*)ss.str().c_str(), (int32_t)ss.str().size());

publishメソッドの引数は、publish(string, char*, int32_t)である。返り値は、負の値はエラーとなり、0以上で、この発信されたデータを受け取ったSubscriberの数を返す。 バイナリーデータを送る場合も、同様にpublishメソッドを下記のように利用出来る。

char buf[1000];
for (int i=0; i<1000; i++) {
  buf[i] = i+1;
}
buf[0] = (loop+1)%256; 
status = pub.publish("mlfdata", (char*)buf, (int32_t)sizeof(buf));

クラスRedisSyncSubscriberの使い方

クラスRedisSyncSubscriberは スーパークラスとしてRedisSubscriberを持つ。 RedisSubscriberは、共通したSubscribe機能を提供する。 RedisSyncSubscriberはRedisSubscriberを継承し、同期型のsubscribe機能を提供する。 まだ非同期型のsubscribe機能は実装されていないが、早々に実装する予定である。

initメソッドで行うことは、redisサーバへの接続であり、それが成功すると返り値として0を返す。 なんらかのエラーとなると、負の値を返すので、必ずチェックする必要がある。

Redisサーバのホスト名(IPアドレス)とポート番号はデフォルトで、それぞれlocalhost、6379となっている。 下記の例はデフォルト値を使った宣言である。

すでに述べたようにスマートポインタを利用した場合と利用しない場合があるが、ここでは利用する例を示す。 少し分かりにくくなっているが、resetメソッドを呼び出す際、RedisSyncSubscriberのホスト名とポート番号の設定方法のいろいろを示している。

unique_ptr<RedisSyncSubscriber> sub;
//sub.reset(new RedisSyncSubscriber);
sub.reset(new RedisSyncSubscriber(host_name));
//sub.reset(new RedisSyncSubscriber(host_name, 6379));
//sub->setHost(host_name, 6379);
status = sub->init();

下記の例は、すでにsubscribeしてあるチャネル名がmlf*(mlftextとかmlfdataとか)であるチャネル名の全てを表示するものである。 getChannelsメソッドの返り値が0の場合は成功であり、負の値はエラーを意味する。 このメソッドはsubscribeメソッドを呼び出す前か、unsubscribeメソッドの後から呼び出し可能である。

vector<string> vec;
status = sub->getChannels("mlf*", vec); // all of channels
cout << "status = " << status << endl;
cout << "list of already subscribed channels : ";
for (int32_t i=0; i< (int32_t)vec.size(); i++)
  cout << vec[i] << " ";
cout << endl;

subscribeメソッドは複数のチャネル名をsubscribe出来る。

string name_text = "mlftext";
string name_data = "mlfdata";
status = sub->subscribe(name_text);
cout << "status = " << status << endl;
status = sub->subscribe(name_data);
cout << "status = " << status << endl;

subscribeメソッドの返り値は、成功すると0、エラーが起こると負の値が返る。

次は、getメソッドでデータを読む方法を示す。同期型のメソッドなので、一定時間データがpublishされていない場合は、タイムアウトして返り値1が返る。 デフォルトのタイムアウト時間は2000ミリ秒(2秒)であるが、指定することも可能である。 返り値は、成功すると0であり、エラーが起こると負の値である。 成功しても、lengthが0の場合データは入っていないので、処理をしない。

for (int32_t i=0; i<loop; i++) {
  status = sub->get(name, (char*)buf, length);
  //status = sub.get(name, (char*)buf, length, (int32_t)2000);
  if (status != 0) {
    if (status == 1) {
      cout << "time out..." << endl;
      continue;
    }
    cout << "error..." << endl;
    break;
  }
  cout << "name = " << name << " : ";
  if (name ==  "mlftext") { // text message
    if (length != 0) { 
      message = string(buf,length);
      cout << "Length of text data = " << length << " : message = " << message  << endl
    }
  } else { // binary
    if (length != 0) {
      cout << "Length of binary data = " << length;
      cout  << " : first data = " << (int32_t)buf[0] << endl;
    }
  }
}
sub->unsubscribe(name_text);
sub->unsubscribe(name_data);

getメソッドの呼び出しは、get(string, char*, int32_t[, int32_t])である。 unsubscribeメソッドの呼び出しは、unsubscribe(const string& name)である。

動かしてみよう

2つの端末を用意する。1つにはsyncSubscriber、もう1つにはpublisherを走らせる。

まずはmlftextデータを発信してそれを読み出す。syncSubscriberはpublisherが走る前に走らせた。

./syncSubscriber
status = 0
list of already subscribed channels : 
status = 0
status = 0
type any char : 1
time out...
name = mlftext : Length of text data = 10 : message = test data0
name = mlftext : Length of text data = 10 : message = test data1
name = mlftext : Length of text data = 10 : message = test data2
name = mlftext : Length of text data = 10 : message = test data3
name = mlftext : Length of text data = 10 : message = test data4
name = mlftext : Length of text data = 10 : message = test data5
name = mlftext : Length of text data = 10 : message = test data6
name = mlftext : Length of text data = 10 : message = test data7
name = mlftext : Length of text data = 10 : message = test data8

./publisher
Text data will be published.
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 0
num of subscribers = 0

number of subscribersが1から0に変化しているのは、subscriberが終了したから、subscribeしているものがなくなったため。

次はmlfdataデータを発信してそれを読み出す。syncSubscriberはpublisherが走ってから走らせた。

./syncSubscriber
status = 0
list of already subscribed channels : 
status = 0
status = 0
type any char : 1
name = mlfdata : Length of binary data = 1000 : first data = 5
name = mlfdata : Length of binary data = 1000 : first data = 6
name = mlfdata : Length of binary data = 1000 : first data = 7
name = mlfdata : Length of binary data = 1000 : first data = 8
name = mlfdata : Length of binary data = 1000 : first data = 9
name = mlfdata : Length of binary data = 1000 : first data = 10
name = mlfdata : Length of binary data = 1000 : first data = 11
name = mlfdata : Length of binary data = 1000 : first data = 12
name = mlfdata : Length of binary data = 1000 : first data = 13
name = mlfdata : Length of binary data = 1000 : first data = 14

./publisher 1
Binary data will be published.
num of subscribers = 0
num of subscribers = 0
num of subscribers = 0
num of subscribers = 0
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 0
num of subscribers = 0

最初のnum of subscribersが0なのは、まだsubscriberがsubscribeしていないからである。