Skip to content

Commit b6159b2

Browse files
author
Martin Braun
committed
blocks: Added tagged stream mux feature to preserve "head" position of tags on other inputs than 0
1 parent 407c356 commit b6159b2

File tree

5 files changed

+61
-9
lines changed

5 files changed

+61
-9
lines changed

gr-blocks/grc/blocks_tagged_stream_mux.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<name>Tagged Stream Mux</name>
33
<key>blocks_tagged_stream_mux</key>
44
<import>from gnuradio import blocks</import>
5-
<make>blocks.tagged_stream_mux($type.size*$vlen, $lengthtagname)</make>
5+
<make>blocks.tagged_stream_mux($type.size*$vlen, $lengthtagname, $tag_preserve_head_pos)</make>
66
<param>
77
<name>IO Type</name>
88
<key>type</key>
@@ -49,6 +49,13 @@
4949
<value>1</value>
5050
<type>int</type>
5151
</param>
52+
<param>
53+
<name>Tags: Preserve head position on input</name>
54+
<key>tag_preserve_head_pos</key>
55+
<value>0</value>
56+
<type>int</type>
57+
<hide>part</hide>
58+
</param>
5259
<sink>
5360
<name>in</name>
5461
<type>$type</type>

gr-blocks/include/gnuradio/blocks/tagged_stream_mux.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ namespace gr {
4141
*
4242
* All other tags are propagated as expected, i.e. they stay associated
4343
* with the same input item.
44+
* There are cases when this behaviour is undesirable. One special case
45+
* is when a tag at the first element (the head item) of one input port
46+
* must stay on the head item of the output port. To achieve this,
47+
* set \p tag_preserve_head_pos to the port that will receive these special
48+
* tags.
4449
*/
4550
class BLOCKS_API tagged_stream_mux : virtual public tagged_stream_block
4651
{
@@ -52,8 +57,9 @@ namespace gr {
5257
*
5358
* \param itemsize Items size (number of bytes per item)
5459
* \param lengthtagname Length tag key
60+
* \param tag_preserve_head_pos Preserves the head position of tags on this input port
5561
*/
56-
static sptr make(size_t itemsize, const std::string &lengthtagname);
62+
static sptr make(size_t itemsize, const std::string &lengthtagname, unsigned int tag_preserve_head_pos=0);
5763
};
5864

5965
} // namespace blocks

gr-blocks/lib/tagged_stream_mux_impl.cc

+9-5
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,18 @@ namespace gr {
3131
namespace blocks {
3232

3333
tagged_stream_mux::sptr
34-
tagged_stream_mux::make(size_t itemsize, const std::string &lengthtagname)
34+
tagged_stream_mux::make(size_t itemsize, const std::string &lengthtagname, unsigned int tag_preserve_head_pos)
3535
{
36-
return gnuradio::get_initial_sptr (new tagged_stream_mux_impl(itemsize, lengthtagname));
36+
return gnuradio::get_initial_sptr (new tagged_stream_mux_impl(itemsize, lengthtagname, tag_preserve_head_pos));
3737
}
3838

39-
tagged_stream_mux_impl::tagged_stream_mux_impl(size_t itemsize, const std::string &lengthtagname)
39+
tagged_stream_mux_impl::tagged_stream_mux_impl(size_t itemsize, const std::string &lengthtagname, unsigned int tag_preserve_head_pos)
4040
: tagged_stream_block("tagged_stream_mux",
4141
io_signature::make(1, -1, itemsize),
4242
io_signature::make(1, 1, itemsize),
4343
lengthtagname),
44-
d_itemsize(itemsize)
44+
d_itemsize(itemsize),
45+
d_tag_preserve_head_pos(tag_preserve_head_pos)
4546
{
4647
set_tag_propagation_policy(TPP_DONT);
4748
}
@@ -77,7 +78,10 @@ namespace gr {
7778
std::vector<tag_t> tags;
7879
get_tags_in_range(tags, i, nitems_read(i), nitems_read(i)+ninput_items[i]);
7980
for (unsigned int j = 0; j < tags.size(); j++) {
80-
const uint64_t offset = tags[j].offset - nitems_read(i) + nitems_written(0) + n_produced;
81+
uint64_t offset = tags[j].offset - nitems_read(i) + nitems_written(0) + n_produced;
82+
if (i == d_tag_preserve_head_pos && tags[j].offset == nitems_read(i)) {
83+
offset -= n_produced;
84+
}
8185
add_item_tag(0, offset, tags[j].key, tags[j].value);
8286
}
8387
memcpy((void *) out, (const void *) in, ninput_items[i] * d_itemsize);

gr-blocks/lib/tagged_stream_mux_impl.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ namespace gr {
3333
{
3434
private:
3535
size_t d_itemsize;
36+
unsigned int d_tag_preserve_head_pos;
3637

3738
protected:
38-
int calculate_output_stream_length(const std::vector<int> &ninput_items);
39+
int calculate_output_stream_length(const gr_vector_int &ninput_items);
3940

4041
public:
41-
tagged_stream_mux_impl(size_t itemsize, const std::string &lengthtagname);
42+
tagged_stream_mux_impl(size_t itemsize, const std::string &lengthtagname, unsigned int d_tag_preserve_head_pos);
4243
~tagged_stream_mux_impl();
4344

4445
int work(int noutput_items,

gr-blocks/python/blocks/qa_tagged_stream_mux.py

+34
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,40 @@ def test_1(self):
104104
]
105105
self.assertEqual(tags, tags_expected)
106106

107+
def test_preserve_tag_head_pos(self):
108+
""" Test the 'preserve head position' function.
109+
This will add a 'special' tag to item 0 on stream 1.
110+
It should be on item 0 of the output stream. """
111+
special_tag = gr.tag_t()
112+
special_tag.key = pmt.string_to_symbol('spam')
113+
special_tag.offset = 0
114+
special_tag.value = pmt.to_pmt('eggs')
115+
len_tag_key = "length"
116+
packet_len_1 = 5
117+
packet_len_2 = 3
118+
mux = blocks.tagged_stream_mux(gr.sizeof_float, len_tag_key, 1)
119+
sink = blocks.vector_sink_f()
120+
self.tb.connect(
121+
blocks.vector_source_f(range(packet_len_1)),
122+
blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, len_tag_key),
123+
(mux, 0)
124+
)
125+
self.tb.connect(
126+
blocks.vector_source_f(range(packet_len_2), False, 1, (special_tag,)),
127+
blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_2, len_tag_key),
128+
(mux, 1)
129+
)
130+
self.tb.connect(mux, sink)
131+
self.tb.run()
132+
self.assertEqual(sink.data(), tuple(range(packet_len_1) + range(packet_len_2)))
133+
tags = [gr.tag_to_python(x) for x in sink.tags()]
134+
tags = sorted([(x.offset, x.key, x.value) for x in tags])
135+
tags_expected = [
136+
(0, 'length', packet_len_1 + packet_len_2),
137+
(0, 'spam', 'eggs'),
138+
]
139+
self.assertEqual(tags, tags_expected)
140+
107141

108142
if __name__ == '__main__':
109143
gr_unittest.run(qa_tagged_stream_mux, "qa_tagged_stream_mux.xml")

0 commit comments

Comments
 (0)