MiniBachelor
Makrodefinitionen | Funktionen | Variablen
Farmer.cpp-Dateireferenz
#include <vector>
#include <sstream>
#include <iostream>
#include <fstream>
#include <string>
#include <mutex>
#include <chrono>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include "gen-cpp/Solver.h"
#include "include/Buffer.hpp"
#include "include/Connection.hpp"

Makrodefinitionen

#define CONFIGFILE   "farmer.conf"
 
#define INITIALOPT   0
 

Funktionen

void fillInitialFix (void)
 
bool isValid (vector< bool > fix)
 
void branch (vector< bool > fix)
 
void workerFunction (SolverClient *remote)
 
void farmerFunction (void)
 
void distributor (string fileName)
 
int main (int argc, char *argv[])
 

Variablen

Instance in
 diese Daten werden später an die Worker gesendet Mehr ...
 
int workerCnt
 
int vorbelegung
 
mutex muSol
 
SolveResult sol
 
mutex muJobs
 
long long int jobs
 
long long int jobAdd
 
Buffer< vector< bool > > jobBuffer
 
bool killWorker
 
chrono::steady_clock::time_point go
 Im C++2011 Standard gibt es nicht nur mutex, sondern auch bessere Zeitmessungs-Funktionen. Mehr ...
 
chrono::steady_clock::time_point fin
 
vector< vector< bool > > initalFix
 In dieser Variablen werden alle Vorbelegungen inital abgespeichert. Mehr ...
 

Makro-Dokumentation

#define CONFIGFILE   "farmer.conf"
#define INITIALOPT   0

Dokumentation der Funktionen

void branch ( vector< bool >  fix)

Diese Funktion fügt an eine Vorbelgung eine weitere Vorbelegung hinzu. Wenn diese neue Vorbelegung gültig ist, wird sie dem JobBuffer begefügt. Diese Funktion ist ein Produzent, da sie in den Buffer Jobs ablegt. Es kann etwas verwirrend sein, weil ein Worker-Thread in diesem Beispiel im Grunde sowohl Konsument als auch Produzent ist, weil in einem Worker die branch() Funktion aufgerufen wird. Diese Vermischung ist unüblich.

Parameter
fixist ein Bool-Vektor mit einer Projekt-Vorbelegung
Rückgabe
nichts
118 {
119  int u = fix.size();
120  fix.resize(u + vorbelegung);
121 
122  // jede neue Vorbelegung an die Bisherige anhängen
123  for (vector<bool> & val : initalFix) {
124  for (int i = 0; i < vorbelegung; ++i) fix[u + i] = val[i];
125  // nur gültige Vorbelegungen (Budget-Grenzen!) werden Buffer beigefügt
126  if (isValid(fix) == true) {
127  muJobs.lock();
128  ++jobs;
129  jobBuffer.push(fix);
130  muJobs.unlock();
131  }
132  }
133  muSol.lock();
134  fin = chrono::steady_clock::now();
135  cout << chrono::duration_cast<chrono::seconds>(fin-go).count();
136  cout << "s, branch()" << endl;
137  muSol.unlock();
138 }
chrono::steady_clock::time_point go
Im C++2011 Standard gibt es nicht nur mutex, sondern auch bessere Zeitmessungs-Funktionen.
Definition: Farmer.cpp:56
int vorbelegung
Definition: Farmer.cpp:30
vector< vector< bool > > initalFix
In dieser Variablen werden alle Vorbelegungen inital abgespeichert.
Definition: Farmer.cpp:59
long long int jobs
Definition: Farmer.cpp:45
bool isValid(vector< bool > fix)
Definition: Farmer.cpp:90
Buffer< vector< bool > > jobBuffer
Definition: Farmer.cpp:52
chrono::steady_clock::time_point fin
Definition: Farmer.cpp:56
mutex muJobs
Definition: Farmer.cpp:44
mutex muSol
Definition: Farmer.cpp:37
void distributor ( string  fileName)

Diese Funktion stellt die Verbindung zu allen Workern her und startet zur asynchronen Verarbeitung der Verbindungen Worker-Threads (= Proxies). Ein Farmer-Thread, der primär eine überwachende Funktion über die Worker-Threads und den jobBuffer hat, wird ebenfalls gestartet.

Parameter
fileNameDer Name der Konfigurations-Datei, in der alle Infos zur Verteilung (engl.: Distribution) stehen
Rückgabe
nichts
220 {
221  // config einlesen
222  ifstream file(fileName);
223  if (file.is_open() == false) {
224  throw "missing file or other file error.";
225  }
226  string temp, rawdata;
227  while (getline(file, temp)) rawdata += temp;
228  file.close();
229  stringstream f(rawdata);
230 
231  f >> workerCnt;
232  f >> vorbelegung;
233  f >> in._timeoutMs;
234 
235  // einmal zu Anfang alle möglichen Vorbelegungen erzeugen
236  fillInitialFix();
237 
238  vector<Connection> conn(workerCnt);
239  for (int i=0; i < workerCnt; ++i) {
240  char ip[39]; //ipv6 ?
241  int port;
242  f >> ip;
243  f >> port;
244  cout << "[" << ip << "]:" << port << endl;
245  conn[i].socket = PtrTran(new TSocket(ip, port));
246  conn[i].transport = PtrTran(new TBufferedTransport(conn[i].socket));
247  conn[i].protocol = PtrProt(new TBinaryProtocol(conn[i].transport));
248  conn[i].remote = new SolverClient(conn[i].protocol);
249  // Verbindung zum Worker oeffnen
250  conn[i].transport->open();
251  // Instanz uebermitteln
252  conn[i].remote->setInstance(in);
253  // auf Daten im jobBuffer wartenden Worker-Proxy-Thread erzeugen
254  conn[i].proxy = new thread(workerFunction, conn[i].remote);
255  }
256 
257  // den Farmer-Thread starten
258  thread tFarmer = thread(farmerFunction);
259  // auf das Edne des Farmer-Thread warten
260  tFarmer.join();
261  // auf das Ende alle Worker-Proxy-Threads warten
262  for (int i=0; i < workerCnt; ++i) {
263  conn[i].proxy->join();
264  }
265  // Verbindung schlie/3en und Speicher fuer (nicht shared) Pointer freigeben
266  for (int i=0; i < workerCnt; ++i) {
267  conn[i].transport->close();
268  delete conn[i].remote;
269  delete conn[i].proxy;
270  }
271 }
boost::shared_ptr< TTransport > PtrTran
Definition: Connection.hpp:24
void farmerFunction(void)
Definition: Farmer.cpp:188
boost::shared_ptr< TProtocol > PtrProt
Durch ein typedef lassen sich lange Typen etwas verkürzen.
Definition: Connection.hpp:26
int workerCnt
Definition: Farmer.cpp:29
int vorbelegung
Definition: Farmer.cpp:30
void workerFunction(SolverClient *remote)
Definition: Farmer.cpp:151
void fillInitialFix(void)
Definition: Farmer.cpp:68
Instance in
diese Daten werden später an die Worker gesendet
Definition: Farmer.cpp:28
void farmerFunction ( void  )

Diese Funktion läuft auch als Thread. Sie ist ein Produzent weil sie mit push() in den Buffer Jobs ablegt. Eine kleine Endlos-while-Schleife prüft alle 500000 Microsekunden, ob bereits alle Jobs aus den Buffer erledigt wurden. Ohne das usleep würde dieser Farmer-Thread viel CPU-Zeit kosten.

Rückgabe
nichts
189 {
190  vector<bool> emptyFix;
191  // inital erste Vorbelegungen/Jobs erzeugen
192  branch(emptyFix);
193 
194  while( true ) {
195  muJobs.lock();
196  if(jobs == 0) {
197  muJobs.unlock();
198  break;
199  }
200  muJobs.unlock();
201  usleep(500000);
202  }
203  killWorker = true;
204  /* Mit leeren Jobs die Worker aus dem "auf den buffer warten" aufwecken,
205  * damit diese sich beenden können */
206  for (int i = 0; i < workerCnt; ++i) {
207  jobBuffer.push(emptyFix);
208  }
209 }
int workerCnt
Definition: Farmer.cpp:29
bool killWorker
Definition: Farmer.cpp:53
void branch(vector< bool > fix)
Definition: Farmer.cpp:117
long long int jobs
Definition: Farmer.cpp:45
Buffer< vector< bool > > jobBuffer
Definition: Farmer.cpp:52
mutex muJobs
Definition: Farmer.cpp:44
void fillInitialFix ( void  )

Um in der Funktion branch() nicht jedes mal einen binären Vektor hoch zu zählen, wird dies einmal zu Anfang mit dieser Funktion gemacht und die Werte dem bestehenden fix-Vector nur noch beigefügt. Statt Parameter arbeitet die funktion mit der globalen Variable vorbelegung.

Rückgabe
nichts; arbeitet mit globalen Variablen jobAdd und initialFix
69 {
70  jobAdd = (long long int) 1 << vorbelegung;
71  vector<bool> counter(vorbelegung, true);
72  initalFix.resize(jobAdd);
73  for (long long int n = 0; n < jobAdd; ++n) {
74  initalFix[n] = counter;
75  long long int tmp = 1;
76  for (int i = 0; i < vorbelegung; ++i) {
77  if (((n+1) % tmp) == 0)
78  counter[i] = !(counter[i]);
79  tmp *= 2;
80  }
81  }
82 }
int vorbelegung
Definition: Farmer.cpp:30
vector< vector< bool > > initalFix
In dieser Variablen werden alle Vorbelegungen inital abgespeichert.
Definition: Farmer.cpp:59
long long int jobAdd
Definition: Farmer.cpp:46
bool isValid ( vector< bool >  fix)

Die Funktion isValid() testet einen binären Vektor, ob dieser eine gültige Projekt-Vorbelegung enthält.

Parameter
fixist ein Bool-Vektor mit einer Projekt-Vorbelegung
Rückgabe
bool-Wert ist true, wenn eine Vorbelegung die Budgetgrenzen einhält.
91 {
92  int fi = fix.size();
93  int T = in._budgets.size();
94 
95  int price;
96  for (int t = 0; t < T; ++t) {
97  price = 0;
98  for (int i = 0; i < fi; ++i) {
99  if (fix[i]) price += in._costs[t][i];
100 
101  if (price > in._budgets[t]) return false;
102  }
103  }
104  return true;
105 }
Instance in
diese Daten werden später an die Worker gesendet
Definition: Farmer.cpp:28
int main ( int  argc,
char *  argv[] 
)

Der "Farmer" ist die Client Anwendung, die die in kleine Teilprobleme aufgeteilten Aufgaben für glpk auf die Worker (Server) verteilt.

278 {
279  killWorker = false;
280  sol._optimum = INITIALOPT;
281 
282  if (argc < 2) {
283  cout << "usage: " << argv[0] << " <instance file>" << endl;
284  return 1;
285  }
286 
287  // Instanz aus Datei einlesen ------------
288  ifstream file(argv[1]);
289  if (file.is_open() == false) {
290  throw "missing file or other file error.";
291  }
292  string temp, rawdata;
293  while (getline(file, temp)) rawdata += temp;
294  file.close();
295  stringstream f(rawdata);
296 
297  int N, T, opt;
298 
299  f >> N; f >> T; f >> opt;
300  in._profits.resize(N);
301  for (int n = 0; n < N; ++n) {
302  f >> in._profits[n];
303  }
304  in._costs.resize(T);
305  for (int t = 0; t < T; ++t) {
306  in._costs[t].resize(N);
307  for (int n = 0; n < N; ++n) {
308  f >> in._costs[t][n];
309  }
310  }
311  in._budgets.resize(T);
312  for (int t = 0; t < T; ++t) {
313  f >> in._budgets[t];
314  }
315 
316  // verteilen und loesen ---------
317  go = chrono::steady_clock::now();
319  fin = chrono::steady_clock::now();
320  cout << chrono::duration_cast<chrono::seconds>(fin-go).count() << "s ";
321 
322  // ausgabe ----------------------
323  for (bool val : sol._projects) {
324  val ? cout << "I" : cout << "o";
325  }
326  cout << " Optimum: " << sol._optimum << endl;
327  return 0;
328 }
void distributor(string fileName)
Definition: Farmer.cpp:219
chrono::steady_clock::time_point go
Im C++2011 Standard gibt es nicht nur mutex, sondern auch bessere Zeitmessungs-Funktionen.
Definition: Farmer.cpp:56
SolveResult sol
Definition: Farmer.cpp:38
#define CONFIGFILE
Definition: Farmer.cpp:23
bool killWorker
Definition: Farmer.cpp:53
#define INITIALOPT
Definition: Farmer.cpp:24
chrono::steady_clock::time_point fin
Definition: Farmer.cpp:56
Instance in
diese Daten werden später an die Worker gesendet
Definition: Farmer.cpp:28
void workerFunction ( SolverClient *  remote)

Diese Funktion wird als Thread benutzt und ist ein Proxy. Zu jeder Verbindung zu einem Service eines Workers existiert auf dem Farmer so ein Thread. Solange wie dieser aus jobBuffer Jobs holen kann und die Variable killWorker keinen Abbruch erzeugt, werden Jobs an den Service der Worker zum lösen - also solve() - gesendet. Dieser Thread ist ein Konsoment und durch eine Semaphore in Buffer geschieht beim pop() aufruf nichts, solange im Buffer nichts ist.

Parameter
remoteRepräsentiert den Service des Workers
Rückgabe
nichts
152 {
153  while (true) {
154  vector<bool> fix = jobBuffer.pop();
155  if (killWorker == true) return;
156 
157  SolveResult _return;
158  remote->solve(_return, fix, sol._optimum);
159 
160  if (_return._flag == Flag::OK) {
161  muSol.lock();
162  if (sol._optimum < _return._optimum) {
163  sol = _return;
164  fin = chrono::steady_clock::now();
165  cout << chrono::duration_cast<chrono::seconds>(fin-go).count();
166  cout << "s, new Best: " ;
167  cout << _return._optimum << endl;
168  }
169  muSol.unlock();
170  }
171  if (_return._flag == Flag::TIMEOUT) {
172  branch(fix);
173  }
174  muJobs.lock();
175  --jobs;
176  muJobs.unlock();
177  }
178 }
chrono::steady_clock::time_point go
Im C++2011 Standard gibt es nicht nur mutex, sondern auch bessere Zeitmessungs-Funktionen.
Definition: Farmer.cpp:56
SolveResult sol
Definition: Farmer.cpp:38
bool killWorker
Definition: Farmer.cpp:53
void branch(vector< bool > fix)
Definition: Farmer.cpp:117
long long int jobs
Definition: Farmer.cpp:45
Buffer< vector< bool > > jobBuffer
Definition: Farmer.cpp:52
chrono::steady_clock::time_point fin
Definition: Farmer.cpp:56
mutex muJobs
Definition: Farmer.cpp:44
mutex muSol
Definition: Farmer.cpp:37

Variablen-Dokumentation

chrono::steady_clock::time_point fin
chrono::steady_clock::time_point go

Im C++2011 Standard gibt es nicht nur mutex, sondern auch bessere Zeitmessungs-Funktionen.

Instance in

diese Daten werden später an die Worker gesendet

vector<vector<bool> > initalFix

In dieser Variablen werden alle Vorbelegungen inital abgespeichert.

long long int jobAdd
Buffer<vector<bool> > jobBuffer

Dieser Buffer nimmt alle Jobs auf - also alle Projekt-Vorbelegungen, bei denen die Worker mit glpk eine optimale Lösung finden sollen.

long long int jobs
bool killWorker
mutex muJobs

Die Variable jobs wird ebenfalls von allen Threads ausgelesen und verändert. Dieser kritische Bereich muss ebenfalls geschützt werden.

mutex muSol

Da alle Threads (= unsere Worker-Proxies) auf die sol Variable zugreifen können, muss dieser Zugriff mit einer Mutex-Semaphore geschützt werden.

SolveResult sol
int vorbelegung
int workerCnt