Concurrent Logic Programming Languages: Concurrent Prolog, Flat Concurrent Prolog, Parlog, Flat Parlog, Guarded Horn Clauses, Flat GHC, Strand, Janus, KL1, PCN Basic structure: • A number of clauses with same name and arity • Each clause has a head, guards and a body p(A¹, ...) :- G¹, ... | B¹, ... p(A², ...) :- G², ... | B², ... p(A³, ...) :- G³, ... | B³, ... "Committed choice" • Once Head matches and guards succeed, no further clauses are considered "Flat" languages • Guards restricted to limited set of predicates OR parallelism • Select clauses in parallel AND parallelism: • Execute B's in parallel PCN <program> = <definition> ... <definition> = <name> "(" <argument> "," ... ")" <statement> <statement> = "{" <statement> "," ... "}" // sequential | "{||" <statement> "," ... "}" // parallel | "{?" <guard> "," ... "->" <statement> "}" // choice | <assignment> | <call> | ... /* sequential execution */ main() { x = "hello", writeln(x), writeln("world") } (Definitional) variables • either unbound: access to value blocks current statement • or bound: value never changes • variables can be bound to other variables /* parallel execution */ main() {|| writeln(x), writeln("world"), x = "hello" } Basic execution model • Ring buffer of active processes • Take process, match "head", push statements of parallel composition onto ring-buffer • Repeat Extended execution model • Tail calls: last statement is not pushed but executed, for N steps • Yield: check for I/O, timers, signals, messages Threads • Each HW thread has own process pool, heap, GC • Communication/synchronization between threads done via elaborate message protocol Streams as communication channels • produce stream: s = [x|more] • consume stream: s ?= [x|more] /* producer/consumer */ main() {|| prod([1, 2, 3, 4, 5], s), cons(s) } /* ... producer */ prod(i, o) {? i ?= [x|more] -> { y = x * 2, o = [y|next], prod(more, next) }, default -> o = [] } /* ... consumer */ cons(s) s ?= [x|more] -> { writeln(x), cons(more) } /* producer/consumer + filter */ main() {|| prod([1, 2, 3, 4, 5], s), filter(s, s2), cons(s2) } /* ... filter */ filter(i, o) {? 
i ?= [x|more] -> // even? if(x & 1 == 0) filter(more, o) else { o = [x|next], filter(more, next) }, default -> o = [] } Variables can be shared by any number of processes /* broadcast */ main() {|| i over 1..10 :: spawn(i, c), c = [] } spawn(i, c) data(c) -> writeln(i) Technique: short circuit • determine that a number of processes completed /* short-circuit */ main() {|| spawn(10, [], r), if(data(r)) writeln("done") } /* ... spawn */ spawn(i, l, r) {? i == 0 -> r = l, default -> {|| spawn(i - 1, l2, r), timer(integer(rnd() * 1000), t, _), if(data(t)) l2 = l } } spawn(10, [], r) spawn(9, l¹, r) spawn(8, l², r) ... spawn(0, lⁿ, r) → lⁿ = r Task: grep • search text in file and show matching lines • utilize user-defined number of native threads • textual data = list of code points /* grep */ main(_, argv, _) argv ?= [_, str, name|_] -> {|| open_file(name, "r", fd), io:read_lines(fd, lines), string_to_list(str, strl, []), partition(lines, length(lines) / nodes(), strl, r), { g over r :: fmt:format("~s\n", [g]) } } /* ... partition over threads */ partition(lines, n, str, r) {? lines == [] -> r = [], default -> {|| list:cut(n, lines, [], part, rest), search(str, part, r, r2), partition(rest, n, str, r2)@fwd } } /* ... search string in block */ search(needle, haystack, l, r) {? haystack ?= [line|more] -> {|| list:search(needle, line, pos), if(pos == 0) search(needle, more, l, r) else {|| l = [line|l2], search(needle, more, l2, r) } }, default -> l = r } main → [line1, ..., lineN] partition (thread #1): [line1, ..., lineM] → [found1-M, ...|R1] partition (thread #2): [lineM+1, ..., lineK] → R1 = [foundM+1, ...|R2] ... partition (...) → RN = [] Problem: Stream producers may outpace consumers /* runaway stream */ main() {|| fib(0, 1, s), process(s) } fib(n, m, s) { k = n + m, s = [k|s2], fib(m, k, s2) } /* ... 
process */ process(s) s ?= [n|s2] -> { timer(n * 1000, t, _), if(data(t)) process(s2) } *** ERROR: <1> 37880: 'a.pcn:9$4'/7 - heap exhausted Solution: Make consumer create locations to be filled by producer /* bounded stream */ main() {|| fib(0, 1, i, s), process(i, s) } fib(n, m, i, s) i ?= [v|i2] -> { v = n + m, s = [v|s2], fib(m, v, i2, s2) } /* ... process */ process(i, s) {|| i = [v|i2], {? s ?= [n|s2] -> { timer(n * 1000, t, _), if(data(t)) process(i2, s2) } } } Task: merge information from different sources • watch a number of files • "dashboard": show last line of each file • update every second /* mergers */ main(_, argv, _) argv ?= [_|fs] -> {|| spawn(fs, 1, s), clock(1000, t, _), merger([merge(c)|s], s2), follow(s2, []), wait(t, c) } spawn(fs, n, s) fs ?= [fn|fs2] -> {|| open_file(fn, "r", fd), tail_f(fd, n, fo), io:parse_lines(fo, lo), lines(lo, n, lns), s = [merge(lns)|s2], spawn(fs2, n + 1, s2) } lines(ln, n, lns) ln ?= [l|more] -> {|| lns = [line(n, l)|lns2], lines(more, n, lns2) } tail_f(fd, n, o) {|| listen(fd, in), if(data(in)) { read_file(fd, 256, data, []), if(data == []) { sleep(100), tail_f(fd, n, o) } else {|| list:append(data, next, o), tail_f(fd, n, next) } } } sleep(ms) {|| timer(ms, t, _), {? data(t) -> {} } } follow(s, m) {? s ?= [time(tm)|s2] -> { fmt:format("__~s____\n", [tm]), map:map_to_list(m, ml), x over ml :: {? x ?= {_, n, str} -> fmt:format("~s\n", [str]) }, follow(s2, m) }, s ?= [line(n, str)|s2] -> { map:insert(n, str, m, m2), follow(s2, m2) } } wait(c, s) c ?= [_|c2] -> {|| proc:capture(["date"], tm), s = [time(tm)|s2], wait(c2, s2) } Other PCN features • Easy calls to C functions (call by reference) • Mutable variables and arrays • Uses C preprocessor Differences to FGHC/Strand • FGHC + Strand use Prolog syntax • Compilers generate FLENG • FLENG is translated to asm (ARM, AMD, RISCV) • Simple machine model • No dependencies (cc + binutils + POSIX sh)