|
7 | 7 | #define DTBUF_CHUNK_PAYLOAD_SIZE 4000 |
8 | 8 |
|
9 | 9 | struct header { |
10 | | - time_ms timestamp; |
11 | | - chunk_length data_length; |
| 10 | + time_ms timestamp; |
| 11 | + chunk_length data_length; |
12 | 12 | }; |
13 | 13 |
|
14 | 14 | #define DTBUF_CHUNK_SIZE (sizeof (struct header) + DTBUF_CHUNK_PAYLOAD_SIZE) |
15 | 15 |
|
16 | | -int dtbuf_init(struct dtbuf *dtbuf, size_t capacity) { |
17 | | - // to avoid splitting a chunk on the circular buffer boundaries, add |
18 | | - // (DTBUF_CHUNK_SIZE-1) bytes at the end: a chunk starting at (capacity-1) |
19 | | - // will still fit |
20 | | - dtbuf->real_capacity = capacity + DTBUF_CHUNK_SIZE - 1; |
21 | | - if (!(dtbuf->data = malloc(dtbuf->real_capacity))) { |
22 | | - return 1; |
23 | | - } |
24 | | - dtbuf->capacity = capacity; |
25 | | - dtbuf->head = 0; |
26 | | - dtbuf->tail = 0; |
27 | | - return 0; |
| 16 | +int dtbuf_init(struct dtbuf *dtbuf, size_t capacity) |
| 17 | +{ |
| 18 | + // to avoid splitting a chunk on the circular buffer boundaries, add |
| 19 | + // (DTBUF_CHUNK_SIZE-1) bytes at the end: a chunk starting at (capacity-1) |
| 20 | + // will still fit |
| 21 | + dtbuf->real_capacity = capacity + DTBUF_CHUNK_SIZE - 1; |
| 22 | + if (!(dtbuf->data = malloc(dtbuf->real_capacity))) { |
| 23 | + return 1; |
| 24 | + } |
| 25 | + dtbuf->capacity = capacity; |
| 26 | + dtbuf->head = 0; |
| 27 | + dtbuf->tail = 0; |
| 28 | + return 0; |
28 | 29 | } |
29 | 30 |
|
30 | | -void dtbuf_free(struct dtbuf *dtbuf) { |
31 | | - free(dtbuf->data); |
| 31 | +void dtbuf_free(struct dtbuf *dtbuf) |
| 32 | +{ |
| 33 | + free(dtbuf->data); |
32 | 34 | } |
33 | 35 |
|
34 | | -int dtbuf_is_empty(struct dtbuf *dtbuf) { |
35 | | - return dtbuf->head == dtbuf->tail; |
| 36 | +int dtbuf_is_empty(struct dtbuf *dtbuf) |
| 37 | +{ |
| 38 | + return dtbuf->head == dtbuf->tail; |
36 | 39 | } |
37 | 40 |
|
38 | | -int dtbuf_is_full(struct dtbuf *dtbuf) { |
39 | | - // When dtbuf->head >= dtbuf->capacity, it "cycles" (reset to 0) if and |
40 | | - // only if there is enough space at the start for a full chunk. |
41 | | - // Thus, if dtbuf->head has not cycled while it is after capacity, then the |
42 | | - // buffer is full. |
43 | | - // Else, if head >= tail, there is always enough space (by design). |
44 | | - // Else (if head < tail), there is enough space only if dtbuf->tail is far |
45 | | - // enough (ie we can put a full chunk at the start). |
46 | | - return dtbuf->head >= dtbuf->capacity || (dtbuf->head < dtbuf->tail |
47 | | - && dtbuf->tail - dtbuf->head <= |
48 | | - DTBUF_CHUNK_SIZE); |
| 41 | +int dtbuf_is_full(struct dtbuf *dtbuf) |
| 42 | +{ |
| 43 | + // When dtbuf->head >= dtbuf->capacity, it "cycles" (reset to 0) if and |
| 44 | + // only if there is enough space at the start for a full chunk. |
| 45 | + // Thus, if dtbuf->head has not cycled while it is after capacity, then the |
| 46 | + // buffer is full. |
| 47 | + // Else, if head >= tail, there is always enough space (by design). |
| 48 | + // Else (if head < tail), there is enough space only if dtbuf->tail is far |
| 49 | + // enough (ie we can put a full chunk at the start). |
| 50 | + return dtbuf->head >= dtbuf->capacity || (dtbuf->head < dtbuf->tail |
| 51 | + && dtbuf->tail - dtbuf->head <= |
| 52 | + DTBUF_CHUNK_SIZE); |
49 | 53 | } |
50 | 54 |
|
51 | | -time_ms dtbuf_next_timestamp(struct dtbuf *dtbuf) { |
52 | | - struct header *header = (struct header *) &dtbuf->data[dtbuf->tail]; |
53 | | - return header->timestamp; |
| 55 | +time_ms dtbuf_next_timestamp(struct dtbuf *dtbuf) |
| 56 | +{ |
| 57 | + struct header *header = (struct header *) &dtbuf->data[dtbuf->tail]; |
| 58 | + return header->timestamp; |
54 | 59 | } |
55 | 60 |
|
56 | | -ssize_t dtbuf_write_chunk(struct dtbuf *dtbuf, int fd_in, time_ms timestamp) { |
57 | | - ssize_t r; |
58 | | - struct header header; |
59 | | - // directly write to dtbuf, at the right index |
60 | | - int payload_index = dtbuf->head + sizeof(struct header); |
61 | | - if ((r = |
62 | | - read(fd_in, &dtbuf->data[payload_index], |
63 | | - DTBUF_CHUNK_PAYLOAD_SIZE)) > 0) { |
64 | | - // write headers |
65 | | - header.timestamp = timestamp; |
66 | | - header.data_length = (chunk_length) r; |
67 | | - memcpy(&dtbuf->data[dtbuf->head], &header, sizeof(header)); |
68 | | - dtbuf->head = payload_index + r; |
69 | | - if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
70 | | - // not enough space at the end of the buffer, cycle if there is enough |
71 | | - // at the start |
72 | | - dtbuf->head = 0; |
| 61 | +ssize_t dtbuf_write_chunk(struct dtbuf *dtbuf, int fd_in, time_ms timestamp) |
| 62 | +{ |
| 63 | + ssize_t r; |
| 64 | + struct header header; |
| 65 | + // directly write to dtbuf, at the right index |
| 66 | + int payload_index = dtbuf->head + sizeof(struct header); |
| 67 | + if ((r = read(fd_in, &dtbuf->data[payload_index], |
| 68 | + DTBUF_CHUNK_PAYLOAD_SIZE)) > 0) { |
| 69 | + // write headers |
| 70 | + header.timestamp = timestamp; |
| 71 | + header.data_length = (chunk_length) r; |
| 72 | + memcpy(&dtbuf->data[dtbuf->head], &header, sizeof(header)); |
| 73 | + dtbuf->head = payload_index + r; |
| 74 | + if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
| 75 | + // not enough space at the end of the buffer, cycle if there is |
| 76 | + // enough at the start |
| 77 | + dtbuf->head = 0; |
| 78 | + } |
| 79 | + } else if (r == -1) { |
| 80 | + perror("read()"); |
73 | 81 | } |
74 | | - } else if (r == -1) { |
75 | | - perror("read()"); |
76 | | - } |
77 | | - return r; |
| 82 | + return r; |
78 | 83 | } |
79 | 84 |
|
80 | | -ssize_t dtbuf_read_chunk(struct dtbuf * dtbuf, int fd_out) { |
81 | | - ssize_t w; |
82 | | - struct header *pheader = (struct header *) &dtbuf->data[dtbuf->tail]; |
83 | | - struct header header; |
84 | | - chunk_length length = pheader->data_length; |
85 | | - // directly read from dtbuf, at the right index |
86 | | - int payload_index = dtbuf->tail + sizeof(struct header); |
87 | | - if ((w = write(fd_out, &dtbuf->data[payload_index], length)) > 0) { |
88 | | - if (w == length) { |
89 | | - // we succeed to write all the data |
90 | | - dtbuf->tail = payload_index + w; |
91 | | - if (dtbuf->tail >= dtbuf->capacity) { |
92 | | - // the next chunk cannot be after capacity |
93 | | - dtbuf->tail = 0; |
94 | | - if (dtbuf->head >= dtbuf->capacity) { |
95 | | - // can happen if capacity < DTBUF_CHUNK_SIZE |
96 | | - dtbuf->head = 0; |
| 85 | +ssize_t dtbuf_read_chunk(struct dtbuf *dtbuf, int fd_out) |
| 86 | +{ |
| 87 | + ssize_t w; |
| 88 | + struct header *pheader = (struct header *) &dtbuf->data[dtbuf->tail]; |
| 89 | + struct header header; |
| 90 | + chunk_length length = pheader->data_length; |
| 91 | + // directly read from dtbuf, at the right index |
| 92 | + int payload_index = dtbuf->tail + sizeof(struct header); |
| 93 | + if ((w = write(fd_out, &dtbuf->data[payload_index], length)) > 0) { |
| 94 | + if (w == length) { |
| 95 | + // we succeed to write all the data |
| 96 | + dtbuf->tail = payload_index + w; |
| 97 | + if (dtbuf->tail >= dtbuf->capacity) { |
| 98 | + // the next chunk cannot be after capacity |
| 99 | + dtbuf->tail = 0; |
| 100 | + if (dtbuf->head >= dtbuf->capacity) { |
| 101 | + // can happen if capacity < DTBUF_CHUNK_SIZE |
| 102 | + dtbuf->head = 0; |
| 103 | + } |
| 104 | + } |
| 105 | + } else { |
| 106 | + dtbuf->tail += w; |
| 107 | + // set the timestamp for writing at the new tail position |
| 108 | + header.timestamp = pheader->timestamp; |
| 109 | + // set the remaining length |
| 110 | + header.data_length = length - w; |
| 111 | + memcpy(&dtbuf->data[dtbuf->tail], &header, sizeof(header)); |
97 | 112 | } |
98 | | - } |
99 | | - } else { |
100 | | - dtbuf->tail += w; |
101 | | - // set the timestamp for writing at the new tail position |
102 | | - header.timestamp = pheader->timestamp; |
103 | | - // set the remaining length |
104 | | - header.data_length = length - w; |
105 | | - memcpy(&dtbuf->data[dtbuf->tail], &header, sizeof(header)); |
106 | | - } |
107 | | - if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
108 | | - // there is enough space at the start now, head can cycle |
109 | | - dtbuf->head = 0; |
| 113 | + if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
| 114 | + // there is enough space at the start now, head can cycle |
| 115 | + dtbuf->head = 0; |
| 116 | + } |
| 117 | + } else if (w == -1) { |
| 118 | + perror("write()"); |
110 | 119 | } |
111 | | - } else if (w == -1) { |
112 | | - perror("write()"); |
113 | | - } |
114 | | - return w; |
| 120 | + return w; |
115 | 121 | } |
0 commit comments