asrt
Automated System Runtime Testing library
Loading...
Searching...
No Matches
stream.hpp
1
11#pragma once
12
13#include "../asrtl/asrt_assert.h"
14#include "../asrtl/log.h"
15#include "../asrtl/util.h"
16#include "../asrtlpp/callback.hpp"
17#include "../asrtlpp/task.hpp"
18#include "../asrtlpp/util.hpp"
19#include "../asrtr/stream.h"
20#include "./task_unit.hpp"
21
22namespace asrt
23{
24
/// Per-type stream field traits; only the explicit specialisations below are usable.
/// Each specialisation provides `tag`, `size` and `encode`.
template < typename T >
struct strm_field_traits;
27
28template <>
29struct strm_field_traits< uint8_t >
30{
31 static constexpr auto tag = ASRT_STRM_FIELD_U8;
32 static constexpr auto size = 1;
33 static void encode( uint8_t*& p, uint8_t v ) { *p++ = v; }
34};
35
36template <>
37struct strm_field_traits< uint16_t >
38{
39 static constexpr auto tag = ASRT_STRM_FIELD_U16;
40 static constexpr auto size = 2;
41 static void encode( uint8_t*& p, uint16_t v ) { asrt_add_u16( &p, v ); }
42};
43
44template <>
45struct strm_field_traits< uint32_t >
46{
47 static constexpr auto tag = ASRT_STRM_FIELD_U32;
48 static constexpr auto size = 4;
49 static void encode( uint8_t*& p, uint32_t v ) { asrt_add_u32( &p, v ); }
50};
51
52template <>
53struct strm_field_traits< int8_t >
54{
55 static constexpr auto tag = ASRT_STRM_FIELD_I8;
56 static constexpr auto size = 1;
57 static void encode( uint8_t*& p, int8_t v ) { *p++ = static_cast< uint8_t >( v ); }
58};
59
60template <>
61struct strm_field_traits< int16_t >
62{
63 static constexpr auto tag = ASRT_STRM_FIELD_I16;
64 static constexpr auto size = 2;
65 static void encode( uint8_t*& p, int16_t v )
66 {
67 asrt_add_u16( &p, static_cast< uint16_t >( v ) );
68 }
69};
70
71template <>
72struct strm_field_traits< int32_t >
73{
74 static constexpr auto tag = ASRT_STRM_FIELD_I32;
75 static constexpr auto size = 4;
76 static void encode( uint8_t*& p, int32_t v ) { asrt_add_i32( &p, v ); }
77};
78
79template <>
80struct strm_field_traits< float >
81{
82 static constexpr auto tag = ASRT_STRM_FIELD_FLOAT;
83 static constexpr auto size = 4;
84 static void encode( uint8_t*& p, float v )
85 {
86 uint32_t u;
87 std::memcpy( &u, &v, 4 );
88 asrt_add_u32( &p, u );
89 }
90};
91
92template <>
93struct strm_field_traits< bool >
94{
95 static constexpr auto tag = ASRT_STRM_FIELD_BOOL;
96 static constexpr auto size = 1;
97 static void encode( uint8_t*& p, bool v ) { *p++ = v ? 1 : 0; }
98};
99
101ASRT_NODISCARD inline status init( ref< asrt_stream_client > client, asrt_node& prev )
102{
103 return asrt_stream_client_init( client, &prev );
104}
105
108ASRT_NODISCARD inline status define(
109 ref< asrt_stream_client > client,
110 uint8_t schema_id,
111 enum asrt_strm_field_type_e const* fields,
112 uint8_t field_count,
114{
115 return asrt_stream_client_define(
116 client, schema_id, fields, field_count, done_cb.fn, done_cb.ptr );
117}
120ASRT_NODISCARD inline status emit(
121 ref< asrt_stream_client > client,
122 uint8_t schema_id,
123 uint8_t const* data,
124 uint16_t data_size,
126{
127 return asrt_stream_client_emit(
128 client, schema_id, data, data_size, done_cb.fn, done_cb.ptr );
129}
130
132ASRT_NODISCARD inline status reset( ref< asrt_stream_client > client )
133{
134 return asrt_stream_client_reset( client );
135}
136
138inline void deinit( ref< asrt_stream_client > client )
139{
140 asrt_stream_client_deinit( client );
141}
142
148template < typename... Ts >
150{
151 static constexpr uint16_t emit_size = ( strm_field_traits< Ts >::size + ... );
152
155 ref< asrt_stream_client > client,
156 uint8_t schema_id,
158 : _client( client )
159 , _schema_id( schema_id )
160 {
161 auto s = define( _client, _schema_id, fields, sizeof...( Ts ), done_cb );
162 if ( s != ASRT_SUCCESS ) {
163 ASRT_ERR_LOG( "asrt_stream_schema", "define failed" );
164 ASRT_ASSERT( false );
165 }
166 }
167
168 stream_schema( ref< asrt_stream_client > c, uint8_t id )
169 : _client( c )
170 , _schema_id( id )
171 {
172 }
173
174
175 stream_schema( stream_schema const& ) = delete;
176 stream_schema& operator=( stream_schema const& ) = delete;
177
178 stream_schema( stream_schema&& o ) noexcept
179 : _client( o._client )
180 , _schema_id( o._schema_id )
181 {
182 o._client = nullptr;
183 }
184
185 stream_schema& operator=( stream_schema&& o ) noexcept
186 {
187 _client = o._client;
188 _schema_id = o._schema_id;
189 o._client = nullptr;
190 return *this;
191 }
192
194 ASRT_NODISCARD status emit( Ts... args, callback< asrt_stream_done_cb > done_cb )
195 {
196 uint8_t* p = _emit_buf;
197 ( strm_field_traits< Ts >::encode( p, args ), ... );
198 return asrt::emit( _client, _schema_id, _emit_buf, emit_size, done_cb );
199 }
200
202 ASRT_NODISCARD status
203 emit_raw( uint8_t const* buf, callback< asrt_stream_done_cb > done_cb )
204 {
205 return asrt::emit( _client, _schema_id, buf, emit_size, done_cb );
206 }
207
208 ~stream_schema() = default;
209
210 static constexpr enum asrt_strm_field_type_e fields[] = { strm_field_traits< Ts >::tag... };
211
212private:
213 asrt_stream_client* _client;
214 uint8_t _schema_id;
215 uint8_t _emit_buf[emit_size > 0 ? emit_size : 1];
216};
217
219template < typename... Ts >
220struct _stream_define_ctx
221{
222 using completion_signatures = ecor::completion_signatures<
223 ecor::set_value_t( stream_schema< Ts... > ),
224 ecor::set_error_t( status ) >;
225
226 asrt_stream_client* client;
227 uint8_t schema_id;
228
229 template < typename OP >
230 void start( OP& op )
231 {
232 auto cb = +[]( void* ptr, enum asrt_status s ) {
233 auto& o = *static_cast< OP* >( ptr );
234 if ( s == ASRT_SUCCESS )
235 o.receiver.set_value(
236 stream_schema< Ts... >{ o.ctx.client, o.ctx.schema_id } );
237 else
238 o.receiver.set_error( s );
239 };
240 auto s = define(
241 *client,
242 schema_id,
243 stream_schema< Ts... >::fields,
244 sizeof...( Ts ),
245 { cb, &op } );
246 if ( s != ASRT_SUCCESS )
247 op.receiver.set_error( s );
248 }
249};
250
/// Sender for `co_await define<Ts...>(client, schema_id)`; wraps
/// _stream_define_ctx< Ts... > in ecor::sender_from.
template < typename... Ts >
using stream_define_sender = ecor::sender_from< _stream_define_ctx< Ts... > >;
256
259template < typename... Ts >
260ecor::sender auto define( asrt_stream_client& client, uint8_t schema_id )
261{
262 return stream_define_sender< Ts... >{ { &client, schema_id } };
263}
264
265
267template < typename... Ts >
268struct _stream_emit_ctx
269{
270 using completion_signatures =
271 ecor::completion_signatures< ecor::set_value_t(), ecor::set_error_t( status ) >;
272
273 stream_schema< Ts... >* schema;
274 uint8_t buf[stream_schema< Ts... >::emit_size];
275
276 template < typename OP >
277 void start( OP& op )
278 {
279 auto cb = +[]( void* ptr, enum asrt_status s ) {
280 auto& o = *static_cast< OP* >( ptr );
281 if ( s == ASRT_SUCCESS )
282 o.receiver.set_value();
283 else
284 o.receiver.set_error( s );
285 };
286 auto st = schema->emit_raw( buf, { cb, &op } );
287 if ( st != ASRT_SUCCESS )
288 op.receiver.set_error( st );
289 }
290};
291
/// Sender for `co_await emit(schema, args...)`; wraps _stream_emit_ctx< Ts... >
/// in ecor::sender_from.
template < typename... Ts >
using stream_emit_sender = ecor::sender_from< _stream_emit_ctx< Ts... > >;
297
300template < typename... Ts >
301ecor::sender auto emit( stream_schema< Ts... >& schema, Ts... args )
302{
303 _stream_emit_ctx< Ts... > ctx{ &schema, {} };
304 uint8_t* p = ctx.buf;
305 ( strm_field_traits< Ts >::encode( p, args ), ... );
306 return stream_emit_sender< Ts... >{ std::move( ctx ) };
307}
308
309
310} // namespace asrt
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee ...
Definition: callback.hpp:17
ASRT_NODISCARD status define(ref< asrt_stream_client > client, uint8_t schema_id, enum asrt_strm_field_type_e const *fields, uint8_t field_count, callback< asrt_stream_done_cb > done_cb)
Send a DEFINE message registering schema_id with the given fields.
Definition: stream.hpp:108
ASRT_NODISCARD status emit(ref< asrt_stream_client > client, uint8_t schema_id, uint8_t const *data, uint16_t data_size, callback< asrt_stream_done_cb > done_cb)
Send one DATA record for schema_id.
Definition: stream.hpp:120
ASRT_NODISCARD status reset(ref< asrt_stream_client > client)
Clear all pending state on the client (e.g. at a test boundary).
Definition: stream.hpp:132
ASRT_NODISCARD status start(ref< asrt_controller > c, callback< asrt_init_callback > cb, uint32_t timeout)
Begin the protocol handshake; cb is invoked once the target responds or the operation times out.
Definition: controller.hpp:36
ASRT_NODISCARD enum asrt_status init(ref< asrt_cntr_assm > assm, asrt_allocator alloc)
Initialise the controller assembly — wires controller, diag, param, collect and stream channels.
Definition: cntr_assm.hpp:25
ecor::sender_from< _stream_emit_ctx< Ts... > > stream_emit_sender
Sender for co_await emit(schema, args...).
Definition: stream.hpp:296
void deinit(ref< asrt_cntr_assm > assm)
Release all resources owned by the assembly.
Definition: cntr_assm.hpp:52
ecor::sender_from< _stream_define_ctx< Ts... > > stream_define_sender
Sender for co_await define<Ts...>(client, schema_id).
Definition: stream.hpp:255
A type-erasing wrapper for C-style callback + void* pairs.
Definition: callback.hpp:27
Compile-time typed stream schema.
Definition: stream.hpp:150
ASRT_NODISCARD status emit(Ts... args, callback< asrt_stream_done_cb > done_cb)
Encode args in-place and send a DATA message using the underlying emit function.
Definition: stream.hpp:194
stream_schema(ref< asrt_stream_client > client, uint8_t schema_id, callback< asrt_stream_done_cb > done_cb)
XXX: error handling in constructor is bad.
Definition: stream.hpp:154
ASRT_NODISCARD status emit_raw(uint8_t const *buf, callback< asrt_stream_done_cb > done_cb)
Emit from a pre-encoded buffer; used internally by stream_emit_sender.
Definition: stream.hpp:203
Definition: stream.hpp:26
A node in a doubly-linked channel chain.
Definition: chann.h:122
Reactor-side stream client (ASRT_STRM channel).
Definition: stream.h:53