例題
例題を動かしてみよう。RedisPubSubパッケージにはC++版のPublisherとSubscriberの例題が含まれている。
publisherとsyncSubscriberの例
このpublisherはテキストデータとバイナリーデータを発信することができる。 それぞれ、チャネル名はmlftext、mlfdataである。 また、syncSubscriberはそれらのデータを受け取り表示する。 ソースコードを眺めながら、その使い方について解説する。 RedisPubSubパッケージを利用する場合は、下記の通りRedisPubSub.hファイルをincludeする。
#include "RedisPubSub.h"
クラスRedisPublisherの使い方
クラスRedisPublisherはRedisPubSubのpublish機能を提供する。 initメソッドで行うことは、Redisサーバへの接続であり、それが成功すると返り値として0を返す。 なんらかのエラーとなると、負の値を返すので、必ずチェックする必要がある。 Redisサーバのホスト名(IPアドレスでも良い)とポート番号はデフォルトで、それぞれlocalhost、6379となっている。 下記の例はデフォルト値を使った宣言である。
RedisPublisher pub;
status = pub.init();
Redisサーバのホスト名(IPアドレス)とポート番号の変更をしたい場合は、下記のように指定する。
RedisPublisher pub("192.168.100.10", 6666);
上記のような宣言の他に、スマートポインタを利用した方法がある。
スマートポインタは動的なメモリ確保、この場合はnew RedisPublisher("127.0.0.1")
で生成されたRedisPublisher用のポインタを確保し、使用済みの時に、自動的に開放してくれる。
メモリーリークの心配から開放してくれるので有用である。
std::unique_ptr<RedisPublisher> pub;
pub.reset(new RedisPublisher("127.0.0.1"));
status = pub->init();
一度上記のように、接続が確立すると、RedisPublisherはテキストデータやバイナリーデータをpublishメソッドで発信できるようになる。 下記の例は、テキストデータをmlftextというチャネル名で発信する。
int32_t loop = 0;
string s = "test data";
stringstream ss;
ss << s << loop;
status = pub.publish("mlftext", (char*)ss.str().c_str(), (int32_t)ss.str().size());
publishメソッドの引数は、publish(string, char*, int32_t)
である。返り値は、負の値はエラーとなり、0以上で、この発信されたデータを受け取ったSubscriberの数を返す。
バイナリーデータを送る場合も、同様にpublishメソッドを下記のように利用出来る。
char buf[1000];
for (int i=0; i<1000; i++) {
buf[i] = i+1;
}
buf[0] = (loop+1)%256;
status = pub.publish("mlfdata", (char*)buf, (int32_t)sizeof(buf));
クラスRedisSyncSubscriberの使い方
クラスRedisSyncSubscriberは スーパークラスとしてRedisSubscriberを持つ。 RedisSubscriberは、共通したSubscribe機能を提供する。 RedisSyncSubscriberはRedisSubscriberを継承し、同期型のsubscribe機能を提供する。 まだ非同期型のsubscribe機能は実装されていないが、早々に実装する予定である。
initメソッドで行うことは、redisサーバへの接続であり、それが成功すると返り値として0を返す。 なんらかのエラーとなると、負の値を返すので、必ずチェックする必要がある。
Redisサーバのホスト名(IPアドレス)とポート番号はデフォルトで、それぞれlocalhost、6379となっている。 下記の例はデフォルト値を使った宣言である。
すでに述べたようにスマートポインタを利用した場合と利用しない場合があるが、ここでは利用する例を示す。 少し分かりにくくなっているが、resetメソッドを呼び出す際、RedisSyncSubscriberのホスト名とポート番号の設定方法のいろいろを示している。
unique_ptr<RedisSyncSubscriber> sub;
//sub.reset(new RedisSyncSubscriber);
sub.reset(new RedisSyncSubscriber(host_name));
//sub.reset(new RedisSyncSubscriber(host_name, 6379));
//sub->setHost(host_name, 6379);
status = sub->init();
下記の例は、すでにsubscribeしてあるチャネル名がmlf*(mlftextとかmlfdataとか)であるチャネル名の全てを表示するものである。 getChannelsメソッドの返り値が0の場合は成功であり、負の値はエラーを意味する。 このメソッドはsubscribeメソッドを呼び出す前か、unsubscribeメソッドの後から呼び出し可能である。
vector<string> vec;
status = sub->getChannels("mlf*", vec); // all of channels
cout << "status = " << status << endl;
cout << "list of already subscribed channels : ";
for (int32_t i=0; i< (int32_t)vec.size(); i++)
cout << vec[i] << " ";
cout << endl;
subscribeメソッドは複数のチャネル名をsubscribe出来る。
string name_text = "mlftext";
string name_data = "mlfdata";
status = sub->subscribe(name_text);
cout << "status = " << status << endl;
status = sub->subscribe(name_data);
cout << "status = " << status << endl;
subscribeメソッドの返り値は、成功すると0、エラーが起こると負の値が返る。
次は、getメソッドでデータを読む方法を示す。同期型のメソッドなので、一定時間データがpublishされていない場合は、タイムアウトして返り値1が返る。 デフォルトのタイムアウト時間は2000ミリ秒(2秒)であるが、指定することも可能である。 返り値は、成功すると0であり、エラーが起こると負の値である。 成功しても、lengthが0の場合データは入っていないので、処理をしない。
for (int32_t i=0; i<loop; i++) {
status = sub->get(name, (char*)buf, length);
//status = sub.get(name, (char*)buf, length, (int32_t)2000);
if (status != 0) {
if (status == 1) {
cout << "time out..." << endl;
continue;
}
cout << "error..." << endl;
break;
}
cout << "name = " << name << " : ";
if (name == "mlftext") { // text message
if (length != 0) {
message = string(buf,length);
cout << "Length of text data = " << length << " : message = " << message << endl
}
} else { // binary
if (length != 0) {
cout << "Length of binary data = " << length;
cout << " : first data = " << (int32_t)buf[0] << endl;
}
}
}
sub->unsubscribe(name_text);
sub->unsubscribe(name_data);
getメソッドの呼び出しは、get(string, char*, int32_t[, int32_t])
である。
unsubscribeメソッドの呼び出しは、unsubscribe(const string& name)
である。
動かしてみよう
2つの端末を用意する。1つにはsyncSubscriber、もう1つにはpublisherを走らせる。
まずはmlftextデータを発信してそれを読み出す。syncSubscriberはpublisherが走る前に走らせた。
./syncSubscriber
status = 0
list of already subscribed channels :
status = 0
status = 0
type any char : 1
time out...
name = mlftext : Length of text data = 10 : message = test data0
name = mlftext : Length of text data = 10 : message = test data1
name = mlftext : Length of text data = 10 : message = test data2
name = mlftext : Length of text data = 10 : message = test data3
name = mlftext : Length of text data = 10 : message = test data4
name = mlftext : Length of text data = 10 : message = test data5
name = mlftext : Length of text data = 10 : message = test data6
name = mlftext : Length of text data = 10 : message = test data7
name = mlftext : Length of text data = 10 : message = test data8
./publisher
Text data will be published.
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 0
num of subscribers = 0
number of subscribersが1から0に変化しているのは、subscriberが終了したから、subscribeしているものがなくなったため。
次はmlfdataデータを発信してそれを読み出す。syncSubscriberはpublisherが走ってから走らせた。
./syncSubscriber
status = 0
list of already subscribed channels :
status = 0
status = 0
type any char : 1
name = mlfdata : Length of binary data = 1000 : first data = 5
name = mlfdata : Length of binary data = 1000 : first data = 6
name = mlfdata : Length of binary data = 1000 : first data = 7
name = mlfdata : Length of binary data = 1000 : first data = 8
name = mlfdata : Length of binary data = 1000 : first data = 9
name = mlfdata : Length of binary data = 1000 : first data = 10
name = mlfdata : Length of binary data = 1000 : first data = 11
name = mlfdata : Length of binary data = 1000 : first data = 12
name = mlfdata : Length of binary data = 1000 : first data = 13
name = mlfdata : Length of binary data = 1000 : first data = 14
./publisher 1
Binary data will be published.
num of subscribers = 0
num of subscribers = 0
num of subscribers = 0
num of subscribers = 0
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 1
num of subscribers = 0
num of subscribers = 0
最初のnum of subscribersが0なのは、まだsubscriberがsubscribeしていないからである。