Skip to content

Commit 6e482c5

Browse files
smunautjmcorgan
authored andcommitted
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use. - Dont' drop samples on receive if the output buffer doesn't have enough space. - Don't drop tags on receive by putting tags in the future. - Better metadata creation/parsing avoiding copying lots data. - Always do as much work as possible in a single call to work() to avoid scheduler overhead as long as possible. - Allow setting the high watermark to avoid older version of zeromq's default of buffering infinite messages and causing a paging thrash to/from disk when the flow graph can't keep up. Signed-off-by: Sylvain Munaut <[email protected]>
1 parent 393624c commit 6e482c5

29 files changed

+651
-474
lines changed

gr-zeromq/grc/zeromq_pub_sink.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_pub_sink</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<sink>

gr-zeromq/grc/zeromq_pull_source.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_pull_source</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<source>

gr-zeromq/grc/zeromq_push_sink.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_push_sink</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<sink>

gr-zeromq/grc/zeromq_rep_sink.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_rep_sink</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<sink>

gr-zeromq/grc/zeromq_req_source.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_req_source</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<source>

gr-zeromq/grc/zeromq_sub_source.xml

+18-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<key>zeromq_sub_source</key>
55
<category>ZeroMQ Interfaces</category>
66
<import>from gnuradio import zeromq</import>
7-
<make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
7+
<make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make>
88

99
<param>
1010
<name>IO Type</name>
@@ -61,7 +61,23 @@
6161
<name>Pass Tags</name>
6262
<key>pass_tags</key>
6363
<value>False</value>
64-
<type>bool</type>
64+
<type>enum</type>
65+
<option>
66+
<name>Yes</name>
67+
<key>True</key>
68+
</option>
69+
<option>
70+
<name>No</name>
71+
<key>False</key>
72+
</option>
73+
</param>
74+
75+
<param>
76+
<name>High Watermark</name>
77+
<key>hwm</key>
78+
<value>-1</value>
79+
<type>int</type>
80+
<hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
6581
</param>
6682

6783
<source>

gr-zeromq/include/gnuradio/zeromq/pub_sink.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ namespace gr {
5353
* \param address ZMQ socket address specifier.
5454
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5555
* \param pass_tags Whether sink will serialize and pass tags over the link.
56+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5657
*/
5758
static sptr make(size_t itemsize, size_t vlen, char *address,
58-
int timeout=100, bool pass_tags=false);
59+
int timeout=100, bool pass_tags=false, int hwm=-1);
5960
};
6061

6162
} // namespace zeromq

gr-zeromq/include/gnuradio/zeromq/pull_source.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ namespace gr {
5050
* \param address ZMQ socket address specifier.
5151
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5252
* \param pass_tags Whether source will look for and deserialize tags.
53+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5354
*/
5455
static sptr make(size_t itemsize, size_t vlen, char *address,
55-
int timeout=100, bool pass_tags=false);
56+
int timeout=100, bool pass_tags=false, int hwm=-1);
5657
};
5758

5859
} // namespace zeromq

gr-zeromq/include/gnuradio/zeromq/push_sink.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ namespace gr {
5454
* \param address ZMQ socket address specifier.
5555
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5656
* \param pass_tags Whether sink will serialize and pass tags over the link.
57+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5758
*/
5859
static sptr make(size_t itemsize, size_t vlen, char *address,
59-
int timeout=100, bool pass_tags=false);
60+
int timeout=100, bool pass_tags=false, int hwm=-1);
6061
};
6162

6263
} // namespace zeromq

gr-zeromq/include/gnuradio/zeromq/rep_sink.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,10 @@ namespace gr {
5252
* \param address ZMQ socket address specifier.
5353
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5454
* \param pass_tags Whether sink will serialize and pass tags over the link.
55+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5556
*/
5657
static sptr make(size_t itemsize, size_t vlen, char *address,
57-
int timeout=100, bool pass_tags=false);
58+
int timeout=100, bool pass_tags=false, int hwm=-1);
5859
};
5960

6061
} // namespace zeromq

gr-zeromq/include/gnuradio/zeromq/req_source.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ namespace gr {
5050
* \param address ZMQ socket address specifier.
5151
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5252
* \param pass_tags Whether source will look for and deserialize tags.
53+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5354
*/
5455
static sptr make(size_t itemsize, size_t vlen, char *address,
55-
int timeout=100, bool pass_tags=false);
56+
int timeout=100, bool pass_tags=false, int hwm=-1);
5657
};
5758

5859
} // namespace zeromq

gr-zeromq/include/gnuradio/zeromq/sub_source.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ namespace gr {
5050
* \param address ZMQ socket address specifier.
5151
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments.
5252
* \param pass_tags Whether source will look for and deserialize tags.
53+
* \param hwm High Watermark to configure the socket to (-1 => zmq's default)
5354
*/
5455
static sptr make(size_t itemsize, size_t vlen, char *address,
55-
int timeout=100, bool pass_tags=false);
56+
int timeout=100, bool pass_tags=false, int hwm=-1);
5657
};
5758

5859
} // namespace zeromq

gr-zeromq/lib/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ endif(ENABLE_GR_CTRLPORT)
3737
# Setup library
3838
########################################################################
3939
list(APPEND zeromq_sources
40+
base_impl.cc
4041
pub_sink_impl.cc
4142
pub_msg_sink_impl.cc
4243
sub_source_impl.cc

0 commit comments

Comments
 (0)