...
 
Commits (8)
......@@ -10,6 +10,8 @@
#include "include/config.h"
#include "include/comms-lib.h"
#include "include/macros.h"
#include "include/utils.h"
Config::Config(const std::string& jsonfile)
{
......
......@@ -11,15 +11,8 @@
#ifndef CONFIG_HEADER
#define CONFIG_HEADER
#include "macros.h"
#include "utils.h"
#include <algorithm>
#include <atomic>
#include <complex.h>
#include <fstream> // std::ifstream
#include <iostream>
#include <stdexcept>
#include <stdio.h> /* for fprintf */
#include <unistd.h>
#include <vector>
#ifdef JSON
#include <nlohmann/json.hpp>
......
......@@ -55,7 +55,7 @@ struct Package {
struct SampleBuffer {
std::vector<char> buffer;
std::vector<bool> pkg_buf_inuse;
std::atomic_int* pkg_buf_inuse;
};
//std::atomic_int thread_count(0);
......
......@@ -22,21 +22,17 @@ struct Radio {
class RadioConfig {
public:
RadioConfig(Config* cfg);
static void* initBSRadio_launch(void* in_context);
void radioConfigure();
void radioStart();
void radioTrigger();
void radioStop();
void readSensors();
~RadioConfig();
void radioTx(const void* const* buffs);
void radioRx(void* const* buffs);
int radioTx(size_t, const void* const* buffs, int flags, long long& frameTime);
int radioRx(size_t, void* const* buffs, long long& frameTime);
void initAGC(SoapySDR::Device* iclSdrs);
void sync_delays(int cellIdx);
~RadioConfig();
void radioStart();
void radioStop();
void radioConfigure();
std::vector<struct Radio> radios;
private:
// use for create pthread
struct RadioConfigContext {
RadioConfig* ptr;
......@@ -45,7 +41,12 @@ public:
};
void initBSRadio(RadioConfigContext* context);
private:
static void* initBSRadio_launch(void* in_context);
void readSensors();
void radioTrigger();
void initAGC(SoapySDR::Device* iclSdrs);
void sync_delays(int cellIdx);
SoapySDR::Device* baseRadio(int cellId);
void collectCSI(bool&);
static void drain_buffers(SoapySDR::Device* ibsSdrs, SoapySDR::Stream* istream, std::vector<void*> buffs, int symSamp);
......
......@@ -10,6 +10,10 @@
*/
#include "include/receiver.h"
#include "include/macros.h"
#include "include/utils.h"
#include <atomic>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
......@@ -70,8 +74,6 @@ std::vector<pthread_t> Receiver::startClientThreads()
std::vector<pthread_t> Receiver::startRecvThreads(SampleBuffer* rx_buffer, unsigned in_core_id)
{
assert(rx_buffer[0].buffer.size() == config_->getPackageLength() * rx_buffer[0].pkg_buf_inuse.size());
assert(rx_buffer[0].pkg_buf_inuse.size() != 0);
assert(rx_buffer[0].buffer.size() != 0);
std::vector<pthread_t> created_threads;
......@@ -141,11 +143,11 @@ void Receiver::loopRecv(ReceiverContext* context)
moodycamel::ProducerToken local_ptok(*message_queue_);
const int bsSdrCh = config_->bsChannel.length();
int buffer_frame_num = rx_buffer[0].pkg_buf_inuse.size();
int buffer_chunk_size = rx_buffer[0].buffer.size() / config_->getPackageLength();
// handle two channels at each radio
// this is assuming buffer_frame_num is at least 2
std::vector<bool>& pkg_buf_inuse = rx_buffer[tid].pkg_buf_inuse;
// this is assuming buffer_chunk_size is at least 2
std::atomic_int* pkg_buf_inuse = rx_buffer[tid].pkg_buf_inuse;
char* buffer = rx_buffer[tid].buffer.data();
int num_radios = config_->nBsSdrs[0];
int radio_start = tid * num_radios / thread_num_;
......@@ -157,11 +159,6 @@ void Receiver::loopRecv(ReceiverContext* context)
while (config_->running) {
// receive data
for (int it = radio_start; it < radio_end; it++) {
// if buffer is full, exit
if (pkg_buf_inuse[cursor]) {
printf("thread %d buffer full\n", tid);
exit(0);
}
Package* pkg[bsSdrCh];
void* samp[bsSdrCh];
for (auto ch = 0; ch < bsSdrCh; ++ch) {
......@@ -186,22 +183,31 @@ void Receiver::loopRecv(ReceiverContext* context)
}
#endif
for (auto ch = 0; ch < bsSdrCh; ++ch) {
new (pkg[ch]) Package(frame_id, symbol_id, 0, ant_id + ch);
// move ptr & set status to full
pkg_buf_inuse[cursor + ch] = true; // has data, after it is read it should be set to 0
int bit = 1 << cursor % sizeof(std::atomic_int);
int offs = cursor / sizeof(std::atomic_int);
int old = std::atomic_fetch_or(&pkg_buf_inuse[offs], bit); // now full
// if buffer was full, exit
if (old & bit) {
printf("thread %d buffer full\n", tid);
exit(0);
}
// has data, after it is read it should be set to 0
new (pkg[ch]) Package(frame_id, symbol_id, 0, ant_id + ch);
// push EVENT_RX_SYMBOL event into the queue
Event_data package_message;
package_message.event_type = EVENT_RX_SYMBOL;
// data records the position of this packet in the buffer & tid of this socket
// (so that task thread could know which buffer it should visit)
package_message.data = cursor + ch + tid * buffer_frame_num;
package_message.data = cursor + ch + tid * buffer_chunk_size;
if (!message_queue_->enqueue(local_ptok, package_message)) {
printf("socket message enqueue failed\n");
exit(0);
}
cursor++;
cursor %= buffer_chunk_size;
}
cursor += bsSdrCh;
cursor %= buffer_frame_num;
}
}
}
......
......@@ -9,7 +9,9 @@
*/
#include "include/recorder.h"
#include "include/macros.h"
#include "include/signalHandler.hpp"
#include "include/utils.h"
// buffer length of each rx thread
const int Recorder::SAMPLE_BUFFER_FRAME_NUM = 80;
......@@ -35,9 +37,11 @@ Recorder::Recorder(Config* cfg)
if (rx_thread_num > 0) {
// initialize rx buffers
rx_buffer_ = new SampleBuffer[rx_thread_num];
int intsize = sizeof(std::atomic_int);
int arraysize = (buffer_chunk_size + intsize - 1) / intsize;
for (int i = 0; i < rx_thread_num; i++) {
rx_buffer_[i].buffer.resize(buffer_chunk_size * cfg->getPackageLength());
rx_buffer_[i].pkg_buf_inuse.resize(buffer_chunk_size);
rx_buffer_[i].pkg_buf_inuse = new std::atomic_int[arraysize];
}
}
......@@ -460,6 +464,8 @@ void Recorder::closeHDF5()
Recorder::~Recorder()
{
for (size_t i = 0; i < cfg->rx_thread_num; i++)
delete[] rx_buffer_[i].pkg_buf_inuse;
delete[] rx_buffer_;
}
......@@ -553,9 +559,9 @@ void Recorder::taskThread(EventHandlerContext* context)
// do Crop
herr_t Recorder::record(int, int offset)
{
int buffer_frame_num = cfg->symbolsPerFrame * SAMPLE_BUFFER_FRAME_NUM * cfg->getNumAntennas();
int buffer_id = offset / buffer_frame_num;
offset = offset - buffer_id * buffer_frame_num;
size_t buffer_chunk_size = SAMPLE_BUFFER_FRAME_NUM * cfg->symbolsPerFrame * cfg->getNumAntennas();
int buffer_id = offset / buffer_chunk_size;
offset = offset - buffer_id * buffer_chunk_size;
// read info
char* cur_ptr_buffer = rx_buffer_[buffer_id].buffer.data() + offset * cfg->getPackageLength();
struct Package* pkg = (struct Package*)cur_ptr_buffer;
......@@ -674,6 +680,8 @@ herr_t Recorder::record(int, int offset)
clean_exit:
// after finish
rx_buffer_[buffer_id].pkg_buf_inuse[offset] = false; // now empty
int bit = 1 << offset % sizeof(std::atomic_int);
int offs = offset / sizeof(std::atomic_int);
std::atomic_fetch_and(&rx_buffer_[buffer_id].pkg_buf_inuse[offs], ~bit); // now empty
return 0;
}
......@@ -10,6 +10,7 @@
#include "include/sdr-lib.h"
#include "include/comms-lib.h"
#include "include/macros.h"
#include "include/matplotlibcpp.h"
#include <fstream>
......@@ -200,12 +201,14 @@ void RadioConfig::initBSRadio(RadioConfigContext* context)
if (info["frontend"].find("CBRS") != std::string::npos) {
// receive gains
dev->setGain(SOAPY_SDR_RX, ch, "ATTN", -12); //[-18,0]
dev->setGain(SOAPY_SDR_RX, ch, "LNA1", 33); //[0,33]
if (_cfg->freq > 3e9)
if (_cfg->freq > 3e9) {
dev->setGain(SOAPY_SDR_RX, ch, "ATTN", 0); //[-18,0]
dev->setGain(SOAPY_SDR_RX, ch, "LNA2", 17); //LO[0,17]
else
} else {
dev->setGain(SOAPY_SDR_RX, ch, "ATTN", -12); //[-18,0]
dev->setGain(SOAPY_SDR_RX, ch, "LNA2", 14); //HI[0,14]
}
// transmit gains
if (_cfg->freq > 3e9) { // CBRS HI
......
......@@ -70,7 +70,7 @@ class hdf5_lib:
#compute CSI for each user and get a nice numpy array
#Returns csi with Frame, User, LTS (there are 2), BS ant, Subcarrier
#also, iq samples nic(Last 'user' is noise.)ely chunked out, same dims, but subcarrier is sample.
csi,iq = self.samps2csi(pilot_samples, num_cl, symbol_len, offset=offset)
csi,iq = samps2csi(pilot_samples, num_cl, symbol_len, offset=offset)
# create hdf5 file to dump csi to
h5f = h5py.File(filename[:-5]+'-csi.hdf5', 'w')
......@@ -163,6 +163,8 @@ class hdf5_lib:
ofdm_data_time = [] # np.zeros((num_cl, 320)).astype(complex)
for clIdx in range(num_cl):
this_str = 'OFDM_DATA_TIME_CL' + str(clIdx)
if not this_str in self.metadata.keys():
continue
data_per_cl = np.squeeze(self.metadata[this_str])
# some_list[start:stop:step]
if np.any(data_per_cl):
......@@ -177,6 +179,8 @@ class hdf5_lib:
ofdm_data = [] # np.zeros((num_cl, 320)).astype(complex)
for clIdx in range(num_cl):
this_str = 'OFDM_DATA_CL' + str(clIdx)
if not this_str in self.metadata.keys():
continue
data_per_cl = np.squeeze(self.metadata[this_str])
# some_list[start:stop:step]
if np.any(data_per_cl):
......@@ -189,8 +193,8 @@ class hdf5_lib:
return self.metadata
def samps2csi(self, samps, num_users, samps_per_user=224, fft_size=64, offset=0, bound=94, cp=0):
@staticmethod
def samps2csi(samps, num_users, samps_per_user=224, fft_size=64, offset=0, bound=94, cp=0):
"""Convert an Argos HDF5 log file with raw IQ in to CSI.
Asumes 802.11 style LTS used for trace collection.
......@@ -246,8 +250,8 @@ class hdf5_lib:
csi = np.delete(csi, [0, 1, 2, 3, 4, 5, 32, 59, 60, 61, 62, 63], 5)
return csi, iq
def samps2csi_large(self, samps, num_users, samps_per_user=224, offset=47, chunk_size=1000):
@staticmethod
def samps2csi_large(samps, num_users, samps_per_user=224, offset=47, chunk_size=1000):
"""Wrapper function for samps2csi_main for to speed up large logs by leveraging data-locality. Chunk_size may need to be adjusted based on your computer."""
if samps.shape[0] > chunk_size:
......@@ -260,16 +264,17 @@ class hdf5_lib:
(samps.shape[0], num_users, 2, samps.shape[1], 64), dtype='complex64')
chunk_num = samps.shape[0]//chunk_size
for i in range(chunk_num):
csi[i*chunk_size:i*chunk_size+chunk_size], iq[i*chunk_size:i*chunk_size+chunk_size] = self.samps2csi(
csi[i*chunk_size:i*chunk_size+chunk_size], iq[i*chunk_size:i*chunk_size+chunk_size] = samps2csi(
samps[i*chunk_size:(i*chunk_size+chunk_size), :, :, :], num_users, samps_per_user=samps_per_user)
csi[chunk_num*chunk_size:], iq[chunk_num*chunk_size:] = self.samps2csi(
csi[chunk_num*chunk_size:], iq[chunk_num*chunk_size:] = samps2csi(
samps[chunk_num*chunk_size:, :, :, :], num_users, samps_per_user=samps_per_user)
else:
csi, iq = self.samps2csi(
csi, iq = samps2csi(
samps, num_users, samps_per_user=samps_per_user, offset=offset)
return csi, iq
def csi_from_pilots(self, pilots_dump, z_padding=150, fft_size=64, cp=16, frm_st_idx=0, frame_to_plot=0, ref_ant=0):
@staticmethod
def csi_from_pilots(pilots_dump, z_padding=150, fft_size=64, cp=16, frm_st_idx=0, frame_to_plot=0, ref_ant=0):
"""
Finds the end of the pilots' frames, finds all the lts indices relative to that.
Divides the data with lts sequences, calculates csi per lts, csi per frame, csi total.
......@@ -485,7 +490,8 @@ class hdf5_lib:
return csi, m_filt, sf_start, cmpx_pilots, k_lts, n_lts
# add frame_start for plot indexing!
def frame_sanity(self, match_filt, k_lts, n_lts, st_frame = 0, frame_to_plot = 0, plt_ant=0, cp=16):
@staticmethod
def frame_sanity(match_filt, k_lts, n_lts, st_frame = 0, frame_to_plot = 0, plt_ant=0, cp=16):
"""
Creates a map of the frames per antenna. 3 categories: Good frames, bad frames, probably partial frames.
Good frames are those where all k_lts peaks are present and spaced n_lts samples apart.
......
......@@ -13,6 +13,7 @@
Author(s):
C. Nicolas Barati: nicobarati@rice.edu
Clayton Shepard: cws@rice.edu
Oscar Bejarano: obejarano@rice.edu
Rahman Doost-Mohammady: doost@rice.edu
......@@ -79,12 +80,12 @@ def verify_hdf5(hdf5, default_frame=100, ant_i =0, n_frm_st=0, deep_inspect=Fals
if deep_inspect:
csi_from_pilots_start = time.time()
csi_mat, match_filt, sub_fr_strt, cmpx_pilots, k_lts, n_lts = hdf5.csi_from_pilots(
csi_mat, match_filt, sub_fr_strt, cmpx_pilots, k_lts, n_lts = hdf5_lib.csi_from_pilots(
pilot_samples, z_padding, frm_st_idx=n_frm_st, frame_to_plot=frm_plt, ref_ant=ant_i)
csi_from_pilots_end = time.time()
frame_sanity_start = time.time()
match_filt_clr, frame_map, f_st = hdf5.frame_sanity(match_filt, k_lts, n_lts, n_frm_st, frm_plt, plt_ant=ant_i)
match_filt_clr, frame_map, f_st = hdf5_lib.frame_sanity(match_filt, k_lts, n_lts, n_frm_st, frm_plt, plt_ant=ant_i)
frame_sanity_end = time.time()
print(">>>> csi_from_pilots time: %f \n" % ( csi_from_pilots_end - csi_from_pilots_start) )
......@@ -110,7 +111,7 @@ def verify_hdf5(hdf5, default_frame=100, ant_i =0, n_frm_st=0, deep_inspect=Fals
# CSI: #Frames, #Cell, #Users, #Pilot Rep, #Antennas, #Subcarrier
# For correlation use a fft size of 64
print("*verify_hdf5(): Calling samps2csi with fft_size = 64, offset = {}, bound = cp = 0 *".format(offset))
csi, samps = hdf5.samps2csi(samples, num_cl_tmp, symbol_length, fft_size=64, offset=offset, bound=0, cp=0)
csi, samps = hdf5_lib.samps2csi(samples, num_cl_tmp, symbol_length, fft_size=64, offset=offset, bound=0, cp=0)
# Correlation (Debug plot useful for checking sync)
amps = np.mean(np.abs(samps[:, 0, 0, 0, 0, :]), axis=1)
......@@ -128,7 +129,7 @@ def verify_hdf5(hdf5, default_frame=100, ant_i =0, n_frm_st=0, deep_inspect=Fals
# For looking at the whole picture, use a fft size of whole symbol_length as fft window (for visualization),
# and no offset
print("*verify_hdf5():Calling samps2csi *AGAIN*(?) with fft_size = symbol_length, no offset*")
csi, samps = hdf5.samps2csi(samples, num_cl_tmp, symbol_length, fft_size=symbol_length, offset=0, bound=0, cp=0)
csi, samps = hdf5_lib.samps2csi(samples, num_cl_tmp, symbol_length, fft_size=symbol_length, offset=0, bound=0, cp=0)
# Verify default_frame does not exceed max number of collected frames
ref_frame = min(default_frame - n_frm_st, samps.shape[0])
......@@ -270,7 +271,7 @@ def analyze_hdf5(hdf5, frame=10, cell=0, zoom=0, pl=0):
# compute CSI for each user and get a nice numpy array
# Returns csi with Frame, User, LTS (there are 2), BS ant, Subcarrier
#also, iq samples nicely chunked out, same dims, but subcarrier is sample.
csi, _ = hdf5.samps2csi(pilot_samples, num_cl, symbol_length, offset=offset)
csi, _ = hdf5_lib.samps2csi(pilot_samples, num_cl, symbol_length, offset=offset)
csi = csi[:, cell, :, :, :, :]
# zoom in too look at behavior around peak (and reduce processing time)
if zoom > 0:
......